View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.nio.charset.StandardCharsets;
25  import java.util.concurrent.Future;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.eclipse.jetty.util.BufferUtil;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  import org.eclipse.jetty.websocket.api.BatchMode;
32  import org.eclipse.jetty.websocket.api.RemoteEndpoint;
33  import org.eclipse.jetty.websocket.api.WriteCallback;
34  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
35  import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
36  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
37  import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
38  import org.eclipse.jetty.websocket.common.frames.DataFrame;
39  import org.eclipse.jetty.websocket.common.frames.PingFrame;
40  import org.eclipse.jetty.websocket.common.frames.PongFrame;
41  import org.eclipse.jetty.websocket.common.frames.TextFrame;
42  import org.eclipse.jetty.websocket.common.io.FrameFlusher;
43  import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
44  
45  /**
46   * Endpoint for Writing messages to the Remote websocket.
47   */
48  public class WebSocketRemoteEndpoint implements RemoteEndpoint
49  {
50      private enum MsgType
51      {
52          BLOCKING,
53          ASYNC,
54          STREAMING,
55          PARTIAL_TEXT,
56          PARTIAL_BINARY
57      }
58  
59      private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
60      {
61          @Override
62          public void writeSuccess()
63          {
64          }
65  
66          @Override
67          public void writeFailed(Throwable x)
68          {
69          }
70      };
71  
72      private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
73  
74      private final static int ASYNC_MASK = 0x0000FFFF;
75      private final static int BLOCK_MASK = 0x00010000;
76      private final static int STREAM_MASK = 0x00020000;
77      private final static int PARTIAL_TEXT_MASK = 0x00040000;
78      private final static int PARTIAL_BINARY_MASK = 0x00080000;
79  
80      private final LogicalConnection connection;
81      private final OutgoingFrames outgoing;
82      private final AtomicInteger msgState = new AtomicInteger();
83      private final BlockingWriteCallback blocker = new BlockingWriteCallback();
84      private volatile BatchMode batchMode;
85  
86      public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
87      {
88          this(connection, outgoing, BatchMode.AUTO);
89      }
90  
91      public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, BatchMode batchMode)
92      {
93          if (connection == null)
94          {
95              throw new IllegalArgumentException("LogicalConnection cannot be null");
96          }
97          this.connection = connection;
98          this.outgoing = outgoing;
99          this.batchMode = batchMode;
100     }
101 
102     private void blockingWrite(WebSocketFrame frame) throws IOException
103     {
104         try(WriteBlocker b=blocker.acquireWriteBlocker())
105         {
106             uncheckedSendFrame(frame, b);
107             b.block();
108         }
109     }
110 
111     private boolean lockMsg(MsgType type)
112     {
113         // Blocking -> BLOCKING  ; Async -> ASYNC     ; Partial -> PARTIAL_XXXX ; Stream -> STREAMING
114         // Blocking -> Pending!! ; Async -> BLOCKING  ; Partial -> Pending!!    ; Stream -> STREAMING 
115         // Blocking -> BLOCKING  ; Async -> ASYNC     ; Partial -> Pending!!    ; Stream -> STREAMING
116         // Blocking -> Pending!! ; Async -> STREAMING ; Partial -> Pending!!    ; Stream -> STREAMING
117         // Blocking -> Pending!! ; Async -> Pending!! ; Partial -> PARTIAL_TEXT ; Stream -> Pending!!
118         // Blocking -> Pending!! ; Async -> Pending!! ; Partial -> PARTIAL_BIN  ; Stream -> Pending!!
119 
120         while (true)
121         {
122             int state = msgState.get();
123 
124             switch (type)
125             {
126                 case BLOCKING:
127                     if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
128                         throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
129                     if ((state & BLOCK_MASK) != 0)
130                         throw new IllegalStateException(String.format("Blocking message pending %x for %s", state, type));
131                     if (msgState.compareAndSet(state, state | BLOCK_MASK))
132                         return state == 0;
133                     break;
134 
135                 case ASYNC:
136                     if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
137                         throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
138                     if ((state & ASYNC_MASK) == ASYNC_MASK)
139                         throw new IllegalStateException(String.format("Too many async sends: %x", state));
140                     if (msgState.compareAndSet(state, state + 1))
141                         return state == 0;
142                     break;
143 
144                 case STREAMING:
145                     if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
146                         throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
147                     if ((state & STREAM_MASK) != 0)
148                         throw new IllegalStateException(String.format("Already streaming %x for %s", state, type));
149                     if (msgState.compareAndSet(state, state | STREAM_MASK))
150                         return state == 0;
151                     break;
152 
153                 case PARTIAL_BINARY:
154                     if (state == PARTIAL_BINARY_MASK)
155                         return false;
156                     if (state == 0)
157                     {
158                         if (msgState.compareAndSet(0, state | PARTIAL_BINARY_MASK))
159                             return true;
160                     }
161                     throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state));
162 
163                 case PARTIAL_TEXT:
164                     if (state == PARTIAL_TEXT_MASK)
165                         return false;
166                     if (state == 0)
167                     {
168                         if (msgState.compareAndSet(0, state | PARTIAL_TEXT_MASK))
169                             return true;
170                     }
171                     throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state));
172             }
173         }
174     }
175 
176     private void unlockMsg(MsgType type)
177     {
178         while (true)
179         {
180             int state = msgState.get();
181 
182             switch (type)
183             {
184                 case BLOCKING:
185                     if ((state & BLOCK_MASK) == 0)
186                         throw new IllegalStateException(String.format("Not Blocking in state %x", state));
187                     if (msgState.compareAndSet(state, state & ~BLOCK_MASK))
188                         return;
189                     break;
190 
191                 case ASYNC:
192                     if ((state & ASYNC_MASK) == 0)
193                         throw new IllegalStateException(String.format("Not Async in %x", state));
194                     if (msgState.compareAndSet(state, state - 1))
195                         return;
196                     break;
197 
198                 case STREAMING:
199                     if ((state & STREAM_MASK) == 0)
200                         throw new IllegalStateException(String.format("Not Streaming in state %x", state));
201                     if (msgState.compareAndSet(state, state & ~STREAM_MASK))
202                         return;
203                     break;
204 
205                 case PARTIAL_BINARY:
206                     if (msgState.compareAndSet(PARTIAL_BINARY_MASK, 0))
207                         return;
208                     throw new IllegalStateException(String.format("Not Partial Binary in state %x", state));
209 
210                 case PARTIAL_TEXT:
211                     if (msgState.compareAndSet(PARTIAL_TEXT_MASK, 0))
212                         return;
213                     throw new IllegalStateException(String.format("Not Partial Text in state %x", state));
214 
215             }
216         }
217     }
218 
219 
220     public InetSocketAddress getInetSocketAddress()
221     {
222         return connection.getRemoteAddress();
223     }
224 
225     /**
226      * Internal
227      *
228      * @param frame the frame to write
229      * @return the future for the network write of the frame
230      */
231     private Future<Void> sendAsyncFrame(WebSocketFrame frame)
232     {
233         FutureWriteCallback future = new FutureWriteCallback();
234         uncheckedSendFrame(frame, future);
235         return future;
236     }
237 
238     /**
239      * Blocking write of bytes.
240      */
241     @Override
242     public void sendBytes(ByteBuffer data) throws IOException
243     {
244         lockMsg(MsgType.BLOCKING);
245         try
246         {
247             connection.getIOState().assertOutputOpen();
248             if (LOG.isDebugEnabled())
249             {
250                 LOG.debug("sendBytes with {}", BufferUtil.toDetailString(data));
251             }
252             blockingWrite(new BinaryFrame().setPayload(data));
253         }
254         finally
255         {
256             unlockMsg(MsgType.BLOCKING);
257         }
258     }
259 
260     @Override
261     public Future<Void> sendBytesByFuture(ByteBuffer data)
262     {
263         lockMsg(MsgType.ASYNC);
264         try
265         {
266             if (LOG.isDebugEnabled())
267             {
268                 LOG.debug("sendBytesByFuture with {}", BufferUtil.toDetailString(data));
269             }
270             return sendAsyncFrame(new BinaryFrame().setPayload(data));
271         }
272         finally
273         {
274             unlockMsg(MsgType.ASYNC);
275         }
276     }
277 
278     @Override
279     public void sendBytes(ByteBuffer data, WriteCallback callback)
280     {
281         lockMsg(MsgType.ASYNC);
282         try
283         {
284             if (LOG.isDebugEnabled())
285             {
286                 LOG.debug("sendBytes({}, {})", BufferUtil.toDetailString(data), callback);
287             }
288             uncheckedSendFrame(new BinaryFrame().setPayload(data), callback == null ? NOOP_CALLBACK : callback);
289         }
290         finally
291         {
292             unlockMsg(MsgType.ASYNC);
293         }
294     }
295 
296     public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback)
297     {
298         try
299         {
300             BatchMode batchMode = BatchMode.OFF;
301             if (frame.isDataFrame())
302                 batchMode = getBatchMode();
303             connection.getIOState().assertOutputOpen();
304             outgoing.outgoingFrame(frame, callback, batchMode);
305         }
306         catch (IOException e)
307         {
308             callback.writeFailed(e);
309         }
310     }
311 
312     @Override
313     public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
314     {
315         boolean first = lockMsg(MsgType.PARTIAL_BINARY);
316         try
317         {
318             if (LOG.isDebugEnabled())
319             {
320                 LOG.debug("sendPartialBytes({}, {})", BufferUtil.toDetailString(fragment), isLast);
321             }
322             DataFrame frame = first ? new BinaryFrame() : new ContinuationFrame();
323             frame.setPayload(fragment);
324             frame.setFin(isLast);
325             blockingWrite(frame);
326         }
327         finally
328         {
329             if (isLast)
330                 unlockMsg(MsgType.PARTIAL_BINARY);
331         }
332     }
333 
334     @Override
335     public void sendPartialString(String fragment, boolean isLast) throws IOException
336     {
337         boolean first = lockMsg(MsgType.PARTIAL_TEXT);
338         try
339         {
340             if (LOG.isDebugEnabled())
341             {
342                 LOG.debug("sendPartialString({}, {})", fragment, isLast);
343             }
344             DataFrame frame = first ? new TextFrame() : new ContinuationFrame();
345             frame.setPayload(BufferUtil.toBuffer(fragment, StandardCharsets.UTF_8));
346             frame.setFin(isLast);
347             blockingWrite(frame);
348         }
349         finally
350         {
351             if (isLast)
352                 unlockMsg(MsgType.PARTIAL_TEXT);
353         }
354     }
355 
356     @Override
357     public void sendPing(ByteBuffer applicationData) throws IOException
358     {
359         if (LOG.isDebugEnabled())
360         {
361             LOG.debug("sendPing with {}", BufferUtil.toDetailString(applicationData));
362         }
363         sendAsyncFrame(new PingFrame().setPayload(applicationData));
364     }
365 
366     @Override
367     public void sendPong(ByteBuffer applicationData) throws IOException
368     {
369         if (LOG.isDebugEnabled())
370         {
371             LOG.debug("sendPong with {}", BufferUtil.toDetailString(applicationData));
372         }
373         sendAsyncFrame(new PongFrame().setPayload(applicationData));
374     }
375 
376     @Override
377     public void sendString(String text) throws IOException
378     {
379         lockMsg(MsgType.BLOCKING);
380         try
381         {
382             WebSocketFrame frame = new TextFrame().setPayload(text);
383             if (LOG.isDebugEnabled())
384             {
385                 LOG.debug("sendString with {}", BufferUtil.toDetailString(frame.getPayload()));
386             }
387             blockingWrite(frame);
388         }
389         finally
390         {
391             unlockMsg(MsgType.BLOCKING);
392         }
393     }
394 
395     @Override
396     public Future<Void> sendStringByFuture(String text)
397     {
398         lockMsg(MsgType.ASYNC);
399         try
400         {
401             TextFrame frame = new TextFrame().setPayload(text);
402             if (LOG.isDebugEnabled())
403             {
404                 LOG.debug("sendStringByFuture with {}", BufferUtil.toDetailString(frame.getPayload()));
405             }
406             return sendAsyncFrame(frame);
407         }
408         finally
409         {
410             unlockMsg(MsgType.ASYNC);
411         }
412     }
413 
414     @Override
415     public void sendString(String text, WriteCallback callback)
416     {
417         lockMsg(MsgType.ASYNC);
418         try
419         {
420             TextFrame frame = new TextFrame().setPayload(text);
421             if (LOG.isDebugEnabled())
422             {
423                 LOG.debug("sendString({},{})", BufferUtil.toDetailString(frame.getPayload()), callback);
424             }
425             uncheckedSendFrame(frame, callback == null ? NOOP_CALLBACK : callback);
426         }
427         finally
428         {
429             unlockMsg(MsgType.ASYNC);
430         }
431     }
432 
433     @Override
434     public BatchMode getBatchMode()
435     {
436         return batchMode;
437     }
438 
439     @Override
440     public void setBatchMode(BatchMode batchMode)
441     {
442         this.batchMode = batchMode;
443     }
444 
445     @Override
446     public void flush() throws IOException
447     {
448         lockMsg(MsgType.ASYNC);
449         try (WriteBlocker b = blocker.acquireWriteBlocker())
450         {
451             uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b);
452             b.block();
453         }
454         finally
455         {
456             unlockMsg(MsgType.ASYNC);
457         }
458     }
459 
460     @Override
461     public String toString()
462     {
463         return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode());
464     }
465 }