1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.util.concurrent.BlockingDeque;
25 import java.util.concurrent.LinkedBlockingDeque;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32
33
34
35
36
37
38 public class MessageInputStream extends InputStream implements MessageAppender
39 {
40 private static final Logger LOG = Log.getLogger(MessageInputStream.class);
41 private static final ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer();
42
43 private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
44 private AtomicBoolean closed = new AtomicBoolean(false);
45 private final long timeoutMs;
46 private ByteBuffer activeBuffer = null;
47
48 public MessageInputStream()
49 {
50 this(-1);
51 }
52
53 public MessageInputStream(int timeoutMs)
54 {
55 this.timeoutMs = timeoutMs;
56 }
57
58 @Override
59 public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException
60 {
61 if (LOG.isDebugEnabled())
62 {
63 LOG.debug("Appending {} chunk: {}",fin?"final":"non-final",BufferUtil.toDetailString(framePayload));
64 }
65
66
67 if (closed.get())
68 {
69 return;
70 }
71
72
73
74
75 try
76 {
77 if (framePayload == null)
78 {
79
80 return;
81 }
82
83 int capacity = framePayload.remaining();
84 if (capacity <= 0)
85 {
86
87 return;
88 }
89
90 ByteBuffer copy = framePayload.isDirect()?ByteBuffer.allocateDirect(capacity):ByteBuffer.allocate(capacity);
91 copy.put(framePayload).flip();
92 buffers.put(copy);
93 }
94 catch (InterruptedException e)
95 {
96 throw new IOException(e);
97 }
98 finally
99 {
100 if (fin)
101 {
102 buffers.offer(EOF);
103 }
104 }
105 }
106
107 @Override
108 public void close() throws IOException
109 {
110 if (closed.compareAndSet(false, true))
111 {
112 buffers.offer(EOF);
113 super.close();
114 }
115 }
116
117 @Override
118 public void mark(int readlimit)
119 {
120
121 }
122
123 @Override
124 public boolean markSupported()
125 {
126 return false;
127 }
128
129 @Override
130 public void messageComplete()
131 {
132 if (LOG.isDebugEnabled())
133 LOG.debug("Message completed");
134 buffers.offer(EOF);
135 }
136
137 @Override
138 public int read() throws IOException
139 {
140 try
141 {
142 if (closed.get())
143 {
144 if (LOG.isDebugEnabled())
145 LOG.debug("Stream closed");
146 return -1;
147 }
148
149
150 while (activeBuffer == null || !activeBuffer.hasRemaining())
151 {
152 if (LOG.isDebugEnabled())
153 LOG.debug("Waiting {} ms to read", timeoutMs);
154 if (timeoutMs < 0)
155 {
156
157 activeBuffer = buffers.take();
158 }
159 else
160 {
161
162 activeBuffer = buffers.poll(timeoutMs, TimeUnit.MILLISECONDS);
163 if (activeBuffer == null)
164 {
165 throw new IOException(String.format("Read timeout: %,dms expired", timeoutMs));
166 }
167 }
168
169 if (activeBuffer == EOF)
170 {
171 if (LOG.isDebugEnabled())
172 LOG.debug("Reached EOF");
173
174 closed.set(true);
175
176 buffers.clear();
177 return -1;
178 }
179 }
180
181 return activeBuffer.get() & 0xFF;
182 }
183 catch (InterruptedException x)
184 {
185 if (LOG.isDebugEnabled())
186 LOG.debug("Interrupted while waiting to read", x);
187 closed.set(true);
188 return -1;
189 }
190 }
191
192 @Override
193 public void reset() throws IOException
194 {
195 throw new IOException("reset() not supported");
196 }
197 }