1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24
25 import org.eclipse.jetty.util.BufferUtil;
26 import org.eclipse.jetty.websocket.common.events.AnnotatedEventDriver;
27
28
29
30
31 public class MessageInputStream extends InputStream implements MessageAppender
32 {
33 private static final int BUFFER_SIZE = 65535;
34
35
36
37 private static final int COMPACT_THRESHOLD = 5;
38 private final AnnotatedEventDriver driver;
39 private final ByteBuffer buf;
40 private int size;
41 private boolean finished;
42 private boolean needsNotification;
43 private int readPosition;
44
45 public MessageInputStream(AnnotatedEventDriver driver)
46 {
47 this.driver = driver;
48 this.buf = ByteBuffer.allocate(BUFFER_SIZE);
49 BufferUtil.clearToFill(this.buf);
50 size = 0;
51 readPosition = this.buf.position();
52 finished = false;
53 needsNotification = true;
54 }
55
56 @Override
57 public void appendMessage(ByteBuffer payload) throws IOException
58 {
59 if (finished)
60 {
61 throw new IOException("Cannot append to finished buffer");
62 }
63
64 if (payload == null)
65 {
66
67 return;
68 }
69
70 driver.getPolicy().assertValidMessageSize(size + payload.remaining());
71 size += payload.remaining();
72
73 synchronized (buf)
74 {
75
76
77
78 BufferUtil.put(payload,buf);
79 }
80
81 if (needsNotification)
82 {
83 needsNotification = true;
84 this.driver.onInputStream(this);
85 }
86 }
87
88 @Override
89 public void close() throws IOException
90 {
91 finished = true;
92 super.close();
93 }
94
95 @Override
96 public void messageComplete()
97 {
98 finished = true;
99 }
100
101 @Override
102 public int read() throws IOException
103 {
104 synchronized (buf)
105 {
106 byte b = buf.get(readPosition);
107 readPosition++;
108 if (readPosition <= (buf.limit() - COMPACT_THRESHOLD))
109 {
110 int curPos = buf.position();
111 buf.compact();
112 int offsetPos = buf.position() - curPos;
113 readPosition += offsetPos;
114 }
115 return b;
116 }
117 }
118 }