View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  package org.eclipse.jetty.websocket;
17  
18  import java.io.EOFException;
19  import java.io.IOException;
20  import java.net.ProtocolException;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.SocketChannel;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Queue;
26  import java.util.Random;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import javax.net.ssl.SSLEngine;
29  
30  import org.eclipse.jetty.http.HttpFields;
31  import org.eclipse.jetty.http.HttpParser;
32  import org.eclipse.jetty.io.AbstractConnection;
33  import org.eclipse.jetty.io.AsyncEndPoint;
34  import org.eclipse.jetty.io.Buffer;
35  import org.eclipse.jetty.io.Buffers;
36  import org.eclipse.jetty.io.ByteArrayBuffer;
37  import org.eclipse.jetty.io.ConnectedEndPoint;
38  import org.eclipse.jetty.io.Connection;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.SimpleBuffers;
41  import org.eclipse.jetty.io.nio.AsyncConnection;
42  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
43  import org.eclipse.jetty.io.nio.SelectorManager;
44  import org.eclipse.jetty.io.nio.SslConnection;
45  import org.eclipse.jetty.util.B64Code;
46  import org.eclipse.jetty.util.QuotedStringTokenizer;
47  import org.eclipse.jetty.util.component.AggregateLifeCycle;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.util.ssl.SslContextFactory;
50  import org.eclipse.jetty.util.thread.QueuedThreadPool;
51  import org.eclipse.jetty.util.thread.ThreadPool;
52  
53  /* ------------------------------------------------------------ */
54  /**
55   * <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
56   * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
57   * <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
58   * <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
59   * so it's lifecycle must be controlled externally.
60   *
61   * @see WebSocketClient
62   */
63  public class WebSocketClientFactory extends AggregateLifeCycle
64  {
65      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
66      private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
67      private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
68      private final SslContextFactory _sslContextFactory = new SslContextFactory();
69      private final ThreadPool _threadPool;
70      private final WebSocketClientSelector _selector;
71      private MaskGen _maskGen;
72      private WebSocketBuffers _buffers;
73  
74      /* ------------------------------------------------------------ */
75      /**
76       * <p>Creates a WebSocketClientFactory with the default configuration.</p>
77       */
78      public WebSocketClientFactory()
79      {
80          this(new QueuedThreadPool());
81      }
82  
83      /* ------------------------------------------------------------ */
84      /**
85       * <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
86       *
87       * @param threadPool the ThreadPool instance to use
88       */
89      public WebSocketClientFactory(ThreadPool threadPool)
90      {
91          this(threadPool, new RandomMaskGen());
92      }
93  
94      /* ------------------------------------------------------------ */
95      /**
96       * <p>Creates a WebSocketClientFactory with the given ThreadPool and the given MaskGen.</p>
97       *
98       * @param threadPool the ThreadPool instance to use
99       * @param maskGen    the MaskGen instance to use
100      */
101     public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
102     {
103         this(threadPool, maskGen, 8192);
104     }
105 
106     /* ------------------------------------------------------------ */
107 
108     /**
109      * <p>Creates a WebSocketClientFactory with the specified configuration.</p>
110      *
111      * @param threadPool the ThreadPool instance to use
112      * @param maskGen    the mask generator to use
113      * @param bufferSize the read buffer size
114      */
115     public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
116     {
117         _threadPool = threadPool;
118         addBean(threadPool);
119         _buffers = new WebSocketBuffers(bufferSize);
120         addBean(_buffers);
121         _maskGen = maskGen;
122         addBean(_maskGen);
123         _selector = new WebSocketClientSelector();
124         addBean(_selector);
125         addBean(_sslContextFactory);
126     }
127 
128     /* ------------------------------------------------------------ */
129     /**
130      * @return the SslContextFactory used to configure SSL parameters
131      */
132     public SslContextFactory getSslContextFactory()
133     {
134         return _sslContextFactory;
135     }
136 
137     /* ------------------------------------------------------------ */
138     /**
139      * Get the selectorManager. Used to configure the manager.
140      *
141      * @return The {@link SelectorManager} instance.
142      */
143     public SelectorManager getSelectorManager()
144     {
145         return _selector;
146     }
147 
148     /* ------------------------------------------------------------ */
149     /**
150      * Get the ThreadPool.
151      * Used to set/query the thread pool configuration.
152      *
153      * @return The {@link ThreadPool}
154      */
155     public ThreadPool getThreadPool()
156     {
157         return _threadPool;
158     }
159 
160     /* ------------------------------------------------------------ */
161     /**
162      * @return the shared mask generator, or null if no shared mask generator is used
163      * @see WebSocketClient#getMaskGen()
164      */
165     public MaskGen getMaskGen()
166     {
167         return _maskGen;
168     }
169 
170     /* ------------------------------------------------------------ */
171     /**
172      * @param maskGen the shared mask generator, or null if no shared mask generator is used
173      * @see WebSocketClient#setMaskGen(MaskGen)
174      */
175     public void setMaskGen(MaskGen maskGen)
176     {
177         if (isRunning())
178             throw new IllegalStateException(getState());
179         removeBean(_maskGen);
180         _maskGen = maskGen;
181         addBean(maskGen);
182     }
183 
184     /* ------------------------------------------------------------ */
185     /**
186      * @param bufferSize the read buffer size
187      * @see #getBufferSize()
188      */
189     public void setBufferSize(int bufferSize)
190     {
191         if (isRunning())
192             throw new IllegalStateException(getState());
193         removeBean(_buffers);
194         _buffers = new WebSocketBuffers(bufferSize);
195         addBean(_buffers);
196     }
197 
198     /* ------------------------------------------------------------ */
199     /**
200      * @return the read buffer size
201      */
202     public int getBufferSize()
203     {
204         return _buffers.getBufferSize();
205     }
206 
207     @Override
208     protected void doStop() throws Exception
209     {
210         closeConnections();
211     }
212 
213     /* ------------------------------------------------------------ */
214     /**
215      * <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
216      * WebSocketClientFactory instance.</p>
217      *
218      * @return a new {@link WebSocketClient} instance
219      */
220     public WebSocketClient newWebSocketClient()
221     {
222         return new WebSocketClient(this);
223     }
224 
225     protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
226     {
227         SSLEngine sslEngine;
228         if (channel != null)
229         {
230             String peerHost = channel.socket().getInetAddress().getHostAddress();
231             int peerPort = channel.socket().getPort();
232             sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
233         }
234         else
235         {
236             sslEngine = _sslContextFactory.newSslEngine();
237         }
238         sslEngine.setUseClientMode(true);
239         sslEngine.beginHandshake();
240 
241         return sslEngine;
242     }
243 
244     protected boolean addConnection(WebSocketConnection connection)
245     {
246         return isRunning() && connections.add(connection);
247     }
248 
249     protected boolean removeConnection(WebSocketConnection connection)
250     {
251         return connections.remove(connection);
252     }
253 
254     protected void closeConnections()
255     {
256         for (WebSocketConnection connection : connections)
257             connection.shutdown();
258     }
259 
260     /* ------------------------------------------------------------ */
261     /**
262      * WebSocket Client Selector Manager
263      */
264     class WebSocketClientSelector extends SelectorManager
265     {
266         @Override
267         public boolean dispatch(Runnable task)
268         {
269             return _threadPool.dispatch(task);
270         }
271 
272         @Override
273         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
274         {
275             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
276             int maxIdleTime = holder.getMaxIdleTime();
277             if (maxIdleTime < 0)
278                 maxIdleTime = (int)getMaxIdleTime();
279             SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
280             AsyncEndPoint endPoint = result;
281 
282             // Detect if it is SSL, and wrap the connection if so
283             if ("wss".equals(holder.getURI().getScheme()))
284             {
285                 SSLEngine sslEngine = newSslEngine(channel);
286                 SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
287                 endPoint.setConnection(sslConnection);
288                 endPoint = sslConnection.getSslEndPoint();
289             }
290 
291             AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
292             endPoint.setConnection(connection);
293 
294             return result;
295         }
296 
297         @Override
298         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
299         {
300             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
301             return new HandshakeConnection(endpoint, holder);
302         }
303 
304         @Override
305         protected void endPointOpened(SelectChannelEndPoint endpoint)
306         {
307             // TODO expose on outer class ??
308         }
309 
310         @Override
311         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
312         {
313             LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
314         }
315 
316         @Override
317         protected void endPointClosed(SelectChannelEndPoint endpoint)
318         {
319             endpoint.getConnection().onClose();
320         }
321 
322         @Override
323         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
324         {
325             if (!(attachment instanceof WebSocketClient.WebSocketFuture))
326                 super.connectionFailed(channel, ex, attachment);
327             else
328             {
329                 __log.debug(ex);
330                 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
331 
332                 future.handshakeFailed(ex);
333             }
334         }
335     }
336 
337     /* ------------------------------------------------------------ */
338     /**
339      * Handshake Connection.
340      * Handles the connection until the handshake succeeds or fails.
341      */
342     class HandshakeConnection extends AbstractConnection implements AsyncConnection
343     {
344         private final AsyncEndPoint _endp;
345         private final WebSocketClient.WebSocketFuture _future;
346         private final String _key;
347         private final HttpParser _parser;
348         private String _accept;
349         private String _error;
350         private boolean _handshaken;
351 
352         public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
353         {
354             super(endpoint, System.currentTimeMillis());
355             _endp = endpoint;
356             _future = future;
357 
358             byte[] bytes = new byte[16];
359             new Random().nextBytes(bytes);
360             _key = new String(B64Code.encode(bytes));
361 
362             Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
363             _parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
364             {
365                 @Override
366                 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
367                 {
368                     if (status != 101)
369                     {
370                         _error = "Bad response status " + status + " " + reason;
371                         _endp.close();
372                     }
373                 }
374 
375                 @Override
376                 public void parsedHeader(Buffer name, Buffer value) throws IOException
377                 {
378                     if (__ACCEPT.equals(name))
379                         _accept = value.toString();
380                 }
381 
382                 @Override
383                 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
384                 {
385                     if (_error == null)
386                         _error = "Bad response: " + method + " " + url + " " + version;
387                     _endp.close();
388                 }
389 
390                 @Override
391                 public void content(Buffer ref) throws IOException
392                 {
393                     if (_error == null)
394                         _error = "Bad response. " + ref.length() + "B of content?";
395                     _endp.close();
396                 }
397             });
398         }
399 
400         private void handshake()
401         {
402             String path = _future.getURI().getPath();
403             if (path == null || path.length() == 0)
404                 path = "/";
405 
406             if (_future.getURI().getRawQuery() != null)
407                 path += "?" + _future.getURI().getRawQuery();
408 
409             String origin = _future.getOrigin();
410 
411             StringBuilder request = new StringBuilder(512);
412             request.append("GET ").append(path).append(" HTTP/1.1\r\n")
413                     .append("Host: ").append(_future.getURI().getHost()).append(":")
414                     .append(_future.getURI().getPort()).append("\r\n")
415                     .append("Upgrade: websocket\r\n")
416                     .append("Connection: Upgrade\r\n")
417                     .append("Sec-WebSocket-Key: ")
418                     .append(_key).append("\r\n");
419 
420             if (origin != null)
421                 request.append("Origin: ").append(origin).append("\r\n");
422 
423             request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
424 
425             if (_future.getProtocol() != null)
426                 request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
427 
428             Map<String, String> cookies = _future.getCookies();
429             if (cookies != null && cookies.size() > 0)
430             {
431                 for (String cookie : cookies.keySet())
432                     request.append("Cookie: ")
433                             .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
434                             .append("=")
435                             .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
436                             .append("\r\n");
437             }
438 
439             request.append("\r\n");
440 
441             // TODO extensions
442 
443             try
444             {
445                 Buffer handshake = new ByteArrayBuffer(request.toString(), false);
446                 int len = handshake.length();
447                 if (len != _endp.flush(handshake))
448                     throw new IOException("incomplete");
449             }
450             catch (IOException e)
451             {
452                 _future.handshakeFailed(e);
453             }
454             finally
455             {
456                 _handshaken = true;
457             }
458         }
459 
460         public Connection handle() throws IOException
461         {
462             while (_endp.isOpen() && !_parser.isComplete())
463             {
464                 if (!_handshaken)
465                     handshake();
466 
467                 if (!_parser.parseAvailable())
468                 {
469                     if (_endp.isInputShutdown())
470                         _future.handshakeFailed(new IOException("Incomplete handshake response"));
471                     return this;
472                 }
473             }
474             if (_error == null)
475             {
476                 if (_accept == null)
477                 {
478                     _error = "No Sec-WebSocket-Accept";
479                 }
480                 else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
481                 {
482                     _error = "Bad Sec-WebSocket-Accept";
483                 }
484                 else
485                 {
486                     WebSocketConnection connection = newWebSocketConnection();
487 
488                     Buffer header = _parser.getHeaderBuffer();
489                     if (header.hasContent())
490                         connection.fillBuffersFrom(header);
491                     _buffers.returnBuffer(header);
492 
493                     _future.onConnection(connection);
494 
495                     return connection;
496                 }
497             }
498 
499             _endp.close();
500             return this;
501         }
502 
503         private WebSocketConnection newWebSocketConnection() throws IOException
504         {
505             return new WebSocketClientConnection(
506                     _future._client.getFactory(),
507                     _future.getWebSocket(),
508                     _endp,
509                     _buffers,
510                     System.currentTimeMillis(),
511                     _future.getMaxIdleTime(),
512                     _future.getProtocol(),
513                     null,
514                     WebSocketConnectionRFC6455.VERSION,
515                     _future.getMaskGen());
516         }
517 
518         public void onInputShutdown() throws IOException
519         {
520             _endp.close();
521         }
522 
523         public boolean isIdle()
524         {
525             return false;
526         }
527 
528         public boolean isSuspended()
529         {
530             return false;
531         }
532 
533         public void onClose()
534         {
535             if (_error != null)
536                 _future.handshakeFailed(new ProtocolException(_error));
537             else
538                 _future.handshakeFailed(new EOFException());
539         }
540     }
541 
542     private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
543     {
544         private final WebSocketClientFactory factory;
545 
546         public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
547         {
548             super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
549             this.factory = factory;
550         }
551 
552         @Override
553         public void onClose()
554         {
555             super.onClose();
556             factory.removeConnection(this);
557         }
558     }
559 }