1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.locks.ReentrantLock;
27
28 import org.eclipse.jetty.util.BufferUtil;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
32 import org.eclipse.jetty.websocket.api.WriteCallback;
33 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
34 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
35 import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
36 import org.eclipse.jetty.websocket.common.frames.DataFrame;
37 import org.eclipse.jetty.websocket.common.frames.PingFrame;
38 import org.eclipse.jetty.websocket.common.frames.PongFrame;
39 import org.eclipse.jetty.websocket.common.frames.TextFrame;
40 import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
41
42
43
44
45 public class WebSocketRemoteEndpoint implements RemoteEndpoint
46 {
47 private static final String PRIORMSG_ERROR = "Prior message pending, cannot start new message yet.";
48
49 private static final int NONE = 0;
50 private static final int TEXT = 1;
51 private static final int BINARY = 2;
52 private static final int CONTROL = 3;
53 private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
54 {
55 @Override
56 public void writeSuccess()
57 {
58 }
59
60 @Override
61 public void writeFailed(Throwable x)
62 {
63 }
64 };
65
66 private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
67 public final LogicalConnection connection;
68 public final OutgoingFrames outgoing;
69 private final ReentrantLock msgLock = new ReentrantLock();
70 private final AtomicInteger msgType = new AtomicInteger(NONE);
71 private boolean partialStarted = false;
72
73 public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
74 {
75 if (connection == null)
76 {
77 throw new IllegalArgumentException("LogicalConnection cannot be null");
78 }
79 this.connection = connection;
80 this.outgoing = outgoing;
81 }
82
83 private void blockingWrite(WebSocketFrame frame) throws IOException
84 {
85
86
87 BlockingWriteCallback callback = new BlockingWriteCallback();
88 sendFrame(frame,callback);
89 callback.block();
90 }
91
92 public InetSocketAddress getInetSocketAddress()
93 {
94 return connection.getRemoteAddress();
95 }
96
97
98
99
100
101
102
103
104 private Future<Void> sendAsyncFrame(WebSocketFrame frame)
105 {
106 FutureWriteCallback future = new FutureWriteCallback();
107 sendFrame(frame,future);
108 return future;
109 }
110
111
112
113
114 @Override
115 public void sendBytes(ByteBuffer data) throws IOException
116 {
117 if (msgLock.tryLock())
118 {
119 try
120 {
121 msgType.set(BINARY);
122 connection.getIOState().assertOutputOpen();
123 if (LOG.isDebugEnabled())
124 {
125 LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
126 }
127 blockingWrite(new BinaryFrame().setPayload(data));
128 }
129 finally
130 {
131 msgType.set(NONE);
132 msgLock.unlock();
133 }
134 }
135 else
136 {
137 throw new IllegalStateException(PRIORMSG_ERROR);
138 }
139 }
140
141 @Override
142 public Future<Void> sendBytesByFuture(ByteBuffer data)
143 {
144 msgType.set(BINARY);
145 if (LOG.isDebugEnabled())
146 {
147 LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
148 }
149 return sendAsyncFrame(new BinaryFrame().setPayload(data));
150 }
151
152 @Override
153 public void sendBytes(ByteBuffer data, WriteCallback callback)
154 {
155 msgType.set(BINARY);
156 if (LOG.isDebugEnabled())
157 {
158 LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
159 }
160 sendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback);
161 }
162
163 public void sendFrame(WebSocketFrame frame, WriteCallback callback)
164 {
165 try
166 {
167 connection.getIOState().assertOutputOpen();
168 outgoing.outgoingFrame(frame,callback);
169 }
170 catch (IOException e)
171 {
172 callback.writeFailed(e);
173 }
174 }
175
176 @Override
177 public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
178 {
179 if (msgLock.tryLock())
180 {
181 try
182 {
183 if (msgType.get() == TEXT)
184 {
185 throw new IllegalStateException("Prior TEXT message pending, cannot start new BINARY message yet.");
186 }
187 msgType.set(BINARY);
188
189 if (LOG.isDebugEnabled())
190 {
191 LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
192 }
193 DataFrame frame = null;
194 if (partialStarted)
195 {
196 frame = new ContinuationFrame().setPayload(fragment);
197 }
198 else
199 {
200 frame = new BinaryFrame().setPayload(fragment);
201 }
202 frame.setFin(isLast);
203 blockingWrite(frame);
204 partialStarted = !isLast;
205 }
206 finally
207 {
208 if (isLast)
209 {
210 msgType.set(NONE);
211 }
212 msgLock.unlock();
213 }
214 }
215 else
216 {
217 throw new IllegalStateException(PRIORMSG_ERROR);
218 }
219 }
220
221 @Override
222 public void sendPartialString(String fragment, boolean isLast) throws IOException
223 {
224 if (msgLock.tryLock())
225 {
226 try
227 {
228 if (msgType.get() == BINARY)
229 {
230 throw new IllegalStateException("Prior BINARY message pending, cannot start new TEXT message yet.");
231 }
232 msgType.set(TEXT);
233
234 if (LOG.isDebugEnabled())
235 {
236 LOG.debug("sendPartialString({}, {})",fragment,isLast);
237 }
238 DataFrame frame = null;
239 if (partialStarted)
240 {
241 frame = new ContinuationFrame().setPayload(fragment);
242 }
243 else
244 {
245 frame = new TextFrame().setPayload(fragment);
246 }
247 frame.setFin(isLast);
248 blockingWrite(frame);
249 partialStarted = !isLast;
250 }
251 finally
252 {
253 if (isLast)
254 {
255 msgType.set(NONE);
256 }
257 msgLock.unlock();
258 }
259 }
260 else
261 {
262 throw new IllegalStateException(PRIORMSG_ERROR);
263 }
264 }
265
266 @Override
267 public void sendPing(ByteBuffer applicationData) throws IOException
268 {
269 if (msgLock.tryLock())
270 {
271 try
272 {
273 msgType.set(CONTROL);
274 if (LOG.isDebugEnabled())
275 {
276 LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData));
277 }
278 blockingWrite(new PingFrame().setPayload(applicationData));
279 }
280 finally
281 {
282 msgType.set(NONE);
283 msgLock.unlock();
284 }
285 }
286 else
287 {
288 throw new IllegalStateException(PRIORMSG_ERROR);
289 }
290 }
291
292 @Override
293 public void sendPong(ByteBuffer applicationData) throws IOException
294 {
295 if (msgLock.tryLock())
296 {
297 try
298 {
299 msgType.set(CONTROL);
300 if (LOG.isDebugEnabled())
301 {
302 LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData));
303 }
304 blockingWrite(new PongFrame().setPayload(applicationData));
305 }
306 finally
307 {
308 msgType.set(NONE);
309 msgLock.unlock();
310 }
311 }
312 else
313 {
314 throw new IllegalStateException(PRIORMSG_ERROR);
315 }
316 }
317
318 @Override
319 public void sendString(String text) throws IOException
320 {
321 if (msgLock.tryLock())
322 {
323 try
324 {
325 msgType.set(TEXT);
326 WebSocketFrame frame = new TextFrame().setPayload(text);
327 if (LOG.isDebugEnabled())
328 {
329 LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
330 }
331 blockingWrite(frame);
332 }
333 finally
334 {
335 msgType.set(NONE);
336 msgLock.unlock();
337 }
338 }
339 else
340 {
341 throw new IllegalStateException(PRIORMSG_ERROR);
342 }
343 }
344
345 @Override
346 public Future<Void> sendStringByFuture(String text)
347 {
348 msgType.set(TEXT);
349 TextFrame frame = new TextFrame().setPayload(text);
350 if (LOG.isDebugEnabled())
351 {
352 LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
353 }
354 return sendAsyncFrame(frame);
355 }
356
357 @Override
358 public void sendString(String text, WriteCallback callback)
359 {
360 msgType.set(TEXT);
361 TextFrame frame = new TextFrame().setPayload(text);
362 if (LOG.isDebugEnabled())
363 {
364 LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
365 }
366 sendFrame(frame,callback==null?NOOP_CALLBACK:callback);
367 }
368 }