View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.concurrent.locks.ReentrantLock;
27  
28  import org.eclipse.jetty.util.BufferUtil;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  import org.eclipse.jetty.websocket.api.RemoteEndpoint;
32  import org.eclipse.jetty.websocket.api.WriteCallback;
33  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
34  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
35  import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
36  import org.eclipse.jetty.websocket.common.frames.DataFrame;
37  import org.eclipse.jetty.websocket.common.frames.PingFrame;
38  import org.eclipse.jetty.websocket.common.frames.PongFrame;
39  import org.eclipse.jetty.websocket.common.frames.TextFrame;
40  import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
41  
42  /**
43   * Endpoint for Writing messages to the Remote websocket.
44   */
45  public class WebSocketRemoteEndpoint implements RemoteEndpoint
46  {
47      private static final String PRIORMSG_ERROR = "Prior message pending, cannot start new message yet.";
48      /** Type of Message */
49      private static final int NONE = 0;
50      private static final int TEXT = 1;
51      private static final int BINARY = 2;
52      private static final int CONTROL = 3;
53      private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
54      {
55          @Override
56          public void writeSuccess()
57          {
58          }
59          
60          @Override
61          public void writeFailed(Throwable x)
62          {
63          }
64      };
65  
66      private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
67      public final LogicalConnection connection;
68      public final OutgoingFrames outgoing;
69      private final ReentrantLock msgLock = new ReentrantLock();
70      private final AtomicInteger msgType = new AtomicInteger(NONE);
71      private boolean partialStarted = false;
72  
73      public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
74      {
75          if (connection == null)
76          {
77              throw new IllegalArgumentException("LogicalConnection cannot be null");
78          }
79          this.connection = connection;
80          this.outgoing = outgoing;
81      }
82  
83      private void blockingWrite(WebSocketFrame frame) throws IOException
84      {
85          // TODO Blocking callbacks can be recycled, but they do not handle concurrent calls,
86          // so if some mutual exclusion can be applied, then this callback can be reused.
87          BlockingWriteCallback callback = new BlockingWriteCallback();
88          sendFrame(frame,callback);
89          callback.block();
90      }
91  
92      public InetSocketAddress getInetSocketAddress()
93      {
94          return connection.getRemoteAddress();
95      }
96  
97      /**
98       * Internal
99       * 
100      * @param frame
101      *            the frame to write
102      * @return the future for the network write of the frame
103      */
104     private Future<Void> sendAsyncFrame(WebSocketFrame frame)
105     {
106         FutureWriteCallback future = new FutureWriteCallback();
107         sendFrame(frame,future);
108         return future;
109     }
110 
111     /**
112      * Blocking write of bytes.
113      */
114     @Override
115     public void sendBytes(ByteBuffer data) throws IOException
116     {
117         if (msgLock.tryLock())
118         {
119             try
120             {
121                 msgType.set(BINARY);
122                 connection.getIOState().assertOutputOpen();
123                 if (LOG.isDebugEnabled())
124                 {
125                     LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
126                 }
127                 blockingWrite(new BinaryFrame().setPayload(data));
128             }
129             finally
130             {
131                 msgType.set(NONE);
132                 msgLock.unlock();
133             }
134         }
135         else
136         {
137             throw new IllegalStateException(PRIORMSG_ERROR);
138         }
139     }
140 
141     @Override
142     public Future<Void> sendBytesByFuture(ByteBuffer data)
143     {
144         msgType.set(BINARY);
145         if (LOG.isDebugEnabled())
146         {
147             LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
148         }
149         return sendAsyncFrame(new BinaryFrame().setPayload(data));
150     }
151 
152     @Override
153     public void sendBytes(ByteBuffer data, WriteCallback callback)
154     {
155         msgType.set(BINARY);
156         if (LOG.isDebugEnabled())
157         {
158             LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
159         }
160         sendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback);
161     }
162 
163     public void sendFrame(WebSocketFrame frame, WriteCallback callback)
164     {
165         try
166         {
167             connection.getIOState().assertOutputOpen();
168             outgoing.outgoingFrame(frame,callback);
169         }
170         catch (IOException e)
171         {
172             callback.writeFailed(e);
173         }
174     }
175 
176     @Override
177     public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
178     {
179         if (msgLock.tryLock())
180         {
181             try
182             {
183                 if (msgType.get() == TEXT)
184                 {
185                     throw new IllegalStateException("Prior TEXT message pending, cannot start new BINARY message yet.");
186                 }
187                 msgType.set(BINARY);
188 
189                 if (LOG.isDebugEnabled())
190                 {
191                     LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
192                 }
193                 DataFrame frame = null;
194                 if (partialStarted)
195                 {
196                     frame = new ContinuationFrame().setPayload(fragment);
197                 }
198                 else
199                 {
200                     frame = new BinaryFrame().setPayload(fragment);
201                 }
202                 frame.setFin(isLast);
203                 blockingWrite(frame);
204                 partialStarted = !isLast;
205             }
206             finally
207             {
208                 if (isLast)
209                 {
210                     msgType.set(NONE);
211                 }
212                 msgLock.unlock();
213             }
214         }
215         else
216         {
217             throw new IllegalStateException(PRIORMSG_ERROR);
218         }
219     }
220 
221     @Override
222     public void sendPartialString(String fragment, boolean isLast) throws IOException
223     {
224         if (msgLock.tryLock())
225         {
226             try
227             {
228                 if (msgType.get() == BINARY)
229                 {
230                     throw new IllegalStateException("Prior BINARY message pending, cannot start new TEXT message yet.");
231                 }
232                 msgType.set(TEXT);
233 
234                 if (LOG.isDebugEnabled())
235                 {
236                     LOG.debug("sendPartialString({}, {})",fragment,isLast);
237                 }
238                 DataFrame frame = null;
239                 if (partialStarted)
240                 {
241                     frame = new ContinuationFrame().setPayload(fragment);
242                 }
243                 else
244                 {
245                     frame = new TextFrame().setPayload(fragment);
246                 }
247                 frame.setFin(isLast);
248                 blockingWrite(frame);
249                 partialStarted = !isLast;
250             }
251             finally
252             {
253                 if (isLast)
254                 {
255                     msgType.set(NONE);
256                 }
257                 msgLock.unlock();
258             }
259         }
260         else
261         {
262             throw new IllegalStateException(PRIORMSG_ERROR);
263         }
264     }
265 
266     @Override
267     public void sendPing(ByteBuffer applicationData) throws IOException
268     {
269         if (msgLock.tryLock())
270         {
271             try
272             {
273                 msgType.set(CONTROL);
274                 if (LOG.isDebugEnabled())
275                 {
276                     LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData));
277                 }
278                 blockingWrite(new PingFrame().setPayload(applicationData));
279             }
280             finally
281             {
282                 msgType.set(NONE);
283                 msgLock.unlock();
284             }
285         }
286         else
287         {
288             throw new IllegalStateException(PRIORMSG_ERROR);
289         }
290     }
291 
292     @Override
293     public void sendPong(ByteBuffer applicationData) throws IOException
294     {
295         if (msgLock.tryLock())
296         {
297             try
298             {
299                 msgType.set(CONTROL);
300                 if (LOG.isDebugEnabled())
301                 {
302                     LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData));
303                 }
304                 blockingWrite(new PongFrame().setPayload(applicationData));
305             }
306             finally
307             {
308                 msgType.set(NONE);
309                 msgLock.unlock();
310             }
311         }
312         else
313         {
314             throw new IllegalStateException(PRIORMSG_ERROR);
315         }
316     }
317 
318     @Override
319     public void sendString(String text) throws IOException
320     {
321         if (msgLock.tryLock())
322         {
323             try
324             {
325                 msgType.set(TEXT);
326                 WebSocketFrame frame = new TextFrame().setPayload(text);
327                 if (LOG.isDebugEnabled())
328                 {
329                     LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
330                 }
331                 blockingWrite(frame);
332             }
333             finally
334             {
335                 msgType.set(NONE);
336                 msgLock.unlock();
337             }
338         }
339         else
340         {
341             throw new IllegalStateException(PRIORMSG_ERROR);
342         }
343     }
344 
345     @Override
346     public Future<Void> sendStringByFuture(String text)
347     {
348         msgType.set(TEXT);
349         TextFrame frame = new TextFrame().setPayload(text);
350         if (LOG.isDebugEnabled())
351         {
352             LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
353         }
354         return sendAsyncFrame(frame);
355     }
356 
357     @Override
358     public void sendString(String text, WriteCallback callback)
359     {
360         msgType.set(TEXT);
361         TextFrame frame = new TextFrame().setPayload(text);
362         if (LOG.isDebugEnabled())
363         {
364             LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
365         }
366         sendFrame(frame,callback==null?NOOP_CALLBACK:callback);
367     }
368 }