View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.ProtocolException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  import java.util.Random;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  
32  import javax.net.ssl.SSLEngine;
33  
34  import org.eclipse.jetty.http.HttpFields;
35  import org.eclipse.jetty.http.HttpParser;
36  import org.eclipse.jetty.io.AbstractConnection;
37  import org.eclipse.jetty.io.AsyncEndPoint;
38  import org.eclipse.jetty.io.Buffer;
39  import org.eclipse.jetty.io.Buffers;
40  import org.eclipse.jetty.io.ByteArrayBuffer;
41  import org.eclipse.jetty.io.ConnectedEndPoint;
42  import org.eclipse.jetty.io.Connection;
43  import org.eclipse.jetty.io.EndPoint;
44  import org.eclipse.jetty.io.SimpleBuffers;
45  import org.eclipse.jetty.io.nio.AsyncConnection;
46  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
47  import org.eclipse.jetty.io.nio.SelectorManager;
48  import org.eclipse.jetty.io.nio.SslConnection;
49  import org.eclipse.jetty.util.B64Code;
50  import org.eclipse.jetty.util.QuotedStringTokenizer;
51  import org.eclipse.jetty.util.component.AggregateLifeCycle;
52  import org.eclipse.jetty.util.log.Logger;
53  import org.eclipse.jetty.util.ssl.SslContextFactory;
54  import org.eclipse.jetty.util.thread.QueuedThreadPool;
55  import org.eclipse.jetty.util.thread.ThreadPool;
56  
57  /* ------------------------------------------------------------ */
58  /**
59   * <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
60   * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
61   * <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
62   * <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
63   * so it's lifecycle must be controlled externally.
64   *
65   * @see WebSocketClient
66   */
67  public class WebSocketClientFactory extends AggregateLifeCycle
68  {
69      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
70      private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
71      private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
72      private final SslContextFactory _sslContextFactory = new SslContextFactory();
73      private final ThreadPool _threadPool;
74      private final WebSocketClientSelector _selector;
75      private MaskGen _maskGen;
76      private WebSocketBuffers _buffers;
77  
78      /* ------------------------------------------------------------ */
79      /**
80       * <p>Creates a WebSocketClientFactory with the default configuration.</p>
81       */
82      public WebSocketClientFactory()
83      {
84          this(null);
85      }
86  
87      /* ------------------------------------------------------------ */
88      /**
89       * <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
90       *
91       * @param threadPool the ThreadPool instance to use
92       */
93      public WebSocketClientFactory(ThreadPool threadPool)
94      {
95          this(threadPool, new RandomMaskGen());
96      }
97  
98      /* ------------------------------------------------------------ */
99      /**
100      * <p>Creates a WebSocketClientFactory with the given ThreadPool and the given MaskGen.</p>
101      *
102      * @param threadPool the ThreadPool instance to use
103      * @param maskGen    the MaskGen instance to use
104      */
105     public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
106     {
107         this(threadPool, maskGen, 8192);
108     }
109 
110     /* ------------------------------------------------------------ */
111 
112     /**
113      * <p>Creates a WebSocketClientFactory with the specified configuration.</p>
114      *
115      * @param threadPool the ThreadPool instance to use
116      * @param maskGen    the mask generator to use
117      * @param bufferSize the read buffer size
118      */
119     public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
120     {
121         if (threadPool == null)
122             threadPool = new QueuedThreadPool();
123         _threadPool = threadPool;
124         addBean(_threadPool);
125 
126         _buffers = new WebSocketBuffers(bufferSize);
127         addBean(_buffers);
128 
129         _maskGen = maskGen;
130         addBean(_maskGen);
131 
132         _selector = new WebSocketClientSelector();
133         addBean(_selector);
134 
135         addBean(_sslContextFactory);
136     }
137 
138     /* ------------------------------------------------------------ */
139     /**
140      * @return the SslContextFactory used to configure SSL parameters
141      */
142     public SslContextFactory getSslContextFactory()
143     {
144         return _sslContextFactory;
145     }
146 
147     /* ------------------------------------------------------------ */
148     /**
149      * Get the selectorManager. Used to configure the manager.
150      *
151      * @return The {@link SelectorManager} instance.
152      */
153     public SelectorManager getSelectorManager()
154     {
155         return _selector;
156     }
157 
158     /* ------------------------------------------------------------ */
159     /**
160      * Get the ThreadPool.
161      * Used to set/query the thread pool configuration.
162      *
163      * @return The {@link ThreadPool}
164      */
165     public ThreadPool getThreadPool()
166     {
167         return _threadPool;
168     }
169 
170     /* ------------------------------------------------------------ */
171     /**
172      * @return the shared mask generator, or null if no shared mask generator is used
173      * @see WebSocketClient#getMaskGen()
174      */
175     public MaskGen getMaskGen()
176     {
177         return _maskGen;
178     }
179 
180     /* ------------------------------------------------------------ */
181     /**
182      * @param maskGen the shared mask generator, or null if no shared mask generator is used
183      * @see WebSocketClient#setMaskGen(MaskGen)
184      */
185     public void setMaskGen(MaskGen maskGen)
186     {
187         if (isRunning())
188             throw new IllegalStateException(getState());
189         removeBean(_maskGen);
190         _maskGen = maskGen;
191         addBean(maskGen);
192     }
193 
194     /* ------------------------------------------------------------ */
195     /**
196      * @param bufferSize the read buffer size
197      * @see #getBufferSize()
198      */
199     public void setBufferSize(int bufferSize)
200     {
201         if (isRunning())
202             throw new IllegalStateException(getState());
203         removeBean(_buffers);
204         _buffers = new WebSocketBuffers(bufferSize);
205         addBean(_buffers);
206     }
207 
208     /* ------------------------------------------------------------ */
209     /**
210      * @return the read buffer size
211      */
212     public int getBufferSize()
213     {
214         return _buffers.getBufferSize();
215     }
216 
217     @Override
218     protected void doStop() throws Exception
219     {
220         closeConnections();
221         super.doStop();
222     }
223 
224     /* ------------------------------------------------------------ */
225     /**
226      * <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
227      * WebSocketClientFactory instance.</p>
228      *
229      * @return a new {@link WebSocketClient} instance
230      */
231     public WebSocketClient newWebSocketClient()
232     {
233         return new WebSocketClient(this);
234     }
235 
236     protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
237     {
238         SSLEngine sslEngine;
239         if (channel != null)
240         {
241             String peerHost = channel.socket().getInetAddress().getHostAddress();
242             int peerPort = channel.socket().getPort();
243             sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
244         }
245         else
246         {
247             sslEngine = _sslContextFactory.newSslEngine();
248         }
249         sslEngine.setUseClientMode(true);
250         sslEngine.beginHandshake();
251 
252         return sslEngine;
253     }
254 
255     protected boolean addConnection(WebSocketConnection connection)
256     {
257         return isRunning() && connections.add(connection);
258     }
259 
260     protected boolean removeConnection(WebSocketConnection connection)
261     {
262         return connections.remove(connection);
263     }
264 
265     protected void closeConnections()
266     {
267         for (WebSocketConnection connection : connections)
268             connection.shutdown();
269     }
270 
271     /* ------------------------------------------------------------ */
272     /**
273      * WebSocket Client Selector Manager
274      */
275     class WebSocketClientSelector extends SelectorManager
276     {
277         @Override
278         public boolean dispatch(Runnable task)
279         {
280             return _threadPool.dispatch(task);
281         }
282 
283         @Override
284         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
285         {
286             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
287             int maxIdleTime = holder.getMaxIdleTime();
288             if (maxIdleTime < 0)
289                 maxIdleTime = (int)getMaxIdleTime();
290             SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
291             AsyncEndPoint endPoint = result;
292 
293             // Detect if it is SSL, and wrap the connection if so
294             if ("wss".equals(holder.getURI().getScheme()))
295             {
296                 SSLEngine sslEngine = newSslEngine(channel);
297                 SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
298                 endPoint.setConnection(sslConnection);
299                 endPoint = sslConnection.getSslEndPoint();
300             }
301 
302             AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
303             endPoint.setConnection(connection);
304 
305             return result;
306         }
307 
308         @Override
309         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
310         {
311             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
312             return new HandshakeConnection(endpoint, holder);
313         }
314 
315         @Override
316         protected void endPointOpened(SelectChannelEndPoint endpoint)
317         {
318             // TODO expose on outer class ??
319         }
320 
321         @Override
322         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
323         {
324             LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
325         }
326 
327         @Override
328         protected void endPointClosed(SelectChannelEndPoint endpoint)
329         {
330             endpoint.getConnection().onClose();
331         }
332 
333         @Override
334         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
335         {
336             if (!(attachment instanceof WebSocketClient.WebSocketFuture))
337                 super.connectionFailed(channel, ex, attachment);
338             else
339             {
340                 __log.debug(ex);
341                 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
342 
343                 future.handshakeFailed(ex);
344             }
345         }
346     }
347 
348     /* ------------------------------------------------------------ */
349     /**
350      * Handshake Connection.
351      * Handles the connection until the handshake succeeds or fails.
352      */
353     class HandshakeConnection extends AbstractConnection implements AsyncConnection
354     {
355         private final AsyncEndPoint _endp;
356         private final WebSocketClient.WebSocketFuture _future;
357         private final String _key;
358         private final HttpParser _parser;
359         private String _accept;
360         private String _error;
361         private ByteArrayBuffer _handshake;
362 
363         public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
364         {
365             super(endpoint, System.currentTimeMillis());
366             _endp = endpoint;
367             _future = future;
368 
369             byte[] bytes = new byte[16];
370             new Random().nextBytes(bytes);
371             _key = new String(B64Code.encode(bytes));
372 
373             Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
374             _parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
375             {
376                 @Override
377                 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
378                 {
379                     if (status != 101)
380                     {
381                         _error = "Bad response status " + status + " " + reason;
382                         _endp.close();
383                     }
384                 }
385 
386                 @Override
387                 public void parsedHeader(Buffer name, Buffer value) throws IOException
388                 {
389                     if (__ACCEPT.equals(name))
390                         _accept = value.toString();
391                 }
392 
393                 @Override // TODO simone says shouldn't be needed
394                 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
395                 {
396                     if (_error == null)
397                         _error = "Bad response: " + method + " " + url + " " + version;
398                     _endp.close();
399                 }
400 
401                 @Override // TODO simone says shouldn't be needed
402                 public void content(Buffer ref) throws IOException
403                 {
404                     if (_error == null)
405                         _error = "Bad response. " + ref.length() + "B of content?";
406                     _endp.close();
407                 }
408             });
409         }
410 
411         private boolean handshake()
412         {
413             if (_handshake==null)
414             {
415                 String path = _future.getURI().getPath();
416                 if (path == null || path.length() == 0)
417                     path = "/";
418 
419                 if (_future.getURI().getRawQuery() != null)
420                     path += "?" + _future.getURI().getRawQuery();
421 
422                 String origin = _future.getOrigin();
423 
424                 StringBuilder request = new StringBuilder(512);
425                 request.append("GET ").append(path).append(" HTTP/1.1\r\n")
426                 .append("Host: ").append(_future.getURI().getHost()).append(":")
427                 .append(_future.getURI().getPort()).append("\r\n")
428                 .append("Upgrade: websocket\r\n")
429                 .append("Connection: Upgrade\r\n")
430                 .append("Sec-WebSocket-Key: ")
431                 .append(_key).append("\r\n");
432 
433                 if (origin != null)
434                     request.append("Origin: ").append(origin).append("\r\n");
435 
436                 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
437 
438                 if (_future.getProtocol() != null)
439                     request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
440 
441                 Map<String, String> cookies = _future.getCookies();
442                 if (cookies != null && cookies.size() > 0)
443                 {
444                     for (String cookie : cookies.keySet())
445                         request.append("Cookie: ")
446                         .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
447                         .append("=")
448                         .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
449                         .append("\r\n");
450                 }
451 
452                 request.append("\r\n");
453 
454                 _handshake=new ByteArrayBuffer(request.toString(), false);
455             }
456             
457             // TODO extensions
458 
459             try
460             {
461                 int len = _handshake.length();
462                 int flushed = _endp.flush(_handshake);
463                 if (flushed<0)
464                     throw new IOException("incomplete handshake");
465             }
466             catch (IOException e)
467             {
468                 _future.handshakeFailed(e);
469             }
470             return _handshake.length()==0;
471         }
472 
473         public Connection handle() throws IOException
474         {
475             while (_endp.isOpen() && !_parser.isComplete())
476             {
477                 if (_handshake==null || _handshake.length()>0)
478                     if (!handshake())
479                         return this;
480 
481                 if (!_parser.parseAvailable())
482                 {
483                     if (_endp.isInputShutdown())
484                         _future.handshakeFailed(new IOException("Incomplete handshake response"));
485                     return this;
486                 }
487             }
488             if (_error == null)
489             {
490                 if (_accept == null)
491                 {
492                     _error = "No Sec-WebSocket-Accept";
493                 }
494                 else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
495                 {
496                     _error = "Bad Sec-WebSocket-Accept";
497                 }
498                 else
499                 {
500                     WebSocketConnection connection = newWebSocketConnection();
501 
502                     Buffer header = _parser.getHeaderBuffer();
503                     if (header.hasContent())
504                         connection.fillBuffersFrom(header);
505                     _buffers.returnBuffer(header);
506 
507                     _future.onConnection(connection);
508 
509                     return connection;
510                 }
511             }
512 
513             _endp.close();
514             return this;
515         }
516 
517         private WebSocketConnection newWebSocketConnection() throws IOException
518         {
519             __log.debug("newWebSocketConnection()");
520             return new WebSocketClientConnection(
521                     _future._client.getFactory(),
522                     _future.getWebSocket(),
523                     _endp,
524                     _buffers,
525                     System.currentTimeMillis(),
526                     _future.getMaxIdleTime(),
527                     _future.getProtocol(),
528                     null,
529                     WebSocketConnectionRFC6455.VERSION,
530                     _future.getMaskGen());
531         }
532 
533         public void onInputShutdown() throws IOException
534         {
535             _endp.close();
536         }
537 
538         public boolean isIdle()
539         {
540             return false;
541         }
542 
543         public boolean isSuspended()
544         {
545             return false;
546         }
547 
548         public void onClose()
549         {
550             if (_error != null)
551                 _future.handshakeFailed(new ProtocolException(_error));
552             else
553                 _future.handshakeFailed(new EOFException());
554         }
555     }
556 
557     private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
558     {
559         private final WebSocketClientFactory factory;
560 
561         public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
562         {
563             super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
564             this.factory = factory;
565         }
566 
567         @Override
568         public void onClose()
569         {
570             super.onClose();
571             factory.removeConnection(this);
572         }
573     }
574 }