1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.nio.charset.StandardCharsets;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.eclipse.jetty.util.BufferUtil;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.websocket.api.BatchMode;
32 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
33 import org.eclipse.jetty.websocket.api.WriteCallback;
34 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
35 import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
36 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
37 import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
38 import org.eclipse.jetty.websocket.common.frames.DataFrame;
39 import org.eclipse.jetty.websocket.common.frames.PingFrame;
40 import org.eclipse.jetty.websocket.common.frames.PongFrame;
41 import org.eclipse.jetty.websocket.common.frames.TextFrame;
42 import org.eclipse.jetty.websocket.common.io.FrameFlusher;
43 import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
44
45
46
47
48 public class WebSocketRemoteEndpoint implements RemoteEndpoint
49 {
50 private enum MsgType
51 {
52 BLOCKING,
53 ASYNC,
54 STREAMING,
55 PARTIAL_TEXT,
56 PARTIAL_BINARY
57 }
58
59 private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
60 {
61 @Override
62 public void writeSuccess()
63 {
64 }
65
66 @Override
67 public void writeFailed(Throwable x)
68 {
69 }
70 };
71
72 private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
73
74 private final static int ASYNC_MASK = 0x0000FFFF;
75 private final static int BLOCK_MASK = 0x00010000;
76 private final static int STREAM_MASK = 0x00020000;
77 private final static int PARTIAL_TEXT_MASK = 0x00040000;
78 private final static int PARTIAL_BINARY_MASK = 0x00080000;
79
80 private final LogicalConnection connection;
81 private final OutgoingFrames outgoing;
82 private final AtomicInteger msgState = new AtomicInteger();
83 private final BlockingWriteCallback blocker = new BlockingWriteCallback();
84 private volatile BatchMode batchMode;
85
86 public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
87 {
88 this(connection, outgoing, BatchMode.AUTO);
89 }
90
91 public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, BatchMode batchMode)
92 {
93 if (connection == null)
94 {
95 throw new IllegalArgumentException("LogicalConnection cannot be null");
96 }
97 this.connection = connection;
98 this.outgoing = outgoing;
99 this.batchMode = batchMode;
100 }
101
102 private void blockingWrite(WebSocketFrame frame) throws IOException
103 {
104 try(WriteBlocker b=blocker.acquireWriteBlocker())
105 {
106 uncheckedSendFrame(frame, b);
107 b.block();
108 }
109 }
110
111 private boolean lockMsg(MsgType type)
112 {
113
114
115
116
117
118
119
120 while (true)
121 {
122 int state = msgState.get();
123
124 switch (type)
125 {
126 case BLOCKING:
127 if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
128 throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
129 if ((state & BLOCK_MASK) != 0)
130 throw new IllegalStateException(String.format("Blocking message pending %x for %s", state, type));
131 if (msgState.compareAndSet(state, state | BLOCK_MASK))
132 return state == 0;
133 break;
134
135 case ASYNC:
136 if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
137 throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
138 if ((state & ASYNC_MASK) == ASYNC_MASK)
139 throw new IllegalStateException(String.format("Too many async sends: %x", state));
140 if (msgState.compareAndSet(state, state + 1))
141 return state == 0;
142 break;
143
144 case STREAMING:
145 if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
146 throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
147 if ((state & STREAM_MASK) != 0)
148 throw new IllegalStateException(String.format("Already streaming %x for %s", state, type));
149 if (msgState.compareAndSet(state, state | STREAM_MASK))
150 return state == 0;
151 break;
152
153 case PARTIAL_BINARY:
154 if (state == PARTIAL_BINARY_MASK)
155 return false;
156 if (state == 0)
157 {
158 if (msgState.compareAndSet(0, state | PARTIAL_BINARY_MASK))
159 return true;
160 }
161 throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state));
162
163 case PARTIAL_TEXT:
164 if (state == PARTIAL_TEXT_MASK)
165 return false;
166 if (state == 0)
167 {
168 if (msgState.compareAndSet(0, state | PARTIAL_TEXT_MASK))
169 return true;
170 }
171 throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state));
172 }
173 }
174 }
175
176 private void unlockMsg(MsgType type)
177 {
178 while (true)
179 {
180 int state = msgState.get();
181
182 switch (type)
183 {
184 case BLOCKING:
185 if ((state & BLOCK_MASK) == 0)
186 throw new IllegalStateException(String.format("Not Blocking in state %x", state));
187 if (msgState.compareAndSet(state, state & ~BLOCK_MASK))
188 return;
189 break;
190
191 case ASYNC:
192 if ((state & ASYNC_MASK) == 0)
193 throw new IllegalStateException(String.format("Not Async in %x", state));
194 if (msgState.compareAndSet(state, state - 1))
195 return;
196 break;
197
198 case STREAMING:
199 if ((state & STREAM_MASK) == 0)
200 throw new IllegalStateException(String.format("Not Streaming in state %x", state));
201 if (msgState.compareAndSet(state, state & ~STREAM_MASK))
202 return;
203 break;
204
205 case PARTIAL_BINARY:
206 if (msgState.compareAndSet(PARTIAL_BINARY_MASK, 0))
207 return;
208 throw new IllegalStateException(String.format("Not Partial Binary in state %x", state));
209
210 case PARTIAL_TEXT:
211 if (msgState.compareAndSet(PARTIAL_TEXT_MASK, 0))
212 return;
213 throw new IllegalStateException(String.format("Not Partial Text in state %x", state));
214
215 }
216 }
217 }
218
219
220 public InetSocketAddress getInetSocketAddress()
221 {
222 return connection.getRemoteAddress();
223 }
224
225
226
227
228
229
230
231 private Future<Void> sendAsyncFrame(WebSocketFrame frame)
232 {
233 FutureWriteCallback future = new FutureWriteCallback();
234 uncheckedSendFrame(frame, future);
235 return future;
236 }
237
238
239
240
241 @Override
242 public void sendBytes(ByteBuffer data) throws IOException
243 {
244 lockMsg(MsgType.BLOCKING);
245 try
246 {
247 connection.getIOState().assertOutputOpen();
248 if (LOG.isDebugEnabled())
249 {
250 LOG.debug("sendBytes with {}", BufferUtil.toDetailString(data));
251 }
252 blockingWrite(new BinaryFrame().setPayload(data));
253 }
254 finally
255 {
256 unlockMsg(MsgType.BLOCKING);
257 }
258 }
259
260 @Override
261 public Future<Void> sendBytesByFuture(ByteBuffer data)
262 {
263 lockMsg(MsgType.ASYNC);
264 try
265 {
266 if (LOG.isDebugEnabled())
267 {
268 LOG.debug("sendBytesByFuture with {}", BufferUtil.toDetailString(data));
269 }
270 return sendAsyncFrame(new BinaryFrame().setPayload(data));
271 }
272 finally
273 {
274 unlockMsg(MsgType.ASYNC);
275 }
276 }
277
278 @Override
279 public void sendBytes(ByteBuffer data, WriteCallback callback)
280 {
281 lockMsg(MsgType.ASYNC);
282 try
283 {
284 if (LOG.isDebugEnabled())
285 {
286 LOG.debug("sendBytes({}, {})", BufferUtil.toDetailString(data), callback);
287 }
288 uncheckedSendFrame(new BinaryFrame().setPayload(data), callback == null ? NOOP_CALLBACK : callback);
289 }
290 finally
291 {
292 unlockMsg(MsgType.ASYNC);
293 }
294 }
295
296 public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback)
297 {
298 try
299 {
300 BatchMode batchMode = BatchMode.OFF;
301 if (frame.isDataFrame())
302 batchMode = getBatchMode();
303 connection.getIOState().assertOutputOpen();
304 outgoing.outgoingFrame(frame, callback, batchMode);
305 }
306 catch (IOException e)
307 {
308 callback.writeFailed(e);
309 }
310 }
311
312 @Override
313 public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
314 {
315 boolean first = lockMsg(MsgType.PARTIAL_BINARY);
316 try
317 {
318 if (LOG.isDebugEnabled())
319 {
320 LOG.debug("sendPartialBytes({}, {})", BufferUtil.toDetailString(fragment), isLast);
321 }
322 DataFrame frame = first ? new BinaryFrame() : new ContinuationFrame();
323 frame.setPayload(fragment);
324 frame.setFin(isLast);
325 blockingWrite(frame);
326 }
327 finally
328 {
329 if (isLast)
330 unlockMsg(MsgType.PARTIAL_BINARY);
331 }
332 }
333
334 @Override
335 public void sendPartialString(String fragment, boolean isLast) throws IOException
336 {
337 boolean first = lockMsg(MsgType.PARTIAL_TEXT);
338 try
339 {
340 if (LOG.isDebugEnabled())
341 {
342 LOG.debug("sendPartialString({}, {})", fragment, isLast);
343 }
344 DataFrame frame = first ? new TextFrame() : new ContinuationFrame();
345 frame.setPayload(BufferUtil.toBuffer(fragment, StandardCharsets.UTF_8));
346 frame.setFin(isLast);
347 blockingWrite(frame);
348 }
349 finally
350 {
351 if (isLast)
352 unlockMsg(MsgType.PARTIAL_TEXT);
353 }
354 }
355
356 @Override
357 public void sendPing(ByteBuffer applicationData) throws IOException
358 {
359 if (LOG.isDebugEnabled())
360 {
361 LOG.debug("sendPing with {}", BufferUtil.toDetailString(applicationData));
362 }
363 sendAsyncFrame(new PingFrame().setPayload(applicationData));
364 }
365
366 @Override
367 public void sendPong(ByteBuffer applicationData) throws IOException
368 {
369 if (LOG.isDebugEnabled())
370 {
371 LOG.debug("sendPong with {}", BufferUtil.toDetailString(applicationData));
372 }
373 sendAsyncFrame(new PongFrame().setPayload(applicationData));
374 }
375
376 @Override
377 public void sendString(String text) throws IOException
378 {
379 lockMsg(MsgType.BLOCKING);
380 try
381 {
382 WebSocketFrame frame = new TextFrame().setPayload(text);
383 if (LOG.isDebugEnabled())
384 {
385 LOG.debug("sendString with {}", BufferUtil.toDetailString(frame.getPayload()));
386 }
387 blockingWrite(frame);
388 }
389 finally
390 {
391 unlockMsg(MsgType.BLOCKING);
392 }
393 }
394
395 @Override
396 public Future<Void> sendStringByFuture(String text)
397 {
398 lockMsg(MsgType.ASYNC);
399 try
400 {
401 TextFrame frame = new TextFrame().setPayload(text);
402 if (LOG.isDebugEnabled())
403 {
404 LOG.debug("sendStringByFuture with {}", BufferUtil.toDetailString(frame.getPayload()));
405 }
406 return sendAsyncFrame(frame);
407 }
408 finally
409 {
410 unlockMsg(MsgType.ASYNC);
411 }
412 }
413
414 @Override
415 public void sendString(String text, WriteCallback callback)
416 {
417 lockMsg(MsgType.ASYNC);
418 try
419 {
420 TextFrame frame = new TextFrame().setPayload(text);
421 if (LOG.isDebugEnabled())
422 {
423 LOG.debug("sendString({},{})", BufferUtil.toDetailString(frame.getPayload()), callback);
424 }
425 uncheckedSendFrame(frame, callback == null ? NOOP_CALLBACK : callback);
426 }
427 finally
428 {
429 unlockMsg(MsgType.ASYNC);
430 }
431 }
432
433 @Override
434 public BatchMode getBatchMode()
435 {
436 return batchMode;
437 }
438
439 @Override
440 public void setBatchMode(BatchMode batchMode)
441 {
442 this.batchMode = batchMode;
443 }
444
445 @Override
446 public void flush() throws IOException
447 {
448 lockMsg(MsgType.ASYNC);
449 try (WriteBlocker b = blocker.acquireWriteBlocker())
450 {
451 uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b);
452 b.block();
453 }
454 finally
455 {
456 unlockMsg(MsgType.ASYNC);
457 }
458 }
459
460 @Override
461 public String toString()
462 {
463 return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode());
464 }
465 }