View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.ProtocolException;
24  import java.net.SocketAddress;
25  import java.net.URI;
26  import java.nio.channels.ByteChannel;
27  import java.nio.channels.SocketChannel;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.CopyOnWriteArrayList;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  
38  import org.eclipse.jetty.util.log.Logger;
39  
40  
41  /* ------------------------------------------------------------ */
42  /**
43   * <p>{@link WebSocketClient} allows to create multiple connections to multiple destinations
44   * that can speak the websocket protocol.</p>
45   * <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
46   * object (to receive events from the server), and returns a {@link WebSocket.Connection} to
47   * send data to the server.</p>
48   * <p>Example usage is as follows:</p>
49   * <pre>
50   *   WebSocketClientFactory factory = new WebSocketClientFactory();
51   *   factory.start();
52   *
53   *   WebSocketClient client = factory.newWebSocketClient();
54   *   // Configure the client
55   *
56   *   WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
57   *   {
58   *     public void onOpen(Connection connection)
59   *     {
60   *       // open notification
61   *     }
62   *
63   *     public void onClose(int closeCode, String message)
64   *     {
65   *       // close notification
66   *     }
67   *
68   *     public void onMessage(String data)
69   *     {
70   *       // handle incoming message
71   *     }
72   *   }).get(5, TimeUnit.SECONDS);
73   *
74   *   connection.sendMessage("Hello World");
75   * </pre>
76   */
77  public class WebSocketClient
78  {
79      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
80  
81      private final WebSocketClientFactory _factory;
82      private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
83      private final List<String> _extensions=new CopyOnWriteArrayList<String>();
84      private String _origin;
85      private String _protocol;
86      private int _maxIdleTime=-1;
87      private int _maxTextMessageSize=16*1024;
88      private int _maxBinaryMessageSize=-1;
89      private MaskGen _maskGen;
90      private SocketAddress _bindAddress;
91  
92      /* ------------------------------------------------------------ */
93      /**
94       * <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
95       * <p>This can be wasteful of resources if many clients are created.</p>
96       *
97       * @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
98       * @throws Exception if the private WebSocketClientFactory fails to start
99       */
100     @Deprecated
101     public WebSocketClient() throws Exception
102     {
103         _factory=new WebSocketClientFactory();
104         _factory.start();
105         _maskGen=_factory.getMaskGen();
106     }
107 
108     /* ------------------------------------------------------------ */
109     /**
110      * <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
111      *
112      * @param factory the shared {@link WebSocketClientFactory}
113      */
114     public WebSocketClient(WebSocketClientFactory factory)
115     {
116         _factory=factory;
117         _maskGen=_factory.getMaskGen();
118     }
119 
120     /* ------------------------------------------------------------ */
121     /**
122      * @return The WebSocketClientFactory this client was created with.
123      */
124     public WebSocketClientFactory getFactory()
125     {
126         return _factory;
127     }
128 
129     /* ------------------------------------------------------------ */
130     /**
131      * @return the address to bind the socket channel to
132      * @see #setBindAddress(SocketAddress)
133      */
134     public SocketAddress getBindAddress()
135     {
136         return _bindAddress;
137     }
138 
139     /* ------------------------------------------------------------ */
140     /**
141      * @param bindAddress the address to bind the socket channel to
142      * @see #getBindAddress()
143      */
144     public void setBindAddress(SocketAddress bindAddress)
145     {
146         this._bindAddress = bindAddress;
147     }
148 
149     /* ------------------------------------------------------------ */
150     /**
151      * @return The maxIdleTime in ms for connections opened by this client,
152      * or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
153      * @see #setMaxIdleTime(int)
154      */
155     public int getMaxIdleTime()
156     {
157         return _maxIdleTime;
158     }
159 
160     /* ------------------------------------------------------------ */
161     /**
162      * @param maxIdleTime The max idle time in ms for connections opened by this client
163      * @see #getMaxIdleTime()
164      */
165     public void setMaxIdleTime(int maxIdleTime)
166     {
167         _maxIdleTime=maxIdleTime;
168     }
169 
170     /* ------------------------------------------------------------ */
171     /**
172      * @return The subprotocol string for connections opened by this client.
173      * @see #setProtocol(String)
174      */
175     public String getProtocol()
176     {
177         return _protocol;
178     }
179 
180     /* ------------------------------------------------------------ */
181     /**
182      * @param protocol The subprotocol string for connections opened by this client.
183      * @see #getProtocol()
184      */
185     public void setProtocol(String protocol)
186     {
187         _protocol = protocol;
188     }
189 
190     /* ------------------------------------------------------------ */
191     /**
192      * @return The origin URI of the client
193      * @see #setOrigin(String)
194      */
195     public String getOrigin()
196     {
197         return _origin;
198     }
199 
200     /* ------------------------------------------------------------ */
201     /**
202      * @param origin The origin URI of the client (eg "http://example.com")
203      * @see #getOrigin()
204      */
205     public void setOrigin(String origin)
206     {
207         _origin = origin;
208     }
209 
210     /* ------------------------------------------------------------ */
211     /**
212      * <p>Returns the map of the cookies that are sent during the initial HTTP handshake
213      * that upgrades to the websocket protocol.</p>
214      * @return The read-write cookie map
215      */
216     public Map<String,String> getCookies()
217     {
218         return _cookies;
219     }
220 
221     /* ------------------------------------------------------------ */
222     /**
223      * @return The list of websocket protocol extensions
224      */
225     public List<String> getExtensions()
226     {
227         return _extensions;
228     }
229 
230     /* ------------------------------------------------------------ */
231     /**
232      * @return the mask generator to use, or null if not mask generator should be used
233      * @see #setMaskGen(MaskGen)
234      */
235     public MaskGen getMaskGen()
236     {
237         return _maskGen;
238     }
239 
240     /* ------------------------------------------------------------ */
241     /**
242      * @param maskGen the mask generator to use, or null if not mask generator should be used
243      * @see #getMaskGen()
244      */
245     public void setMaskGen(MaskGen maskGen)
246     {
247         _maskGen = maskGen;
248     }
249 
250     /* ------------------------------------------------------------ */
251     /**
252      * @return The initial maximum text message size (in characters) for a connection
253      */
254     public int getMaxTextMessageSize()
255     {
256         return _maxTextMessageSize;
257     }
258 
259     /* ------------------------------------------------------------ */
260     /**
261      * Set the initial maximum text message size for a connection. This can be changed by
262      * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
263      * @param maxTextMessageSize The default maximum text message size (in characters) for a connection
264      */
265     public void setMaxTextMessageSize(int maxTextMessageSize)
266     {
267         _maxTextMessageSize = maxTextMessageSize;
268     }
269 
270     /* ------------------------------------------------------------ */
271     /**
272      * @return The initial maximum binary message size (in bytes)  for a connection
273      */
274     public int getMaxBinaryMessageSize()
275     {
276         return _maxBinaryMessageSize;
277     }
278 
279     /* ------------------------------------------------------------ */
280     /**
281      * Set the initial maximum binary message size for a connection. This can be changed by
282      * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
283      * @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
284      */
285     public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
286     {
287         _maxBinaryMessageSize = maxBinaryMessageSize;
288     }
289 
290     /* ------------------------------------------------------------ */
291     /**
292      * <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
293      *
294      * @param uri The URI to connect to.
295      * @param websocket The {@link WebSocket} instance to handle incoming events.
296      * @param maxConnectTime The interval to wait for a successful connection
297      * @param units the units of the maxConnectTime
298      * @return A {@link WebSocket.Connection}
299      * @throws IOException if the connection fails
300      * @throws InterruptedException if the thread is interrupted
301      * @throws TimeoutException if the timeout elapses before the connection is completed
302      * @see #open(URI, WebSocket)
303      */
304     public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
305     {
306         try
307         {
308             return open(uri,websocket).get(maxConnectTime,units);
309         }
310         catch (ExecutionException e)
311         {
312             Throwable cause = e.getCause();
313             if (cause instanceof IOException)
314                 throw (IOException)cause;
315             if (cause instanceof Error)
316                 throw (Error)cause;
317             if (cause instanceof RuntimeException)
318                 throw (RuntimeException)cause;
319             throw new RuntimeException(cause);
320         }
321     }
322 
323     /* ------------------------------------------------------------ */
324     /**
325      * <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
326      * <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
327      *
328      * @param uri The URI to connect to.
329      * @param websocket The {@link WebSocket} instance to handle incoming events.
330      * @return A {@link Future} to the {@link WebSocket.Connection}
331      * @throws IOException if the connection fails
332      * @see #open(URI, WebSocket, long, TimeUnit)
333      */
334     public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
335     {
336         if (!_factory.isStarted())
337             throw new IllegalStateException("Factory !started");
338 
339         InetSocketAddress address = toSocketAddress(uri);
340 
341         SocketChannel channel = SocketChannel.open();
342         if (_bindAddress != null)
343             channel.socket().bind(_bindAddress);
344         channel.socket().setTcpNoDelay(true);
345 
346         WebSocketFuture holder = new WebSocketFuture(websocket, uri, this, channel);
347 
348         channel.configureBlocking(false);
349         channel.connect(address);
350         _factory.getSelectorManager().register(channel, holder);
351 
352         return holder;
353     }
354 
355     public static InetSocketAddress toSocketAddress(URI uri)
356     {
357         String scheme = uri.getScheme();
358         if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
359             throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
360         int port = uri.getPort();
361         if (port == 0)
362             throw new IllegalArgumentException("Bad WebSocket port: " + port);
363         if (port < 0)
364             port = "ws".equals(scheme) ? 80 : 443;
365 
366         return new InetSocketAddress(uri.getHost(), port);
367     }
368 
369     /* ------------------------------------------------------------ */
370     /** The Future Websocket Connection.
371      */
372     static class WebSocketFuture implements Future<WebSocket.Connection>
373     {
374         final WebSocket _websocket;
375         final URI _uri;
376         final WebSocketClient _client;
377         final CountDownLatch _done = new CountDownLatch(1);
378         ByteChannel _channel;
379         WebSocketConnection _connection;
380         Throwable _exception;
381 
382         private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
383         {
384             _websocket=websocket;
385             _uri=uri;
386             _client=client;
387             _channel=channel;
388         }
389 
390         public void onConnection(WebSocketConnection connection)
391         {
392             try
393             {
394                 _client.getFactory().addConnection(connection);
395 
396                 connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
397                 connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());
398 
399                 WebSocketConnection con;
400                 synchronized (this)
401                 {
402                     if (_channel!=null)
403                         _connection=connection;
404                     con=_connection;
405                 }
406 
407                 if (con!=null)
408                 {
409                     if (_websocket instanceof WebSocket.OnFrame)
410                         ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
411 
412                     _websocket.onOpen(con.getConnection());
413                 }
414             }
415             finally
416             {
417                 _done.countDown();
418             }
419         }
420 
421         public void handshakeFailed(Throwable ex)
422         {
423             try
424             {
425                 ByteChannel channel=null;
426                 synchronized (this)
427                 {
428                     if (_channel!=null)
429                     {
430                         channel=_channel;
431                         _channel=null;
432                         _exception=ex;
433                     }
434                 }
435 
436                 if (channel!=null)
437                 {
438                     if (ex instanceof ProtocolException)
439                         closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
440                     else
441                         closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
442                 }
443             }
444             finally
445             {
446                 _done.countDown();
447             }
448         }
449 
450         public Map<String,String> getCookies()
451         {
452             return _client.getCookies();
453         }
454 
455         public String getProtocol()
456         {
457             return _client.getProtocol();
458         }
459 
460         public WebSocket getWebSocket()
461         {
462             return _websocket;
463         }
464 
465         public URI getURI()
466         {
467             return _uri;
468         }
469 
470         public int getMaxIdleTime()
471         {
472             return _client.getMaxIdleTime();
473         }
474 
475         public String getOrigin()
476         {
477             return _client.getOrigin();
478         }
479 
480         public MaskGen getMaskGen()
481         {
482             return _client.getMaskGen();
483         }
484 
485         @Override
486         public String toString()
487         {
488             return "[" + _uri + ","+_websocket+"]@"+hashCode();
489         }
490 
491         public boolean cancel(boolean mayInterruptIfRunning)
492         {
493             try
494             {
495                 ByteChannel channel=null;
496                 synchronized (this)
497                 {
498                     if (_connection==null && _exception==null && _channel!=null)
499                     {
500                         channel=_channel;
501                         _channel=null;
502                     }
503                 }
504 
505                 if (channel!=null)
506                 {
507                     closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
508                     return true;
509                 }
510                 return false;
511             }
512             finally
513             {
514                 _done.countDown();
515             }
516         }
517 
518         public boolean isCancelled()
519         {
520             synchronized (this)
521             {
522                 return _channel==null && _connection==null;
523             }
524         }
525 
526         public boolean isDone()
527         {
528             synchronized (this)
529             {
530                 return _connection!=null && _exception==null;
531             }
532         }
533 
534         public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
535         {
536             try
537             {
538                 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
539             }
540             catch(TimeoutException e)
541             {
542                 throw new IllegalStateException("The universe has ended",e);
543             }
544         }
545 
546         public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
547                 TimeoutException
548         {
549             _done.await(timeout,unit);
550             ByteChannel channel=null;
551             org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
552             Throwable exception;
553             synchronized (this)
554             {
555                 exception=_exception;
556                 if (_connection==null)
557                 {
558                     exception=_exception;
559                     channel=_channel;
560                     _channel=null;
561                 }
562                 else
563                     connection=_connection.getConnection();
564             }
565 
566             if (channel!=null)
567                 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
568             if (exception!=null)
569                 throw new ExecutionException(exception);
570             if (connection!=null)
571                 return connection;
572             throw new TimeoutException();
573         }
574 
575         private void closeChannel(ByteChannel channel,int code, String message)
576         {
577             try
578             {
579                 _websocket.onClose(code,message);
580             }
581             catch(Exception e)
582             {
583                 __log.warn(e);
584             }
585 
586             try
587             {
588                 channel.close();
589             }
590             catch(IOException e)
591             {
592                 __log.debug(e);
593             }
594         }
595     }
596 }