View Javadoc

1   package org.eclipse.jetty.websocket;
2   
3   import java.io.IOException;
4   import java.net.InetSocketAddress;
5   import java.net.ProtocolException;
6   import java.net.SocketAddress;
7   import java.net.URI;
8   import java.nio.channels.ByteChannel;
9   import java.nio.channels.SocketChannel;
10  import java.util.List;
11  import java.util.Map;
12  import java.util.concurrent.ConcurrentHashMap;
13  import java.util.concurrent.CopyOnWriteArrayList;
14  import java.util.concurrent.CountDownLatch;
15  import java.util.concurrent.ExecutionException;
16  import java.util.concurrent.Future;
17  import java.util.concurrent.TimeUnit;
18  import java.util.concurrent.TimeoutException;
19  
20  import org.eclipse.jetty.util.log.Logger;
21  
22  
23  /* ------------------------------------------------------------ */
24  /**
25   * <p>{@link WebSocketClient} allows to create multiple connections to multiple destinations
26   * that can speak the websocket protocol.</p>
27   * <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
28   * object (to receive events from the server), and returns a {@link WebSocket.Connection} to
29   * send data to the server.</p>
30   * <p>Example usage is as follows:</p>
31   * <pre>
32   *   WebSocketClientFactory factory = new WebSocketClientFactory();
33   *   factory.start();
34   *
35   *   WebSocketClient client = factory.newWebSocketClient();
36   *   // Configure the client
37   *
38   *   WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
39   *   {
40   *     public void onOpen(Connection connection)
41   *     {
42   *       // open notification
43   *     }
44   *
45   *     public void onClose(int closeCode, String message)
46   *     {
47   *       // close notification
48   *     }
49   *
50   *     public void onMessage(String data)
51   *     {
52   *       // handle incoming message
53   *     }
54   *   }).get(5, TimeUnit.SECONDS);
55   *
56   *   connection.sendMessage("Hello World");
57   * </pre>
58   */
59  public class WebSocketClient
60  {
61      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
62  
63      private final WebSocketClientFactory _factory;
64      private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
65      private final List<String> _extensions=new CopyOnWriteArrayList<String>();
66      private String _origin;
67      private String _protocol;
68      private int _maxIdleTime=-1;
69      private int _maxTextMessageSize=16*1024;
70      private int _maxBinaryMessageSize=-1;
71      private MaskGen _maskGen;
72      private SocketAddress _bindAddress;
73  
74      /* ------------------------------------------------------------ */
75      /**
76       * <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
77       * <p>This can be wasteful of resources if many clients are created.</p>
78       *
79       * @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
80       * @throws Exception if the private WebSocketClientFactory fails to start
81       */
82      @Deprecated
83      public WebSocketClient() throws Exception
84      {
85          _factory=new WebSocketClientFactory();
86          _factory.start();
87          _maskGen=_factory.getMaskGen();
88      }
89  
90      /* ------------------------------------------------------------ */
91      /**
92       * <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
93       *
94       * @param factory the shared {@link WebSocketClientFactory}
95       */
96      public WebSocketClient(WebSocketClientFactory factory)
97      {
98          _factory=factory;
99          _maskGen=_factory.getMaskGen();
100     }
101 
102     /* ------------------------------------------------------------ */
103     /**
104      * @return The WebSocketClientFactory this client was created with.
105      */
106     public WebSocketClientFactory getFactory()
107     {
108         return _factory;
109     }
110 
111     /* ------------------------------------------------------------ */
112     /**
113      * @return the address to bind the socket channel to
114      * @see #setBindAddress(SocketAddress)
115      */
116     public SocketAddress getBindAddress()
117     {
118         return _bindAddress;
119     }
120 
121     /* ------------------------------------------------------------ */
122     /**
123      * @param bindAddress the address to bind the socket channel to
124      * @see #getBindAddress()
125      */
126     public void setBindAddress(SocketAddress bindAddress)
127     {
128         this._bindAddress = bindAddress;
129     }
130 
131     /* ------------------------------------------------------------ */
132     /**
133      * @return The maxIdleTime in ms for connections opened by this client,
134      * or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
135      * @see #setMaxIdleTime(int)
136      */
137     public int getMaxIdleTime()
138     {
139         return _maxIdleTime;
140     }
141 
142     /* ------------------------------------------------------------ */
143     /**
144      * @param maxIdleTime The max idle time in ms for connections opened by this client
145      * @see #getMaxIdleTime()
146      */
147     public void setMaxIdleTime(int maxIdleTime)
148     {
149         _maxIdleTime=maxIdleTime;
150     }
151 
152     /* ------------------------------------------------------------ */
153     /**
154      * @return The subprotocol string for connections opened by this client.
155      * @see #setProtocol(String)
156      */
157     public String getProtocol()
158     {
159         return _protocol;
160     }
161 
162     /* ------------------------------------------------------------ */
163     /**
164      * @param protocol The subprotocol string for connections opened by this client.
165      * @see #getProtocol()
166      */
167     public void setProtocol(String protocol)
168     {
169         _protocol = protocol;
170     }
171 
172     /* ------------------------------------------------------------ */
173     /**
174      * @return The origin URI of the client
175      * @see #setOrigin(String)
176      */
177     public String getOrigin()
178     {
179         return _origin;
180     }
181 
182     /* ------------------------------------------------------------ */
183     /**
184      * @param origin The origin URI of the client (eg "http://example.com")
185      * @see #getOrigin()
186      */
187     public void setOrigin(String origin)
188     {
189         _origin = origin;
190     }
191 
192     /* ------------------------------------------------------------ */
193     /**
194      * <p>Returns the map of the cookies that are sent during the initial HTTP handshake
195      * that upgrades to the websocket protocol.</p>
196      * @return The read-write cookie map
197      */
198     public Map<String,String> getCookies()
199     {
200         return _cookies;
201     }
202 
203     /* ------------------------------------------------------------ */
204     /**
205      * @return The list of websocket protocol extensions
206      */
207     public List<String> getExtensions()
208     {
209         return _extensions;
210     }
211 
212     /* ------------------------------------------------------------ */
213     /**
214      * @return the mask generator to use, or null if not mask generator should be used
215      * @see #setMaskGen(MaskGen)
216      */
217     public MaskGen getMaskGen()
218     {
219         return _maskGen;
220     }
221 
222     /* ------------------------------------------------------------ */
223     /**
224      * @param maskGen the mask generator to use, or null if not mask generator should be used
225      * @see #getMaskGen()
226      */
227     public void setMaskGen(MaskGen maskGen)
228     {
229         _maskGen = maskGen;
230     }
231 
232     /* ------------------------------------------------------------ */
233     /**
234      * @return The initial maximum text message size (in characters) for a connection
235      */
236     public int getMaxTextMessageSize()
237     {
238         return _maxTextMessageSize;
239     }
240 
241     /* ------------------------------------------------------------ */
242     /**
243      * Set the initial maximum text message size for a connection. This can be changed by
244      * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
245      * @param maxTextMessageSize The default maximum text message size (in characters) for a connection
246      */
247     public void setMaxTextMessageSize(int maxTextMessageSize)
248     {
249         _maxTextMessageSize = maxTextMessageSize;
250     }
251 
252     /* ------------------------------------------------------------ */
253     /**
254      * @return The initial maximum binary message size (in bytes)  for a connection
255      */
256     public int getMaxBinaryMessageSize()
257     {
258         return _maxBinaryMessageSize;
259     }
260 
261     /* ------------------------------------------------------------ */
262     /**
263      * Set the initial maximum binary message size for a connection. This can be changed by
264      * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
265      * @param maxTextMessageSize The default maximum binary message size (in bytes) for a connection
266      */
267     public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
268     {
269         _maxBinaryMessageSize = maxBinaryMessageSize;
270     }
271 
272     /* ------------------------------------------------------------ */
273     /**
274      * <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
275      *
276      * @param uri The URI to connect to.
277      * @param websocket The {@link WebSocket} instance to handle incoming events.
278      * @param maxConnectTime The interval to wait for a successful connection
279      * @param units the units of the maxConnectTime
280      * @return A {@link WebSocket.Connection}
281      * @throws IOException if the connection fails
282      * @throws InterruptedException if the thread is interrupted
283      * @throws TimeoutException if the timeout elapses before the connection is completed
284      * @see #open(URI, WebSocket)
285      */
286     public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
287     {
288         try
289         {
290             return open(uri,websocket).get(maxConnectTime,units);
291         }
292         catch (ExecutionException e)
293         {
294             Throwable cause = e.getCause();
295             if (cause instanceof IOException)
296                 throw (IOException)cause;
297             if (cause instanceof Error)
298                 throw (Error)cause;
299             if (cause instanceof RuntimeException)
300                 throw (RuntimeException)cause;
301             throw new RuntimeException(cause);
302         }
303     }
304 
305     /* ------------------------------------------------------------ */
306     /**
307      * <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
308      * <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
309      *
310      * @param uri The URI to connect to.
311      * @param websocket The {@link WebSocket} instance to handle incoming events.
312      * @return A {@link Future} to the {@link WebSocket.Connection}
313      * @throws IOException if the connection fails
314      * @see #open(URI, WebSocket, long, TimeUnit)
315      */
316     public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
317     {
318         if (!_factory.isStarted())
319             throw new IllegalStateException("Factory !started");
320         String scheme=uri.getScheme();
321         if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
322             throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
323         if ("wss".equalsIgnoreCase(scheme))
324             throw new IOException("wss not supported");
325 
326         SocketChannel channel = SocketChannel.open();
327         if (_bindAddress != null)
328             channel.socket().bind(_bindAddress);
329         channel.socket().setTcpNoDelay(true);
330 
331         InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
332 
333         final WebSocketFuture holder=new WebSocketFuture(websocket,uri,this,channel);
334 
335         channel.configureBlocking(false);
336         channel.connect(address);
337         _factory.getSelectorManager().register( channel, holder);
338 
339         return holder;
340     }
341 
342     /* ------------------------------------------------------------ */
343     /** The Future Websocket Connection.
344      */
345     static class WebSocketFuture implements Future<WebSocket.Connection>
346     {
347         final WebSocket _websocket;
348         final URI _uri;
349         final String _protocol;
350         final String _origin;
351         final MaskGen _maskGen;
352         final int _maxIdleTime;
353         final int _maxTextMessageSize;
354         final int _maxBinaryMessageSize;
355         final Map<String,String> _cookies;
356         final List<String> _extensions;
357         final CountDownLatch _done = new CountDownLatch(1);
358 
359         ByteChannel _channel;
360         WebSocketConnection _connection;
361         Throwable _exception;
362 
363         private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
364         {
365             _websocket=websocket;
366             _uri=uri;
367             _protocol=client._protocol;
368             _origin=client._origin;
369             _maskGen=client._maskGen;
370             _maxIdleTime=client._maxIdleTime;
371             _maxTextMessageSize=client._maxTextMessageSize;
372             _maxBinaryMessageSize=client._maxBinaryMessageSize;
373             _cookies=client._cookies;
374             _extensions=client._extensions;
375             _channel=channel;
376         }
377 
378         public void onConnection(WebSocketConnection connection)
379         {
380             try
381             {
382                 connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
383                 connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
384                 
385                 synchronized (this)
386                 {
387                     if (_channel!=null)
388                         _connection=connection;
389                 }
390 
391                 if (_connection!=null)
392                 {
393                     if (_websocket instanceof WebSocket.OnFrame)
394                         ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)connection.getConnection());
395 
396                     _websocket.onOpen(connection.getConnection());
397 
398                 }
399             }
400             finally
401             {
402                 _done.countDown();
403             }
404         }
405 
406         public void handshakeFailed(Throwable ex)
407         {
408             try
409             {
410                 ByteChannel channel=null;
411                 synchronized (this)
412                 {
413                     if (_channel!=null)
414                     {
415                         channel=_channel;
416                         _channel=null;
417                         _exception=ex;
418                     }
419                 }
420 
421                 if (channel!=null)
422                 {
423                     if (ex instanceof ProtocolException)
424                         closeChannel(channel,WebSocketConnectionD13.CLOSE_PROTOCOL,ex.getMessage());
425                     else
426                         closeChannel(channel,WebSocketConnectionD13.CLOSE_NO_CLOSE,ex.getMessage());
427                 }
428             }
429             finally
430             {
431                 _done.countDown();
432             }
433         }
434 
435         public Map<String,String> getCookies()
436         {
437             return _cookies;
438         }
439 
440         public String getProtocol()
441         {
442             return _protocol;
443         }
444 
445         public WebSocket getWebSocket()
446         {
447             return _websocket;
448         }
449 
450         public URI getURI()
451         {
452             return _uri;
453         }
454 
455         public int getMaxIdleTime()
456         {
457             return _maxIdleTime;
458         }
459 
460         public String getOrigin()
461         {
462             return _origin;
463         }
464 
465         public MaskGen getMaskGen()
466         {
467             return _maskGen;
468         }
469 
470         public String toString()
471         {
472             return "[" + _uri + ","+_websocket+"]@"+hashCode();
473         }
474 
475         public boolean cancel(boolean mayInterruptIfRunning)
476         {
477             try
478             {
479                 ByteChannel channel=null;
480                 synchronized (this)
481                 {
482                     if (_connection==null && _exception==null && _channel!=null)
483                     {
484                         channel=_channel;
485                         _channel=null;
486                     }
487                 }
488 
489                 if (channel!=null)
490                 {
491                     closeChannel(channel,WebSocketConnectionD13.CLOSE_NO_CLOSE,"cancelled");
492                     return true;
493                 }
494                 return false;
495             }
496             finally
497             {
498                 _done.countDown();
499             }
500         }
501 
502         public boolean isCancelled()
503         {
504             synchronized (this)
505             {
506                 return _channel==null && _connection==null;
507             }
508         }
509 
510         public boolean isDone()
511         {
512             synchronized (this)
513             {
514                 return _connection!=null && _exception==null;
515             }
516         }
517 
518         public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
519         {
520             try
521             {
522                 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
523             }
524             catch(TimeoutException e)
525             {
526                 throw new IllegalStateException("The universe has ended",e);
527             }
528         }
529 
530         public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
531                 TimeoutException
532         {
533             _done.await(timeout,unit);
534             ByteChannel channel=null;
535             org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
536             Throwable exception;
537             synchronized (this)
538             {
539                 exception=_exception;
540                 if (_connection==null)
541                 {
542                     exception=_exception;
543                     channel=_channel;
544                     _channel=null;
545                 }
546                 else
547                     connection=_connection.getConnection();
548             }
549 
550             if (channel!=null)
551                 closeChannel(channel,WebSocketConnectionD13.CLOSE_NO_CLOSE,"timeout");
552             if (exception!=null)
553                 throw new ExecutionException(exception);
554             if (connection!=null)
555                 return connection;
556             throw new TimeoutException();
557         }
558 
559         private void closeChannel(ByteChannel channel,int code, String message)
560         {
561             try
562             {
563                 _websocket.onClose(code,message);
564             }
565             catch(Exception e)
566             {
567                 __log.warn(e);
568             }
569 
570             try
571             {
572                 channel.close();
573             }
574             catch(IOException e)
575             {
576                 __log.debug(e);
577             }
578         }
579     }
580 }