View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  package org.eclipse.jetty.websocket;
17  
18  import java.io.IOException;
19  import java.net.InetSocketAddress;
20  import java.net.ProtocolException;
21  import java.net.SocketAddress;
22  import java.net.URI;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.CopyOnWriteArrayList;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  
35  import org.eclipse.jetty.util.log.Logger;
36  
37  
38  /* ------------------------------------------------------------ */
39  /**
40   * <p>{@link WebSocketClient} allows to create multiple connections to multiple destinations
41   * that can speak the websocket protocol.</p>
42   * <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
43   * object (to receive events from the server), and returns a {@link WebSocket.Connection} to
44   * send data to the server.</p>
45   * <p>Example usage is as follows:</p>
46   * <pre>
47   *   WebSocketClientFactory factory = new WebSocketClientFactory();
48   *   factory.start();
49   *
50   *   WebSocketClient client = factory.newWebSocketClient();
51   *   // Configure the client
52   *
53   *   WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
54   *   {
55   *     public void onOpen(Connection connection)
56   *     {
57   *       // open notification
58   *     }
59   *
60   *     public void onClose(int closeCode, String message)
61   *     {
62   *       // close notification
63   *     }
64   *
65   *     public void onMessage(String data)
66   *     {
67   *       // handle incoming message
68   *     }
69   *   }).get(5, TimeUnit.SECONDS);
70   *
71   *   connection.sendMessage("Hello World");
72   * </pre>
73   */
74  public class WebSocketClient
75  {
76      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
77  
78      private final WebSocketClientFactory _factory;
79      private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
80      private final List<String> _extensions=new CopyOnWriteArrayList<String>();
81      private String _origin;
82      private String _protocol;
83      private int _maxIdleTime=-1;
84      private int _maxTextMessageSize=16*1024;
85      private int _maxBinaryMessageSize=-1;
86      private MaskGen _maskGen;
87      private SocketAddress _bindAddress;
88  
89      /* ------------------------------------------------------------ */
90      /**
91       * <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
92       * <p>This can be wasteful of resources if many clients are created.</p>
93       *
94       * @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
95       * @throws Exception if the private WebSocketClientFactory fails to start
96       */
97      @Deprecated
98      public WebSocketClient() throws Exception
99      {
100         _factory=new WebSocketClientFactory();
101         _factory.start();
102         _maskGen=_factory.getMaskGen();
103     }
104 
105     /* ------------------------------------------------------------ */
106     /**
107      * <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
108      *
109      * @param factory the shared {@link WebSocketClientFactory}
110      */
111     public WebSocketClient(WebSocketClientFactory factory)
112     {
113         _factory=factory;
114         _maskGen=_factory.getMaskGen();
115     }
116 
117     /* ------------------------------------------------------------ */
118     /**
119      * @return The WebSocketClientFactory this client was created with.
120      */
121     public WebSocketClientFactory getFactory()
122     {
123         return _factory;
124     }
125 
126     /* ------------------------------------------------------------ */
127     /**
128      * @return the address to bind the socket channel to
129      * @see #setBindAddress(SocketAddress)
130      */
131     public SocketAddress getBindAddress()
132     {
133         return _bindAddress;
134     }
135 
136     /* ------------------------------------------------------------ */
137     /**
138      * @param bindAddress the address to bind the socket channel to
139      * @see #getBindAddress()
140      */
141     public void setBindAddress(SocketAddress bindAddress)
142     {
143         this._bindAddress = bindAddress;
144     }
145 
146     /* ------------------------------------------------------------ */
147     /**
148      * @return The maxIdleTime in ms for connections opened by this client,
149      * or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
150      * @see #setMaxIdleTime(int)
151      */
152     public int getMaxIdleTime()
153     {
154         return _maxIdleTime;
155     }
156 
157     /* ------------------------------------------------------------ */
158     /**
159      * @param maxIdleTime The max idle time in ms for connections opened by this client
160      * @see #getMaxIdleTime()
161      */
162     public void setMaxIdleTime(int maxIdleTime)
163     {
164         _maxIdleTime=maxIdleTime;
165     }
166 
167     /* ------------------------------------------------------------ */
168     /**
169      * @return The subprotocol string for connections opened by this client.
170      * @see #setProtocol(String)
171      */
172     public String getProtocol()
173     {
174         return _protocol;
175     }
176 
177     /* ------------------------------------------------------------ */
178     /**
179      * @param protocol The subprotocol string for connections opened by this client.
180      * @see #getProtocol()
181      */
182     public void setProtocol(String protocol)
183     {
184         _protocol = protocol;
185     }
186 
187     /* ------------------------------------------------------------ */
188     /**
189      * @return The origin URI of the client
190      * @see #setOrigin(String)
191      */
192     public String getOrigin()
193     {
194         return _origin;
195     }
196 
197     /* ------------------------------------------------------------ */
198     /**
199      * @param origin The origin URI of the client (eg "http://example.com")
200      * @see #getOrigin()
201      */
202     public void setOrigin(String origin)
203     {
204         _origin = origin;
205     }
206 
207     /* ------------------------------------------------------------ */
208     /**
209      * <p>Returns the map of the cookies that are sent during the initial HTTP handshake
210      * that upgrades to the websocket protocol.</p>
211      * @return The read-write cookie map
212      */
213     public Map<String,String> getCookies()
214     {
215         return _cookies;
216     }
217 
218     /* ------------------------------------------------------------ */
219     /**
220      * @return The list of websocket protocol extensions
221      */
222     public List<String> getExtensions()
223     {
224         return _extensions;
225     }
226 
227     /* ------------------------------------------------------------ */
228     /**
229      * @return the mask generator to use, or null if not mask generator should be used
230      * @see #setMaskGen(MaskGen)
231      */
232     public MaskGen getMaskGen()
233     {
234         return _maskGen;
235     }
236 
237     /* ------------------------------------------------------------ */
238     /**
239      * @param maskGen the mask generator to use, or null if not mask generator should be used
240      * @see #getMaskGen()
241      */
242     public void setMaskGen(MaskGen maskGen)
243     {
244         _maskGen = maskGen;
245     }
246 
247     /* ------------------------------------------------------------ */
248     /**
249      * @return The initial maximum text message size (in characters) for a connection
250      */
251     public int getMaxTextMessageSize()
252     {
253         return _maxTextMessageSize;
254     }
255 
256     /* ------------------------------------------------------------ */
257     /**
258      * Set the initial maximum text message size for a connection. This can be changed by
259      * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
260      * @param maxTextMessageSize The default maximum text message size (in characters) for a connection
261      */
262     public void setMaxTextMessageSize(int maxTextMessageSize)
263     {
264         _maxTextMessageSize = maxTextMessageSize;
265     }
266 
267     /* ------------------------------------------------------------ */
268     /**
269      * @return The initial maximum binary message size (in bytes)  for a connection
270      */
271     public int getMaxBinaryMessageSize()
272     {
273         return _maxBinaryMessageSize;
274     }
275 
276     /* ------------------------------------------------------------ */
277     /**
278      * Set the initial maximum binary message size for a connection. This can be changed by
279      * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
280      * @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
281      */
282     public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
283     {
284         _maxBinaryMessageSize = maxBinaryMessageSize;
285     }
286 
287     /* ------------------------------------------------------------ */
288     /**
289      * <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
290      *
291      * @param uri The URI to connect to.
292      * @param websocket The {@link WebSocket} instance to handle incoming events.
293      * @param maxConnectTime The interval to wait for a successful connection
294      * @param units the units of the maxConnectTime
295      * @return A {@link WebSocket.Connection}
296      * @throws IOException if the connection fails
297      * @throws InterruptedException if the thread is interrupted
298      * @throws TimeoutException if the timeout elapses before the connection is completed
299      * @see #open(URI, WebSocket)
300      */
301     public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
302     {
303         try
304         {
305             return open(uri,websocket).get(maxConnectTime,units);
306         }
307         catch (ExecutionException e)
308         {
309             Throwable cause = e.getCause();
310             if (cause instanceof IOException)
311                 throw (IOException)cause;
312             if (cause instanceof Error)
313                 throw (Error)cause;
314             if (cause instanceof RuntimeException)
315                 throw (RuntimeException)cause;
316             throw new RuntimeException(cause);
317         }
318     }
319 
320     /* ------------------------------------------------------------ */
321     /**
322      * <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
323      * <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
324      *
325      * @param uri The URI to connect to.
326      * @param websocket The {@link WebSocket} instance to handle incoming events.
327      * @return A {@link Future} to the {@link WebSocket.Connection}
328      * @throws IOException if the connection fails
329      * @see #open(URI, WebSocket, long, TimeUnit)
330      */
331     public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
332     {
333         if (!_factory.isStarted())
334             throw new IllegalStateException("Factory !started");
335 
336         InetSocketAddress address = toSocketAddress(uri);
337 
338         SocketChannel channel = SocketChannel.open();
339         if (_bindAddress != null)
340             channel.socket().bind(_bindAddress);
341         channel.socket().setTcpNoDelay(true);
342 
343         WebSocketFuture holder = new WebSocketFuture(websocket, uri, this, channel);
344 
345         channel.configureBlocking(false);
346         channel.connect(address);
347         _factory.getSelectorManager().register(channel, holder);
348 
349         return holder;
350     }
351 
352     public static InetSocketAddress toSocketAddress(URI uri)
353     {
354         String scheme = uri.getScheme();
355         if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
356             throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
357         int port = uri.getPort();
358         if (port == 0)
359             throw new IllegalArgumentException("Bad WebSocket port: " + port);
360         if (port < 0)
361             port = "ws".equals(scheme) ? 80 : 443;
362 
363         return new InetSocketAddress(uri.getHost(), port);
364     }
365 
366     /* ------------------------------------------------------------ */
367     /** The Future Websocket Connection.
368      */
369     static class WebSocketFuture implements Future<WebSocket.Connection>
370     {
371         final WebSocket _websocket;
372         final URI _uri;
373         final WebSocketClient _client;
374         final CountDownLatch _done = new CountDownLatch(1);
375         ByteChannel _channel;
376         WebSocketConnection _connection;
377         Throwable _exception;
378 
379         private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
380         {
381             _websocket=websocket;
382             _uri=uri;
383             _client=client;
384             _channel=channel;
385         }
386 
387         public void onConnection(WebSocketConnection connection)
388         {
389             try
390             {
391                 _client.getFactory().addConnection(connection);
392 
393                 connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
394                 connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());
395 
396                 WebSocketConnection con;
397                 synchronized (this)
398                 {
399                     if (_channel!=null)
400                         _connection=connection;
401                     con=_connection;
402                 }
403 
404                 if (con!=null)
405                 {
406                     if (_websocket instanceof WebSocket.OnFrame)
407                         ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
408 
409                     _websocket.onOpen(con.getConnection());
410                 }
411             }
412             finally
413             {
414                 _done.countDown();
415             }
416         }
417 
418         public void handshakeFailed(Throwable ex)
419         {
420             try
421             {
422                 ByteChannel channel=null;
423                 synchronized (this)
424                 {
425                     if (_channel!=null)
426                     {
427                         channel=_channel;
428                         _channel=null;
429                         _exception=ex;
430                     }
431                 }
432 
433                 if (channel!=null)
434                 {
435                     if (ex instanceof ProtocolException)
436                         closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
437                     else
438                         closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
439                 }
440             }
441             finally
442             {
443                 _done.countDown();
444             }
445         }
446 
447         public Map<String,String> getCookies()
448         {
449             return _client.getCookies();
450         }
451 
452         public String getProtocol()
453         {
454             return _client.getProtocol();
455         }
456 
457         public WebSocket getWebSocket()
458         {
459             return _websocket;
460         }
461 
462         public URI getURI()
463         {
464             return _uri;
465         }
466 
467         public int getMaxIdleTime()
468         {
469             return _client.getMaxIdleTime();
470         }
471 
472         public String getOrigin()
473         {
474             return _client.getOrigin();
475         }
476 
477         public MaskGen getMaskGen()
478         {
479             return _client.getMaskGen();
480         }
481 
482         @Override
483         public String toString()
484         {
485             return "[" + _uri + ","+_websocket+"]@"+hashCode();
486         }
487 
488         public boolean cancel(boolean mayInterruptIfRunning)
489         {
490             try
491             {
492                 ByteChannel channel=null;
493                 synchronized (this)
494                 {
495                     if (_connection==null && _exception==null && _channel!=null)
496                     {
497                         channel=_channel;
498                         _channel=null;
499                     }
500                 }
501 
502                 if (channel!=null)
503                 {
504                     closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
505                     return true;
506                 }
507                 return false;
508             }
509             finally
510             {
511                 _done.countDown();
512             }
513         }
514 
515         public boolean isCancelled()
516         {
517             synchronized (this)
518             {
519                 return _channel==null && _connection==null;
520             }
521         }
522 
523         public boolean isDone()
524         {
525             synchronized (this)
526             {
527                 return _connection!=null && _exception==null;
528             }
529         }
530 
531         public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
532         {
533             try
534             {
535                 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
536             }
537             catch(TimeoutException e)
538             {
539                 throw new IllegalStateException("The universe has ended",e);
540             }
541         }
542 
543         public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
544                 TimeoutException
545         {
546             _done.await(timeout,unit);
547             ByteChannel channel=null;
548             org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
549             Throwable exception;
550             synchronized (this)
551             {
552                 exception=_exception;
553                 if (_connection==null)
554                 {
555                     exception=_exception;
556                     channel=_channel;
557                     _channel=null;
558                 }
559                 else
560                     connection=_connection.getConnection();
561             }
562 
563             if (channel!=null)
564                 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
565             if (exception!=null)
566                 throw new ExecutionException(exception);
567             if (connection!=null)
568                 return connection;
569             throw new TimeoutException();
570         }
571 
572         private void closeChannel(ByteChannel channel,int code, String message)
573         {
574             try
575             {
576                 _websocket.onClose(code,message);
577             }
578             catch(Exception e)
579             {
580                 __log.warn(e);
581             }
582 
583             try
584             {
585                 channel.close();
586             }
587             catch(IOException e)
588             {
589                 __log.debug(e);
590             }
591         }
592     }
593 }