View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  package org.eclipse.jetty.websocket;
17  
18  import java.io.EOFException;
19  import java.io.IOException;
20  import java.net.ProtocolException;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.SocketChannel;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Queue;
26  import java.util.Random;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import javax.net.ssl.SSLEngine;
29  
30  import org.eclipse.jetty.http.HttpFields;
31  import org.eclipse.jetty.http.HttpParser;
32  import org.eclipse.jetty.io.AbstractConnection;
33  import org.eclipse.jetty.io.AsyncEndPoint;
34  import org.eclipse.jetty.io.Buffer;
35  import org.eclipse.jetty.io.Buffers;
36  import org.eclipse.jetty.io.ByteArrayBuffer;
37  import org.eclipse.jetty.io.ConnectedEndPoint;
38  import org.eclipse.jetty.io.Connection;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.SimpleBuffers;
41  import org.eclipse.jetty.io.nio.AsyncConnection;
42  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
43  import org.eclipse.jetty.io.nio.SelectorManager;
44  import org.eclipse.jetty.io.nio.SslConnection;
45  import org.eclipse.jetty.util.B64Code;
46  import org.eclipse.jetty.util.QuotedStringTokenizer;
47  import org.eclipse.jetty.util.component.AggregateLifeCycle;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.util.ssl.SslContextFactory;
50  import org.eclipse.jetty.util.thread.QueuedThreadPool;
51  import org.eclipse.jetty.util.thread.ThreadPool;
52  
53  /* ------------------------------------------------------------ */
54  /**
55   * <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
56   * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
57   * <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
58   * <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
59   * so it's lifecycle must be controlled externally.
60   *
61   * @see WebSocketClient
62   */
63  public class WebSocketClientFactory extends AggregateLifeCycle
64  {
65      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
66      private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
67      private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
68      private final SslContextFactory _sslContextFactory = new SslContextFactory();
69      private final ThreadPool _threadPool;
70      private final WebSocketClientSelector _selector;
71      private MaskGen _maskGen;
72      private WebSocketBuffers _buffers;
73  
74      /* ------------------------------------------------------------ */
75      /**
76       * <p>Creates a WebSocketClientFactory with the default configuration.</p>
77       */
78      public WebSocketClientFactory()
79      {
80          this(null);
81      }
82  
83      /* ------------------------------------------------------------ */
84      /**
85       * <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
86       *
87       * @param threadPool the ThreadPool instance to use
88       */
89      public WebSocketClientFactory(ThreadPool threadPool)
90      {
91          this(threadPool, new RandomMaskGen());
92      }
93  
94      /* ------------------------------------------------------------ */
95      /**
96       * <p>Creates a WebSocketClientFactory with the given ThreadPool and the given MaskGen.</p>
97       *
98       * @param threadPool the ThreadPool instance to use
99       * @param maskGen    the MaskGen instance to use
100      */
101     public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
102     {
103         this(threadPool, maskGen, 8192);
104     }
105 
106     /* ------------------------------------------------------------ */
107 
108     /**
109      * <p>Creates a WebSocketClientFactory with the specified configuration.</p>
110      *
111      * @param threadPool the ThreadPool instance to use
112      * @param maskGen    the mask generator to use
113      * @param bufferSize the read buffer size
114      */
115     public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
116     {
117         if (threadPool == null)
118             threadPool = new QueuedThreadPool();
119         _threadPool = threadPool;
120         addBean(_threadPool);
121 
122         _buffers = new WebSocketBuffers(bufferSize);
123         addBean(_buffers);
124 
125         _maskGen = maskGen;
126         addBean(_maskGen);
127 
128         _selector = new WebSocketClientSelector();
129         addBean(_selector);
130 
131         addBean(_sslContextFactory);
132     }
133 
134     /* ------------------------------------------------------------ */
135     /**
136      * @return the SslContextFactory used to configure SSL parameters
137      */
138     public SslContextFactory getSslContextFactory()
139     {
140         return _sslContextFactory;
141     }
142 
143     /* ------------------------------------------------------------ */
144     /**
145      * Get the selectorManager. Used to configure the manager.
146      *
147      * @return The {@link SelectorManager} instance.
148      */
149     public SelectorManager getSelectorManager()
150     {
151         return _selector;
152     }
153 
154     /* ------------------------------------------------------------ */
155     /**
156      * Get the ThreadPool.
157      * Used to set/query the thread pool configuration.
158      *
159      * @return The {@link ThreadPool}
160      */
161     public ThreadPool getThreadPool()
162     {
163         return _threadPool;
164     }
165 
166     /* ------------------------------------------------------------ */
167     /**
168      * @return the shared mask generator, or null if no shared mask generator is used
169      * @see WebSocketClient#getMaskGen()
170      */
171     public MaskGen getMaskGen()
172     {
173         return _maskGen;
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @param maskGen the shared mask generator, or null if no shared mask generator is used
179      * @see WebSocketClient#setMaskGen(MaskGen)
180      */
181     public void setMaskGen(MaskGen maskGen)
182     {
183         if (isRunning())
184             throw new IllegalStateException(getState());
185         removeBean(_maskGen);
186         _maskGen = maskGen;
187         addBean(maskGen);
188     }
189 
190     /* ------------------------------------------------------------ */
191     /**
192      * @param bufferSize the read buffer size
193      * @see #getBufferSize()
194      */
195     public void setBufferSize(int bufferSize)
196     {
197         if (isRunning())
198             throw new IllegalStateException(getState());
199         removeBean(_buffers);
200         _buffers = new WebSocketBuffers(bufferSize);
201         addBean(_buffers);
202     }
203 
204     /* ------------------------------------------------------------ */
205     /**
206      * @return the read buffer size
207      */
208     public int getBufferSize()
209     {
210         return _buffers.getBufferSize();
211     }
212 
213     @Override
214     protected void doStop() throws Exception
215     {
216         closeConnections();
217         super.doStop();
218     }
219 
220     /* ------------------------------------------------------------ */
221     /**
222      * <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
223      * WebSocketClientFactory instance.</p>
224      *
225      * @return a new {@link WebSocketClient} instance
226      */
227     public WebSocketClient newWebSocketClient()
228     {
229         return new WebSocketClient(this);
230     }
231 
232     protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
233     {
234         SSLEngine sslEngine;
235         if (channel != null)
236         {
237             String peerHost = channel.socket().getInetAddress().getHostAddress();
238             int peerPort = channel.socket().getPort();
239             sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
240         }
241         else
242         {
243             sslEngine = _sslContextFactory.newSslEngine();
244         }
245         sslEngine.setUseClientMode(true);
246         sslEngine.beginHandshake();
247 
248         return sslEngine;
249     }
250 
251     protected boolean addConnection(WebSocketConnection connection)
252     {
253         return isRunning() && connections.add(connection);
254     }
255 
256     protected boolean removeConnection(WebSocketConnection connection)
257     {
258         return connections.remove(connection);
259     }
260 
261     protected void closeConnections()
262     {
263         for (WebSocketConnection connection : connections)
264             connection.shutdown();
265     }
266 
267     /* ------------------------------------------------------------ */
268     /**
269      * WebSocket Client Selector Manager
270      */
271     class WebSocketClientSelector extends SelectorManager
272     {
273         @Override
274         public boolean dispatch(Runnable task)
275         {
276             return _threadPool.dispatch(task);
277         }
278 
279         @Override
280         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
281         {
282             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
283             int maxIdleTime = holder.getMaxIdleTime();
284             if (maxIdleTime < 0)
285                 maxIdleTime = (int)getMaxIdleTime();
286             SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
287             AsyncEndPoint endPoint = result;
288 
289             // Detect if it is SSL, and wrap the connection if so
290             if ("wss".equals(holder.getURI().getScheme()))
291             {
292                 SSLEngine sslEngine = newSslEngine(channel);
293                 SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
294                 endPoint.setConnection(sslConnection);
295                 endPoint = sslConnection.getSslEndPoint();
296             }
297 
298             AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
299             endPoint.setConnection(connection);
300 
301             return result;
302         }
303 
304         @Override
305         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
306         {
307             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
308             return new HandshakeConnection(endpoint, holder);
309         }
310 
311         @Override
312         protected void endPointOpened(SelectChannelEndPoint endpoint)
313         {
314             // TODO expose on outer class ??
315         }
316 
317         @Override
318         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
319         {
320             LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
321         }
322 
323         @Override
324         protected void endPointClosed(SelectChannelEndPoint endpoint)
325         {
326             endpoint.getConnection().onClose();
327         }
328 
329         @Override
330         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
331         {
332             if (!(attachment instanceof WebSocketClient.WebSocketFuture))
333                 super.connectionFailed(channel, ex, attachment);
334             else
335             {
336                 __log.debug(ex);
337                 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
338 
339                 future.handshakeFailed(ex);
340             }
341         }
342     }
343 
344     /* ------------------------------------------------------------ */
345     /**
346      * Handshake Connection.
347      * Handles the connection until the handshake succeeds or fails.
348      */
349     class HandshakeConnection extends AbstractConnection implements AsyncConnection
350     {
351         private final AsyncEndPoint _endp;
352         private final WebSocketClient.WebSocketFuture _future;
353         private final String _key;
354         private final HttpParser _parser;
355         private String _accept;
356         private String _error;
357         private ByteArrayBuffer _handshake;
358 
359         public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
360         {
361             super(endpoint, System.currentTimeMillis());
362             _endp = endpoint;
363             _future = future;
364 
365             byte[] bytes = new byte[16];
366             new Random().nextBytes(bytes);
367             _key = new String(B64Code.encode(bytes));
368 
369             Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
370             _parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
371             {
372                 @Override
373                 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
374                 {
375                     if (status != 101)
376                     {
377                         _error = "Bad response status " + status + " " + reason;
378                         _endp.close();
379                     }
380                 }
381 
382                 @Override
383                 public void parsedHeader(Buffer name, Buffer value) throws IOException
384                 {
385                     if (__ACCEPT.equals(name))
386                         _accept = value.toString();
387                 }
388 
389                 @Override
390                 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
391                 {
392                     if (_error == null)
393                         _error = "Bad response: " + method + " " + url + " " + version;
394                     _endp.close();
395                 }
396 
397                 @Override
398                 public void content(Buffer ref) throws IOException
399                 {
400                     if (_error == null)
401                         _error = "Bad response. " + ref.length() + "B of content?";
402                     _endp.close();
403                 }
404             });
405         }
406 
407         private boolean handshake()
408         {
409             if (_handshake==null)
410             {
411                 String path = _future.getURI().getPath();
412                 if (path == null || path.length() == 0)
413                     path = "/";
414 
415                 if (_future.getURI().getRawQuery() != null)
416                     path += "?" + _future.getURI().getRawQuery();
417 
418                 String origin = _future.getOrigin();
419 
420                 StringBuilder request = new StringBuilder(512);
421                 request.append("GET ").append(path).append(" HTTP/1.1\r\n")
422                 .append("Host: ").append(_future.getURI().getHost()).append(":")
423                 .append(_future.getURI().getPort()).append("\r\n")
424                 .append("Upgrade: websocket\r\n")
425                 .append("Connection: Upgrade\r\n")
426                 .append("Sec-WebSocket-Key: ")
427                 .append(_key).append("\r\n");
428 
429                 if (origin != null)
430                     request.append("Origin: ").append(origin).append("\r\n");
431 
432                 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
433 
434                 if (_future.getProtocol() != null)
435                     request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
436 
437                 Map<String, String> cookies = _future.getCookies();
438                 if (cookies != null && cookies.size() > 0)
439                 {
440                     for (String cookie : cookies.keySet())
441                         request.append("Cookie: ")
442                         .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
443                         .append("=")
444                         .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
445                         .append("\r\n");
446                 }
447 
448                 request.append("\r\n");
449 
450                 _handshake=new ByteArrayBuffer(request.toString(), false);
451             }
452             
453             // TODO extensions
454 
455             try
456             {
457                 int len = _handshake.length();
458                 int flushed = _endp.flush(_handshake);
459                 if (flushed<0)
460                     throw new IOException("incomplete handshake");
461             }
462             catch (IOException e)
463             {
464                 _future.handshakeFailed(e);
465             }
466             return _handshake.length()==0;
467         }
468 
469         public Connection handle() throws IOException
470         {
471             while (_endp.isOpen() && !_parser.isComplete())
472             {
473                 if (_handshake==null || _handshake.length()>0)
474                     if (!handshake())
475                         return this;
476 
477                 if (!_parser.parseAvailable())
478                 {
479                     if (_endp.isInputShutdown())
480                         _future.handshakeFailed(new IOException("Incomplete handshake response"));
481                     return this;
482                 }
483             }
484             if (_error == null)
485             {
486                 if (_accept == null)
487                 {
488                     _error = "No Sec-WebSocket-Accept";
489                 }
490                 else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
491                 {
492                     _error = "Bad Sec-WebSocket-Accept";
493                 }
494                 else
495                 {
496                     WebSocketConnection connection = newWebSocketConnection();
497 
498                     Buffer header = _parser.getHeaderBuffer();
499                     if (header.hasContent())
500                         connection.fillBuffersFrom(header);
501                     _buffers.returnBuffer(header);
502 
503                     _future.onConnection(connection);
504 
505                     return connection;
506                 }
507             }
508 
509             _endp.close();
510             return this;
511         }
512 
513         private WebSocketConnection newWebSocketConnection() throws IOException
514         {
515             return new WebSocketClientConnection(
516                     _future._client.getFactory(),
517                     _future.getWebSocket(),
518                     _endp,
519                     _buffers,
520                     System.currentTimeMillis(),
521                     _future.getMaxIdleTime(),
522                     _future.getProtocol(),
523                     null,
524                     WebSocketConnectionRFC6455.VERSION,
525                     _future.getMaskGen());
526         }
527 
528         public void onInputShutdown() throws IOException
529         {
530             _endp.close();
531         }
532 
533         public boolean isIdle()
534         {
535             return false;
536         }
537 
538         public boolean isSuspended()
539         {
540             return false;
541         }
542 
543         public void onClose()
544         {
545             if (_error != null)
546                 _future.handshakeFailed(new ProtocolException(_error));
547             else
548                 _future.handshakeFailed(new EOFException());
549         }
550     }
551 
552     private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
553     {
554         private final WebSocketClientFactory factory;
555 
556         public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
557         {
558             super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
559             this.factory = factory;
560         }
561 
562         @Override
563         public void onClose()
564         {
565             super.onClose();
566             factory.removeConnection(this);
567         }
568     }
569 }