View Javadoc

1   package org.eclipse.jetty.websocket;
2   
3   import java.io.IOException;
4   import java.net.InetSocketAddress;
5   import java.net.ProtocolException;
6   import java.net.SocketAddress;
7   import java.net.URI;
8   import java.nio.channels.ByteChannel;
9   import java.nio.channels.SocketChannel;
10  import java.util.List;
11  import java.util.Map;
12  import java.util.concurrent.ConcurrentHashMap;
13  import java.util.concurrent.CopyOnWriteArrayList;
14  import java.util.concurrent.CountDownLatch;
15  import java.util.concurrent.ExecutionException;
16  import java.util.concurrent.Future;
17  import java.util.concurrent.TimeUnit;
18  import java.util.concurrent.TimeoutException;
19  
20  import org.eclipse.jetty.util.log.Logger;
21  
22  
23  /* ------------------------------------------------------------ */
24  /**
25   * <p>{@link WebSocketClient} allows to create multiple connections to multiple destinations
26   * that can speak the websocket protocol.</p>
27   * <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
28   * object (to receive events from the server), and returns a {@link WebSocket.Connection} to
29   * send data to the server.</p>
30   * <p>Example usage is as follows:</p>
31   * <pre>
32   *   WebSocketClientFactory factory = new WebSocketClientFactory();
33   *   factory.start();
34   *
35   *   WebSocketClient client = factory.newWebSocketClient();
36   *   // Configure the client
37   *
38   *   WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
39   *   {
40   *     public void onOpen(Connection connection)
41   *     {
42   *       // open notification
43   *     }
44   *
45   *     public void onClose(int closeCode, String message)
46   *     {
47   *       // close notification
48   *     }
49   *
50   *     public void onMessage(String data)
51   *     {
52   *       // handle incoming message
53   *     }
54   *   }).get(5, TimeUnit.SECONDS);
55   *
56   *   connection.sendMessage("Hello World");
57   * </pre>
58   */
59  public class WebSocketClient
60  {
61      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
62  
63      private final WebSocketClientFactory _factory;
64      private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
65      private final List<String> _extensions=new CopyOnWriteArrayList<String>();
66      private String _origin;
67      private String _protocol;
68      private int _maxIdleTime=-1;
69      private int _maxTextMessageSize=16*1024;
70      private int _maxBinaryMessageSize=-1;
71      private MaskGen _maskGen;
72      private SocketAddress _bindAddress;
73  
74      /* ------------------------------------------------------------ */
75      /**
76       * <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
77       * <p>This can be wasteful of resources if many clients are created.</p>
78       *
79       * @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
80       * @throws Exception if the private WebSocketClientFactory fails to start
81       */
82      @Deprecated
83      public WebSocketClient() throws Exception
84      {
85          _factory=new WebSocketClientFactory();
86          _factory.start();
87          _maskGen=_factory.getMaskGen();
88      }
89  
90      /* ------------------------------------------------------------ */
91      /**
92       * <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
93       *
94       * @param factory the shared {@link WebSocketClientFactory}
95       */
96      public WebSocketClient(WebSocketClientFactory factory)
97      {
98          _factory=factory;
99          _maskGen=_factory.getMaskGen();
100     }
101 
102     /* ------------------------------------------------------------ */
103     /**
104      * @return The WebSocketClientFactory this client was created with.
105      */
106     public WebSocketClientFactory getFactory()
107     {
108         return _factory;
109     }
110 
111     /* ------------------------------------------------------------ */
112     /**
113      * @return the address to bind the socket channel to
114      * @see #setBindAddress(SocketAddress)
115      */
116     public SocketAddress getBindAddress()
117     {
118         return _bindAddress;
119     }
120 
121     /* ------------------------------------------------------------ */
122     /**
123      * @param bindAddress the address to bind the socket channel to
124      * @see #getBindAddress()
125      */
126     public void setBindAddress(SocketAddress bindAddress)
127     {
128         this._bindAddress = bindAddress;
129     }
130 
131     /* ------------------------------------------------------------ */
132     /**
133      * @return The maxIdleTime in ms for connections opened by this client,
134      * or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
135      * @see #setMaxIdleTime(int)
136      */
137     public int getMaxIdleTime()
138     {
139         return _maxIdleTime;
140     }
141 
142     /* ------------------------------------------------------------ */
143     /**
144      * @param maxIdleTime The max idle time in ms for connections opened by this client
145      * @see #getMaxIdleTime()
146      */
147     public void setMaxIdleTime(int maxIdleTime)
148     {
149         _maxIdleTime=maxIdleTime;
150     }
151 
152     /* ------------------------------------------------------------ */
153     /**
154      * @return The subprotocol string for connections opened by this client.
155      * @see #setProtocol(String)
156      */
157     public String getProtocol()
158     {
159         return _protocol;
160     }
161 
162     /* ------------------------------------------------------------ */
163     /**
164      * @param protocol The subprotocol string for connections opened by this client.
165      * @see #getProtocol()
166      */
167     public void setProtocol(String protocol)
168     {
169         _protocol = protocol;
170     }
171 
172     /* ------------------------------------------------------------ */
173     /**
174      * @return The origin URI of the client
175      * @see #setOrigin(String)
176      */
177     public String getOrigin()
178     {
179         return _origin;
180     }
181 
182     /* ------------------------------------------------------------ */
183     /**
184      * @param origin The origin URI of the client (eg "http://example.com")
185      * @see #getOrigin()
186      */
187     public void setOrigin(String origin)
188     {
189         _origin = origin;
190     }
191 
192     /* ------------------------------------------------------------ */
193     /**
194      * <p>Returns the map of the cookies that are sent during the initial HTTP handshake
195      * that upgrades to the websocket protocol.</p>
196      * @return The read-write cookie map
197      */
198     public Map<String,String> getCookies()
199     {
200         return _cookies;
201     }
202 
203     /* ------------------------------------------------------------ */
204     /**
205      * @return The list of websocket protocol extensions
206      */
207     public List<String> getExtensions()
208     {
209         return _extensions;
210     }
211 
212     /* ------------------------------------------------------------ */
213     /**
214      * @return the mask generator to use, or null if not mask generator should be used
215      * @see #setMaskGen(MaskGen)
216      */
217     public MaskGen getMaskGen()
218     {
219         return _maskGen;
220     }
221 
222     /* ------------------------------------------------------------ */
223     /**
224      * @param maskGen the mask generator to use, or null if not mask generator should be used
225      * @see #getMaskGen()
226      */
227     public void setMaskGen(MaskGen maskGen)
228     {
229         _maskGen = maskGen;
230     }
231 
232     /* ------------------------------------------------------------ */
233     /**
234      * @return The initial maximum text message size (in characters) for a connection
235      */
236     public int getMaxTextMessageSize()
237     {
238         return _maxTextMessageSize;
239     }
240 
241     /* ------------------------------------------------------------ */
242     /**
243      * Set the initial maximum text message size for a connection. This can be changed by
244      * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
245      * @param maxTextMessageSize The default maximum text message size (in characters) for a connection
246      */
247     public void setMaxTextMessageSize(int maxTextMessageSize)
248     {
249         _maxTextMessageSize = maxTextMessageSize;
250     }
251 
252     /* ------------------------------------------------------------ */
253     /**
254      * @return The initial maximum binary message size (in bytes)  for a connection
255      */
256     public int getMaxBinaryMessageSize()
257     {
258         return _maxBinaryMessageSize;
259     }
260 
261     /* ------------------------------------------------------------ */
262     /**
263      * Set the initial maximum binary message size for a connection. This can be changed by
264      * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
265      * @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
266      */
267     public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
268     {
269         _maxBinaryMessageSize = maxBinaryMessageSize;
270     }
271 
272     /* ------------------------------------------------------------ */
273     /**
274      * <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
275      *
276      * @param uri The URI to connect to.
277      * @param websocket The {@link WebSocket} instance to handle incoming events.
278      * @param maxConnectTime The interval to wait for a successful connection
279      * @param units the units of the maxConnectTime
280      * @return A {@link WebSocket.Connection}
281      * @throws IOException if the connection fails
282      * @throws InterruptedException if the thread is interrupted
283      * @throws TimeoutException if the timeout elapses before the connection is completed
284      * @see #open(URI, WebSocket)
285      */
286     public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
287     {
288         try
289         {
290             return open(uri,websocket).get(maxConnectTime,units);
291         }
292         catch (ExecutionException e)
293         {
294             Throwable cause = e.getCause();
295             if (cause instanceof IOException)
296                 throw (IOException)cause;
297             if (cause instanceof Error)
298                 throw (Error)cause;
299             if (cause instanceof RuntimeException)
300                 throw (RuntimeException)cause;
301             throw new RuntimeException(cause);
302         }
303     }
304 
305     /* ------------------------------------------------------------ */
306     /**
307      * <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
308      * <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
309      *
310      * @param uri The URI to connect to.
311      * @param websocket The {@link WebSocket} instance to handle incoming events.
312      * @return A {@link Future} to the {@link WebSocket.Connection}
313      * @throws IOException if the connection fails
314      * @see #open(URI, WebSocket, long, TimeUnit)
315      */
316     public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
317     {
318         if (!_factory.isStarted())
319             throw new IllegalStateException("Factory !started");
320         String scheme=uri.getScheme();
321         if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
322             throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
323         if ("wss".equalsIgnoreCase(scheme))
324             throw new IOException("wss not supported");
325 
326         SocketChannel channel = SocketChannel.open();
327         if (_bindAddress != null)
328             channel.socket().bind(_bindAddress);
329         channel.socket().setTcpNoDelay(true);
330 
331         InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
332 
333         final WebSocketFuture holder=new WebSocketFuture(websocket,uri,this,channel);
334 
335         channel.configureBlocking(false);
336         channel.connect(address);
337         _factory.getSelectorManager().register( channel, holder);
338 
339         return holder;
340     }
341 
342     /* ------------------------------------------------------------ */
343     /** The Future Websocket Connection.
344      */
345     static class WebSocketFuture implements Future<WebSocket.Connection>
346     {
347         final WebSocket _websocket;
348         final URI _uri;
349         final String _protocol;
350         final String _origin;
351         final MaskGen _maskGen;
352         final int _maxIdleTime;
353         final int _maxTextMessageSize;
354         final int _maxBinaryMessageSize;
355         final Map<String,String> _cookies;
356         final List<String> _extensions;
357         final CountDownLatch _done = new CountDownLatch(1);
358 
359         ByteChannel _channel;
360         WebSocketConnection _connection;
361         Throwable _exception;
362 
363         private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
364         {
365             _websocket=websocket;
366             _uri=uri;
367             _protocol=client._protocol;
368             _origin=client._origin;
369             _maskGen=client._maskGen;
370             _maxIdleTime=client._maxIdleTime;
371             _maxTextMessageSize=client._maxTextMessageSize;
372             _maxBinaryMessageSize=client._maxBinaryMessageSize;
373             _cookies=client._cookies;
374             _extensions=client._extensions;
375             _channel=channel;
376         }
377 
378         public void onConnection(WebSocketConnection connection)
379         {
380             try
381             {
382                 connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
383                 connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
384 
385                 WebSocketConnection con;
386                 synchronized (this)
387                 {
388                     if (_channel!=null)
389                         _connection=connection;
390                     con=_connection;
391                 }
392 
393                 if (con!=null)
394                 {
395                     if (_websocket instanceof WebSocket.OnFrame)
396                         ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
397 
398                     _websocket.onOpen(con.getConnection());
399                 }
400             }
401             finally
402             {
403                 _done.countDown();
404             }
405         }
406 
407         public void handshakeFailed(Throwable ex)
408         {
409             try
410             {
411                 ByteChannel channel=null;
412                 synchronized (this)
413                 {
414                     if (_channel!=null)
415                     {
416                         channel=_channel;
417                         _channel=null;
418                         _exception=ex;
419                     }
420                 }
421 
422                 if (channel!=null)
423                 {
424                     if (ex instanceof ProtocolException)
425                         closeChannel(channel,WebSocketConnectionD13.CLOSE_PROTOCOL,ex.getMessage());
426                     else
427                         closeChannel(channel,WebSocketConnectionD13.CLOSE_NO_CLOSE,ex.getMessage());
428                 }
429             }
430             finally
431             {
432                 _done.countDown();
433             }
434         }
435 
436         public Map<String,String> getCookies()
437         {
438             return _cookies;
439         }
440 
441         public String getProtocol()
442         {
443             return _protocol;
444         }
445 
446         public WebSocket getWebSocket()
447         {
448             return _websocket;
449         }
450 
451         public URI getURI()
452         {
453             return _uri;
454         }
455 
456         public int getMaxIdleTime()
457         {
458             return _maxIdleTime;
459         }
460 
461         public String getOrigin()
462         {
463             return _origin;
464         }
465 
466         public MaskGen getMaskGen()
467         {
468             return _maskGen;
469         }
470 
471         public String toString()
472         {
473             return "[" + _uri + ","+_websocket+"]@"+hashCode();
474         }
475 
476         public boolean cancel(boolean mayInterruptIfRunning)
477         {
478             try
479             {
480                 ByteChannel channel=null;
481                 synchronized (this)
482                 {
483                     if (_connection==null && _exception==null && _channel!=null)
484                     {
485                         channel=_channel;
486                         _channel=null;
487                     }
488                 }
489 
490                 if (channel!=null)
491                 {
492                     closeChannel(channel,WebSocketConnectionD13.CLOSE_NO_CLOSE,"cancelled");
493                     return true;
494                 }
495                 return false;
496             }
497             finally
498             {
499                 _done.countDown();
500             }
501         }
502 
503         public boolean isCancelled()
504         {
505             synchronized (this)
506             {
507                 return _channel==null && _connection==null;
508             }
509         }
510 
511         public boolean isDone()
512         {
513             synchronized (this)
514             {
515                 return _connection!=null && _exception==null;
516             }
517         }
518 
519         public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
520         {
521             try
522             {
523                 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
524             }
525             catch(TimeoutException e)
526             {
527                 throw new IllegalStateException("The universe has ended",e);
528             }
529         }
530 
531         public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
532                 TimeoutException
533         {
534             _done.await(timeout,unit);
535             ByteChannel channel=null;
536             org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
537             Throwable exception;
538             synchronized (this)
539             {
540                 exception=_exception;
541                 if (_connection==null)
542                 {
543                     exception=_exception;
544                     channel=_channel;
545                     _channel=null;
546                 }
547                 else
548                     connection=_connection.getConnection();
549             }
550 
551             if (channel!=null)
552                 closeChannel(channel,WebSocketConnectionD13.CLOSE_NO_CLOSE,"timeout");
553             if (exception!=null)
554                 throw new ExecutionException(exception);
555             if (connection!=null)
556                 return connection;
557             throw new TimeoutException();
558         }
559 
560         private void closeChannel(ByteChannel channel,int code, String message)
561         {
562             try
563             {
564                 _websocket.onClose(code,message);
565             }
566             catch(Exception e)
567             {
568                 __log.warn(e);
569             }
570 
571             try
572             {
573                 channel.close();
574             }
575             catch(IOException e)
576             {
577                 __log.debug(e);
578             }
579         }
580     }
581 }