View Javadoc

1   package org.eclipse.jetty.websocket;
2   
3   import java.io.EOFException;
4   import java.io.IOException;
5   import java.net.InetSocketAddress;
6   import java.net.ProtocolException;
7   import java.net.URI;
8   import java.nio.channels.ByteChannel;
9   import java.nio.channels.SelectionKey;
10  import java.nio.channels.SocketChannel;
11  import java.util.List;
12  import java.util.Map;
13  import java.util.Random;
14  import java.util.concurrent.ConcurrentHashMap;
15  import java.util.concurrent.CopyOnWriteArrayList;
16  import java.util.concurrent.CountDownLatch;
17  import java.util.concurrent.ExecutionException;
18  import java.util.concurrent.Future;
19  import java.util.concurrent.TimeUnit;
20  import java.util.concurrent.TimeoutException;
21  
22  import org.eclipse.jetty.http.HttpFields;
23  import org.eclipse.jetty.http.HttpParser;
24  import org.eclipse.jetty.io.AbstractConnection;
25  import org.eclipse.jetty.io.Buffer;
26  import org.eclipse.jetty.io.Buffers;
27  import org.eclipse.jetty.io.ByteArrayBuffer;
28  import org.eclipse.jetty.io.ConnectedEndPoint;
29  import org.eclipse.jetty.io.Connection;
30  import org.eclipse.jetty.io.SimpleBuffers;
31  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32  import org.eclipse.jetty.io.nio.SelectorManager;
33  import org.eclipse.jetty.util.B64Code;
34  import org.eclipse.jetty.util.QuotedStringTokenizer;
35  import org.eclipse.jetty.util.component.AggregateLifeCycle;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.util.thread.QueuedThreadPool;
38  import org.eclipse.jetty.util.thread.ThreadPool;
39  
40  
41  /* ------------------------------------------------------------ */
42  /** WebSocket Client
43   * <p>This WebSocket Client class can create multiple websocket connections to multiple destinations.  
44   * It uses the same {@link WebSocket} endpoint API as the server. 
45   * Simple usage is as follows: <pre>
46   *   WebSocketClient client = new WebSocketClient();
47   *   client.setMaxIdleTime(500);
48   *   client.start();
49   *
50   *   WebSocket.Connection connection =  client.open(new URI("ws://127.0.0.1:8080/"),new WebSocket.OnTextMessage()
51   *   {
52   *     public void onOpen(Connection connection)
53   *     {
54   *       // open notification
55   *     }
56   *
57   *     public void onClose(int closeCode, String message)
58   *     {
59   *       // close notification
60   *     }
61   *         
62   *     public void onMessage(String data)
63   *     {
64   *       // handle incoming message
65   *     }
66   *   }).get(5,TimeUnit.SECONDS);
67   *      
68   *   connection.sendMessage("Hello World");
69   * </pre>      
70   */
71  public class WebSocketClient extends AggregateLifeCycle
72  {   
73      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getCanonicalName());
74      private final static Random __random = new Random();
75      private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
76  
77      private final WebSocketClient _root;
78      private final WebSocketClient _parent;
79      private final ThreadPool _threadPool;
80      private final WebSocketClientSelector _selector;
81  
82      private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
83      private final List<String> _extensions=new CopyOnWriteArrayList<String>();
84      
85      private int _bufferSize=64*1024;
86      private String _protocol;
87      private int _maxIdleTime=-1;
88      
89      private WebSocketBuffers _buffers;
90      
91  
92      /* ------------------------------------------------------------ */
93      /** Create a WebSocket Client with default configuration.
94       */
95      public WebSocketClient()
96      {
97          this(new QueuedThreadPool());
98      }
99      
100     /* ------------------------------------------------------------ */
101     /** Create a WebSocket Client with shared threadpool.
102      * @param threadpool
103      */
104     public WebSocketClient(ThreadPool threadpool)
105     {
106         _root=this;
107         _parent=null;
108         _threadPool=threadpool;
109         _selector=new WebSocketClientSelector();
110         addBean(_selector);
111         addBean(_threadPool);
112     }
113     
114     /* ------------------------------------------------------------ */
115     /** Create a WebSocket Client from another.
116      * <p>If multiple clients are required so that connections created may have different 
117      * configurations, then it is more efficient to create a client based on another, so 
118      * that the thread pool and IO infrastructure may be shared.
119      */
120     public WebSocketClient(WebSocketClient parent)
121     {
122         _root=parent._root;
123         _parent=parent;
124         _threadPool=parent._threadPool;
125         _selector=parent._selector;
126         _parent.addBean(this);
127     }
128     
129     /* ------------------------------------------------------------ */
130     /**
131      * Get the selectorManager. Used to configure the manager.
132      * @return The {@link SelectorManager} instance.
133      */
134     public SelectorManager getSelectorManager()
135     {
136         return _selector;
137     }
138     
139     /* ------------------------------------------------------------ */
140     /** Get the ThreadPool.
141      * <p>Used to set/query the thread pool configuration.
142      * @return The {@link ThreadPool}
143      */
144     public ThreadPool getThreadPool()
145     {
146         return _threadPool;
147     }
148     
149     /* ------------------------------------------------------------ */
150     /** Get the maxIdleTime for connections opened by this client.
151      * @return The maxIdleTime in ms, or -1 if the default from {@link #getSelectorManager()} is used.
152      */
153     public int getMaxIdleTime()
154     {
155         return _maxIdleTime;
156     }
157 
158     /* ------------------------------------------------------------ */
159     /** Set the maxIdleTime for connections opened by this client.
160      * @param maxIdleTime max idle time in ms
161      */
162     public void setMaxIdleTime(int maxIdleTime)
163     {
164         _maxIdleTime=maxIdleTime;
165     }
166 
167     /* ------------------------------------------------------------ */
168     /** Get the WebSocket Buffer size for connections opened by this client.
169      * @return the buffer size in bytes.
170      */
171     public int getBufferSize()
172     {
173         return _bufferSize;
174     }
175 
176     /* ------------------------------------------------------------ */
177     /** Set the WebSocket Buffer size for connections opened by this client.
178      * @param bufferSize the buffer size in bytes.
179      */
180     public void setBufferSize(int bufferSize)
181     {
182         if (isRunning())
183             throw new IllegalStateException(getState());
184         _bufferSize = bufferSize;
185     }
186     
187     /* ------------------------------------------------------------ */
188     /** Get the subprotocol string for connections opened by this client.
189      * @return The subprotocol
190      */
191     public String getProtocol()
192     {
193         return _protocol;
194     }
195 
196     /* ------------------------------------------------------------ */
197     /** Set the subprotocol string for connections opened by this client.
198      * @param protocol The subprotocol
199      */
200     public void setProtocol(String protocol)
201     {
202         _protocol = protocol;
203     }
204     
205     /* ------------------------------------------------------------ */
206     public Map<String,String> getCookies()
207     {
208         return _cookies;
209     }
210     
211     /* ------------------------------------------------------------ */
212     public List<String> getExtensions()
213     {
214         return _extensions;
215     }
216     
217     
218     /* ------------------------------------------------------------ */
219     /** Open a WebSocket connection.
220      * Open a websocket connection to the URI and block until the connection is accepted or there is an error.
221      * @param uri The URI to connect to.
222      * @param websocket The {@link WebSocket} instance to handle incoming events.
223      * @param maxConnectTime The interval to wait for a successful connection
224      * @param units the units of the maxConnectTime
225      * @return A {@link WebSocket.Connection}
226      * @throws IOException
227      * @throws InterruptedException
228      * @throws TimeoutException
229      */
230     public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
231     {
232         try
233         {
234             return open(uri,websocket).get(maxConnectTime,units);
235         }
236         catch (ExecutionException e)
237         {
238             Throwable cause = e.getCause();
239             if (cause instanceof IOException)
240                 throw (IOException)cause;
241             if (cause instanceof Error)
242                 throw (Error)cause;
243             if (cause instanceof RuntimeException)
244                 throw (RuntimeException)cause;
245             throw new RuntimeException(cause);
246         }
247     }
248     
249     /* ------------------------------------------------------------ */
250     /** Asynchronously open a websocket connection.
251      * Open a websocket connection and return a {@link Future} to obtain the connection.  
252      * The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.
253      * 
254      * @param uri The URI to connect to.
255      * @param websocket The {@link WebSocket} instance to handle incoming events.
256      * @return A {@link Future} to the {@link WebSocket.Connection}
257      * @throws IOException
258      */
259     public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
260     {
261         if (!isStarted())
262             throw new IllegalStateException("!started");
263         String scheme=uri.getScheme();
264         if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
265             throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
266         if ("wss".equalsIgnoreCase(scheme))
267             throw new IOException("wss not supported");
268         
269         SocketChannel channel = SocketChannel.open();
270         channel.socket().setTcpNoDelay(true);
271         int maxIdleTime = getMaxIdleTime();
272         if (maxIdleTime<0)
273             maxIdleTime=(int)_selector.getMaxIdleTime();
274         if (maxIdleTime>0)
275             channel.socket().setSoTimeout(maxIdleTime);
276 
277         InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
278 
279         final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
280 
281         channel.configureBlocking(false);
282         channel.connect(address);
283         _selector.register( channel, holder);
284 
285         return holder;
286     }
287    
288 
289     @Override
290     protected void doStart() throws Exception
291     {
292         if (_parent!=null && !_parent.isRunning())
293             throw new IllegalStateException("parent:"+getState());
294         
295         _buffers = new WebSocketBuffers(_bufferSize); 
296 
297         super.doStart();
298         
299         // Start a selector and timer if this is the root client
300         if (_parent==null)
301         {
302             for (int i=0;i<_selector.getSelectSets();i++)
303             {
304                 final int id=i;
305                 _threadPool.dispatch(new Runnable()
306                 {
307                     public void run()
308                     {
309                         while(isRunning())
310                         {
311                             try
312                             {
313                                 _selector.doSelect(id);
314                             }
315                             catch (IOException e)
316                             {
317                                 __log.warn(e);
318                             }
319                         }
320                     }
321                 });
322             }
323         }
324     }
325 
326     
327     /* ------------------------------------------------------------ */
328     /** WebSocket Client Selector Manager
329      */
330     class WebSocketClientSelector extends SelectorManager
331     {
332         @Override
333         public boolean dispatch(Runnable task)
334         {
335             return _threadPool.dispatch(task);
336         }
337 
338         @Override
339         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
340         {
341             return new SelectChannelEndPoint(channel,selectSet,sKey);
342         }
343 
344         @Override
345         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
346         {
347             WebSocketFuture holder = (WebSocketFuture) endpoint.getSelectionKey().attachment();
348             return new HandshakeConnection(endpoint,holder);
349         }
350         
351         @Override
352         protected void endPointOpened(SelectChannelEndPoint endpoint)
353         {
354             // TODO expose on outer class
355         }
356 
357         @Override
358         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
359         {
360             throw new IllegalStateException();
361         }
362 
363         @Override
364         protected void endPointClosed(SelectChannelEndPoint endpoint)
365         {
366             endpoint.getConnection().closed();
367         }
368 
369         @Override
370         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
371         {
372             if (!(attachment instanceof WebSocketFuture))
373                 super.connectionFailed(channel,ex,attachment);
374             else
375             {
376                 __log.debug(ex);
377                 WebSocketFuture holder = (WebSocketFuture)attachment;
378                 
379                 holder.handshakeFailed(ex);
380             } 
381         }
382     }
383     
384     
385     /* ------------------------------------------------------------ */
386     /** Handshake Connection.
387      * Handles the connection until the handshake succeeds or fails.
388      */
389     class HandshakeConnection extends AbstractConnection
390     {
391         private final SelectChannelEndPoint _endp;
392         private final WebSocketFuture _holder;
393         private final String _key;
394         private final HttpParser _parser;
395         private String _accept;
396         private String _error;
397         
398         
399         public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketFuture holder)
400         {
401             super(endpoint,System.currentTimeMillis());
402             _endp=endpoint;
403             _holder=holder;
404             
405             byte[] bytes=new byte[16];
406             __random.nextBytes(bytes);
407             _key=new String(B64Code.encode(bytes));
408             
409             Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
410             _parser=new HttpParser(buffers,_endp,
411             
412             new HttpParser.EventHandler()
413             {
414                 @Override
415                 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
416                 {
417                     if (status!=101)
418                     {
419                         _error="Bad response status "+status+" "+reason;
420                         _endp.close();
421                     }
422                 }
423                 
424                 @Override
425                 public void parsedHeader(Buffer name, Buffer value) throws IOException
426                 {
427                     if (__ACCEPT.equals(name))
428                         _accept=value.toString();
429                 }
430 
431                 @Override
432                 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
433                 {
434                     if (_error==null)
435                         _error="Bad response: "+method+" "+url+" "+version;
436                     _endp.close();
437                 }
438                 
439                 @Override
440                 public void content(Buffer ref) throws IOException
441                 {
442                     if (_error==null)
443                         _error="Bad response. "+ref.length()+"B of content?";
444                     _endp.close();
445                 }
446             });
447             
448             String path=_holder.getURI().getPath();
449             if (path==null || path.length()==0)
450                 path="/";
451             
452             String request=
453                 "GET "+path+" HTTP/1.1\r\n"+
454                 "Host: "+holder.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
455                 "Upgrade: websocket\r\n"+
456                 "Connection: Upgrade\r\n"+
457                 "Sec-WebSocket-Key: "+_key+"\r\n"+
458                 "Sec-WebSocket-Origin: http://example.com\r\n"+
459                 "Sec-WebSocket-Version: "+WebSocketConnectionD10.VERSION+"\r\n";
460             
461             if (holder.getProtocol()!=null)
462                 request+="Sec-WebSocket-Protocol: "+holder.getProtocol()+"\r\n";
463                 
464             if (holder.getCookies()!=null && holder.getCookies().size()>0)
465             {
466                 for (String cookie : holder.getCookies().keySet())
467                     request+="Cookie: "+QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM)+
468                     "="+
469                     QuotedStringTokenizer.quoteIfNeeded(holder.getCookies().get(cookie),HttpFields.__COOKIE_DELIM)+
470                     "\r\n";
471             }
472 
473             request+="\r\n";
474             
475             // TODO extensions
476             
477             try
478             {
479                 Buffer handshake = new ByteArrayBuffer(request,false);
480                 int len=handshake.length();
481                 if (len!=_endp.flush(handshake))
482                     throw new IOException("incomplete");
483             }
484             catch(IOException e)
485             {
486                 holder.handshakeFailed(e);
487             }
488             
489         }
490 
491         public Connection handle() throws IOException
492         {
493             while (_endp.isOpen() && !_parser.isComplete())
494             {
495                 switch (_parser.parseAvailable())
496                 {
497                     case -1:
498                         _holder.handshakeFailed(new IOException("Incomplete handshake response"));
499                         return this;
500                     case 0:
501                         return this;
502                     default:
503                         break;    
504                 }
505             }
506             if (_error==null)
507             {
508                 if (_accept==null)
509                     _error="No Sec-WebSocket-Accept";
510                 else if (!WebSocketConnectionD10.hashKey(_key).equals(_accept))
511                     _error="Bad Sec-WebSocket-Accept";
512                 else 
513                 {
514                     Buffer header=_parser.getHeaderBuffer();
515                     WebSocketConnectionD10 connection = new WebSocketConnectionD10(_holder.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_holder.getMaxIdleTime(),_holder.getProtocol(),null,10, new WebSocketGeneratorD10.RandomMaskGen());
516 
517                     if (header.hasContent())
518                         connection.fillBuffersFrom(header);
519                     _buffers.returnBuffer(header);
520 
521                     _holder.onConnection(connection);
522                     
523                     return connection;
524                 }
525             }
526 
527             _endp.close();
528             return this;
529         }
530 
531         public boolean isIdle()
532         {
533             return false;
534         }
535 
536         public boolean isSuspended()
537         {
538             return false;
539         }
540 
541         public void closed()
542         {
543             if (_error!=null)
544                 _holder.handshakeFailed(new ProtocolException(_error));
545             else
546                 _holder.handshakeFailed(new EOFException());
547         }
548     }
549 
550     
551     /* ------------------------------------------------------------ */
552     /** The Future Websocket Connection.
553      */
554     class WebSocketFuture implements Future<WebSocket.Connection>
555     {
556         final WebSocket _websocket;;
557         final URI _uri;
558         final String _protocol;
559         final int _maxIdleTime;
560         final Map<String,String> _cookies;
561         final List<String> _extensions;
562         final CountDownLatch _done = new CountDownLatch(1);
563 
564         ByteChannel _channel;
565         WebSocketConnection _connection;
566         Throwable _exception;
567         
568         public WebSocketFuture(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
569         {
570             _websocket=websocket;
571             _uri=uri;
572             _protocol=protocol;
573             _maxIdleTime=maxIdleTime;
574             _cookies=cookies;
575             _extensions=extensions;
576             _channel=channel;
577         }
578         
579         public void onConnection(WebSocketConnection connection)
580         {   
581             try
582             {
583                 synchronized (this)
584                 {
585                     if (_channel!=null)
586                         _connection=connection;
587                 }
588 
589                 if (_connection!=null)
590                 {
591                     if (_websocket instanceof WebSocket.OnFrame)
592                         ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)connection.getConnection());
593 
594                     _websocket.onOpen(connection.getConnection());
595 
596                 } 
597             }
598             finally
599             {
600                 _done.countDown();
601             }
602         }
603 
604         public void handshakeFailed(Throwable ex)
605         {  
606             try
607             {
608                 ByteChannel channel=null;
609                 synchronized (this)
610                 {
611                     if (_channel!=null)
612                     {
613                         channel=_channel;
614                         _channel=null;
615                         _exception=ex;
616                     }
617                 }
618 
619                 if (channel!=null)
620                 {
621                     if (ex instanceof ProtocolException)
622                         closeChannel(channel,WebSocketConnectionD10.CLOSE_PROTOCOL,ex.getMessage());
623                     else
624                         closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,ex.getMessage());
625                 }
626             }
627             finally
628             {
629                 _done.countDown();
630             }
631         }
632 
633         public Map<String,String> getCookies()
634         {
635             return _cookies;
636         }
637 
638         public String getProtocol()
639         {
640             return _protocol;
641         }
642 
643         public WebSocket getWebSocket()
644         {
645             return _websocket;
646         }
647         
648         public URI getURI()
649         {
650             return _uri;
651         }
652         
653         public int getMaxIdleTime()
654         {
655             return _maxIdleTime;
656         }
657         
658         public String toString()
659         {
660             return "[" + _uri + ","+_websocket+"]@"+hashCode();
661         }
662 
663         public boolean cancel(boolean mayInterruptIfRunning)
664         {
665             try
666             {
667                 ByteChannel channel=null;
668                 synchronized (this)
669                 {
670                     if (_connection==null && _exception==null && _channel!=null)
671                     {
672                         channel=_channel;
673                         _channel=null;
674                     }
675                 }
676 
677                 if (channel!=null)
678                 {
679                     closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"cancelled");
680                     return true;
681                 }
682                 return false;
683             }
684             finally
685             {
686                 _done.countDown();
687             }
688         }
689 
690         public boolean isCancelled()
691         {
692             synchronized (this)
693             {
694                 return _channel==null && _connection==null;
695             }
696         }
697 
698         public boolean isDone()
699         {
700             synchronized (this)
701             {
702                 return _connection!=null && _exception==null;
703             }
704         }
705 
706         public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
707         {
708             try
709             {
710                 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
711             }
712             catch(TimeoutException e)
713             {
714                 throw new IllegalStateException("The universe has ended",e);
715             }
716         }
717 
718         public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
719                 TimeoutException
720         {
721             _done.await(timeout,unit);
722             ByteChannel channel=null;
723             org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
724             Throwable exception=null;
725             synchronized (this)
726             {
727                 exception=_exception;
728                 if (_connection==null)
729                 {
730                     exception=_exception;
731                     channel=_channel;
732                     _channel=null;
733                 }
734                 else
735                     connection=_connection.getConnection();
736             }
737             
738             if (channel!=null)
739                 closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"timeout");
740             if (exception!=null)
741                 throw new ExecutionException(exception);
742             if (connection!=null)
743                 return connection;
744             throw new TimeoutException();
745         }
746 
747         private void closeChannel(ByteChannel channel,int code, String message)
748         {
749             try
750             {
751                 _websocket.onClose(code,message);
752             }
753             catch(Exception e)
754             {
755                 __log.warn(e);
756             }
757             
758             try
759             {
760                 channel.close();
761             }
762             catch(IOException e)
763             {
764                 __log.debug(e);
765             }
766         }
767     }
768     
769 }