1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.util.concurrent.BlockingDeque;
25 import java.util.concurrent.LinkedBlockingDeque;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.eclipse.jetty.util.BufferUtil;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.websocket.common.LogicalConnection;
32
33
34
35
36
37
38 public class MessageInputStream extends InputStream implements MessageAppender
39 {
40 private static final Logger LOG = Log.getLogger(MessageInputStream.class);
41
42
43
44 @SuppressWarnings("unused")
45 private final LogicalConnection connection;
46 private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
47 private AtomicBoolean closed = new AtomicBoolean(false);
48
49 private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
50 private ByteBuffer activeBuffer = null;
51
52 public MessageInputStream(LogicalConnection connection)
53 {
54 this.connection = connection;
55 }
56
57 @Override
58 public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
59 {
60 if (LOG.isDebugEnabled())
61 {
62 LOG.debug("appendMessage(ByteBuffer,{}): {}",isLast,BufferUtil.toDetailString(payload));
63 }
64
65 if (buffersExhausted.get())
66 {
67
68 throw new RuntimeException("Last frame already received");
69 }
70
71
72 if (closed.get())
73 {
74 return;
75 }
76
77
78 try
79 {
80 buffers.put(payload);
81 if (isLast)
82 {
83 buffersExhausted.set(true);
84 }
85 }
86 catch (InterruptedException e)
87 {
88 throw new IOException(e);
89 }
90 }
91
92 @Override
93 public void close() throws IOException
94 {
95 closed.set(true);
96 super.close();
97 }
98
99 @Override
100 public synchronized void mark(int readlimit)
101 {
102
103 }
104
105 @Override
106 public boolean markSupported()
107 {
108 return false;
109 }
110
111 @Override
112 public void messageComplete()
113 {
114 LOG.debug("messageComplete()");
115
116 buffersExhausted.set(true);
117
118 try
119 {
120 buffers.put(ByteBuffer.wrap(new byte[0]));
121 }
122 catch (InterruptedException ignore)
123 {
124
125 }
126 }
127
128 @Override
129 public int read() throws IOException
130 {
131 LOG.debug("read()");
132
133 try
134 {
135 if (closed.get())
136 {
137 return -1;
138 }
139
140 if (activeBuffer == null)
141 {
142 activeBuffer = buffers.take();
143 }
144
145 while (activeBuffer.remaining() <= 0)
146 {
147 if (buffersExhausted.get())
148 {
149 closed.set(true);
150 return -1;
151 }
152 activeBuffer = buffers.take();
153 }
154
155 return activeBuffer.get();
156 }
157 catch (InterruptedException e)
158 {
159 LOG.warn(e);
160 closed.set(true);
161 return -1;
162
163 }
164 }
165
166 @Override
167 public synchronized void reset() throws IOException
168 {
169 throw new IOException("reset() not supported");
170 }
171 }