View Javadoc

1   package org.eclipse.jetty.websocket;
2   
3   import java.io.EOFException;
4   import java.io.IOException;
5   import java.net.ProtocolException;
6   import java.nio.channels.SelectionKey;
7   import java.nio.channels.SocketChannel;
8   import java.util.Random;
9   
10  import org.eclipse.jetty.http.HttpFields;
11  import org.eclipse.jetty.http.HttpParser;
12  import org.eclipse.jetty.io.AbstractConnection;
13  import org.eclipse.jetty.io.AsyncEndPoint;
14  import org.eclipse.jetty.io.Buffer;
15  import org.eclipse.jetty.io.Buffers;
16  import org.eclipse.jetty.io.ByteArrayBuffer;
17  import org.eclipse.jetty.io.ConnectedEndPoint;
18  import org.eclipse.jetty.io.Connection;
19  import org.eclipse.jetty.io.SimpleBuffers;
20  import org.eclipse.jetty.io.nio.AsyncConnection;
21  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
22  import org.eclipse.jetty.io.nio.SelectorManager;
23  import org.eclipse.jetty.util.B64Code;
24  import org.eclipse.jetty.util.QuotedStringTokenizer;
25  import org.eclipse.jetty.util.component.AggregateLifeCycle;
26  import org.eclipse.jetty.util.component.LifeCycle;
27  import org.eclipse.jetty.util.log.Logger;
28  import org.eclipse.jetty.util.thread.QueuedThreadPool;
29  import org.eclipse.jetty.util.thread.ThreadPool;
30  
31  
32  /* ------------------------------------------------------------ */
33  /**
34   * <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
35   * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
36   * <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
37   * <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
38   * so it's lifecycle must be controlled externally.  
39   * @see WebSocketClient
40   */
41  public class WebSocketClientFactory extends AggregateLifeCycle
42  {
43      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
44      private final static Random __random = new Random();
45      private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
46  
47      private final ThreadPool _threadPool;
48      private final WebSocketClientSelector _selector;
49      private MaskGen _maskGen;
50      private WebSocketBuffers _buffers;
51  
52      /* ------------------------------------------------------------ */
53      /**
54       * <p>Creates a WebSocketClientFactory with the default configuration.</p>
55       */
56      public WebSocketClientFactory()
57      {
58          _threadPool=new QueuedThreadPool();
59          addBean(_threadPool);
60          _buffers=new WebSocketBuffers(8*1024);
61          addBean(_buffers);
62          _maskGen=new RandomMaskGen();
63          addBean(_maskGen);
64          _selector=new WebSocketClientSelector();
65          addBean(_selector);
66      }
67  
68      /* ------------------------------------------------------------ */
69      /**
70       * <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
71       * @param threadPool the ThreadPool instance to use
72       */
73      public WebSocketClientFactory(ThreadPool threadPool)
74      {
75          _threadPool=threadPool;
76          addBean(threadPool);
77          _buffers=new WebSocketBuffers(8*1024);
78          addBean(_buffers);
79          _maskGen=new RandomMaskGen();
80          addBean(_maskGen);
81          _selector=new WebSocketClientSelector();
82          addBean(_selector);
83      }
84  
85      /* ------------------------------------------------------------ */
86      /**
87       * <p>Creates a WebSocketClientFactory with the specified configuration.</p>
88       * @param threadPool the ThreadPool instance to use
89       * @param maskGen the mask generator to use
90       * @param bufferSize the read buffer size
91       */
92      public WebSocketClientFactory(ThreadPool threadPool,MaskGen maskGen,int bufferSize)
93      {
94          _threadPool=threadPool;
95          addBean(threadPool);
96          _buffers=new WebSocketBuffers(bufferSize);
97          addBean(_buffers);
98          _maskGen=maskGen;
99          _selector=new WebSocketClientSelector();
100         addBean(_selector);
101     }
102 
103     /* ------------------------------------------------------------ */
104     /**
105      * Get the selectorManager. Used to configure the manager.
106      * @return The {@link SelectorManager} instance.
107      */
108     public SelectorManager getSelectorManager()
109     {
110         return _selector;
111     }
112 
113     /* ------------------------------------------------------------ */
114     /** Get the ThreadPool.
115      * Used to set/query the thread pool configuration.
116      * @return The {@link ThreadPool}
117      */
118     public ThreadPool getThreadPool()
119     {
120         return _threadPool;
121     }
122 
123     /* ------------------------------------------------------------ */
124     /**
125      * @return the shared mask generator, or null if no shared mask generator is used
126      * @see {@link WebSocketClient#getMaskGen()}
127      */
128     public MaskGen getMaskGen()
129     {
130         return _maskGen;
131     }
132 
133     /* ------------------------------------------------------------ */
134     /**
135      * @param maskGen the shared mask generator, or null if no shared mask generator is used
136      * @see {@link WebSocketClient#setMaskGen(MaskGen)}
137      */
138     public void setMaskGen(MaskGen maskGen)
139     {
140         if (isRunning())
141             throw new IllegalStateException(getState());
142         if (removeBean(_maskGen))
143             addBean(maskGen);
144         _maskGen=maskGen;
145     }
146 
147     /* ------------------------------------------------------------ */
148     /**
149      * @param bufferSize the read buffer size
150      * @see #getBufferSize()
151      */
152     public void setBufferSize(int bufferSize)
153     {
154         if (isRunning())
155             throw new IllegalStateException(getState());
156         removeBean(_buffers);
157         _buffers=new WebSocketBuffers(bufferSize);
158         addBean(_buffers);
159     }
160 
161     /* ------------------------------------------------------------ */
162     /**
163      * @return the read buffer size
164      */
165     public int getBufferSize()
166     {
167         return _buffers.getBufferSize();
168     }
169 
170     /* ------------------------------------------------------------ */
171     /**
172      * <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
173      * WebSocketClientFactory instance.</p>
174      *
175      * @return a new {@link WebSocketClient} instance
176      */
177     public WebSocketClient newWebSocketClient()
178     {
179         return new WebSocketClient(this);
180     }
181 
182     /* ------------------------------------------------------------ */
183     @Override
184     protected void doStart() throws Exception
185     {
186         super.doStart();
187         if (getThreadPool() instanceof LifeCycle && !((LifeCycle)getThreadPool()).isStarted())
188             ((LifeCycle)getThreadPool()).start();
189     }
190 
191     /* ------------------------------------------------------------ */
192     @Override
193     protected void doStop() throws Exception
194     {
195         super.doStop();
196     }
197 
198     /* ------------------------------------------------------------ */
199     /** WebSocket Client Selector Manager
200      */
201     class WebSocketClientSelector extends SelectorManager
202     {
203         @Override
204         public boolean dispatch(Runnable task)
205         {
206             return _threadPool.dispatch(task);
207         }
208 
209         @Override
210         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
211         {
212             SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key,channel.socket().getSoTimeout());
213             endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
214             return endp;
215         }
216 
217         @Override
218         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
219         {
220             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) attachment;
221             return new HandshakeConnection(endpoint,holder);
222         }
223 
224         @Override
225         protected void endPointOpened(SelectChannelEndPoint endpoint)
226         {
227             // TODO expose on outer class ??
228         }
229 
230         @Override
231         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
232         {
233             LOG.debug("upgrade {} -> {}",oldConnection,endpoint.getConnection());
234         }
235 
236         @Override
237         protected void endPointClosed(SelectChannelEndPoint endpoint)
238         {
239             endpoint.getConnection().onClose();
240         }
241 
242         @Override
243         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
244         {
245             if (!(attachment instanceof WebSocketClient.WebSocketFuture))
246                 super.connectionFailed(channel,ex,attachment);
247             else
248             {
249                 __log.debug(ex);
250                 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
251 
252                 future.handshakeFailed(ex);
253             }
254         }
255     }
256 
257 
258     /* ------------------------------------------------------------ */
259     /** Handshake Connection.
260      * Handles the connection until the handshake succeeds or fails.
261      */
262     class HandshakeConnection extends AbstractConnection implements AsyncConnection
263     {
264         private final AsyncEndPoint _endp;
265         private final WebSocketClient.WebSocketFuture _future;
266         private final String _key;
267         private final HttpParser _parser;
268         private String _accept;
269         private String _error;
270 
271         public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
272         {
273             super(endpoint,System.currentTimeMillis());
274             _endp=endpoint;
275             _future=future;
276 
277             byte[] bytes=new byte[16];
278             __random.nextBytes(bytes);
279             _key=new String(B64Code.encode(bytes));
280 
281 
282             Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
283             _parser=new HttpParser(buffers,_endp,
284 
285             new HttpParser.EventHandler()
286             {
287                 @Override
288                 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
289                 {
290                     if (status!=101)
291                     {
292                         _error="Bad response status "+status+" "+reason;
293                         _endp.close();
294                     }
295                 }
296 
297                 @Override
298                 public void parsedHeader(Buffer name, Buffer value) throws IOException
299                 {
300                     if (__ACCEPT.equals(name))
301                         _accept=value.toString();
302                 }
303 
304                 @Override
305                 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
306                 {
307                     if (_error==null)
308                         _error="Bad response: "+method+" "+url+" "+version;
309                     _endp.close();
310                 }
311 
312                 @Override
313                 public void content(Buffer ref) throws IOException
314                 {
315                     if (_error==null)
316                         _error="Bad response. "+ref.length()+"B of content?";
317                     _endp.close();
318                 }
319             });
320 
321             String path=_future.getURI().getPath();
322             if (path==null || path.length()==0) 
323             {
324                 path="/";
325             }
326             
327             if(_future.getURI().getRawQuery() != null)
328             {
329                 path += "?" + _future.getURI().getRawQuery();
330             }
331 
332             String origin = future.getOrigin();
333 
334             StringBuilder request = new StringBuilder(512);
335             request
336                 .append("GET ").append(path).append(" HTTP/1.1\r\n")
337                 .append("Host: ").append(future.getURI().getHost()).append(":").append(_future.getURI().getPort()).append("\r\n")
338                 .append("Upgrade: websocket\r\n")
339                 .append("Connection: Upgrade\r\n")
340                 .append("Sec-WebSocket-Key: ")
341                 .append(_key).append("\r\n");
342             
343             if(origin!=null)
344                 request.append("Origin: ").append(origin).append("\r\n");
345                 
346             request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionD13.VERSION).append("\r\n");
347 
348             if (future.getProtocol()!=null)
349                 request.append("Sec-WebSocket-Protocol: ").append(future.getProtocol()).append("\r\n");
350 
351             if (future.getCookies()!=null && future.getCookies().size()>0)
352             {
353                 for (String cookie : future.getCookies().keySet())
354                     request
355                         .append("Cookie: ")
356                         .append(QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM))
357                         .append("=")
358                         .append(QuotedStringTokenizer.quoteIfNeeded(future.getCookies().get(cookie),HttpFields.__COOKIE_DELIM))
359                         .append("\r\n");
360             }
361 
362             request.append("\r\n");
363 
364             // TODO extensions
365 
366             try
367             {
368                 Buffer handshake = new ByteArrayBuffer(request.toString(),false);
369                 int len=handshake.length();
370                 if (len!=_endp.flush(handshake))
371                     throw new IOException("incomplete");
372             }
373             catch(IOException e)
374             {
375                 future.handshakeFailed(e);
376             }
377         }
378 
379         public Connection handle() throws IOException
380         {
381             while (_endp.isOpen() && !_parser.isComplete())
382             {
383                 if (!_parser.parseAvailable())
384                 {
385                     if (_endp.isInputShutdown())
386                         _future.handshakeFailed(new IOException("Incomplete handshake response"));
387                     return this;
388                 }
389             }
390             if (_error==null)
391             {
392                 if (_accept==null)
393                     _error="No Sec-WebSocket-Accept";
394                 else if (!WebSocketConnectionD13.hashKey(_key).equals(_accept))
395                     _error="Bad Sec-WebSocket-Accept";
396                 else
397                 {
398                     Buffer header=_parser.getHeaderBuffer();
399                     MaskGen maskGen=_future.getMaskGen();
400                     WebSocketConnectionD13 connection = 
401                         new WebSocketConnectionD13(_future.getWebSocket(),
402                             _endp,
403                             _buffers,System.currentTimeMillis(),
404                             _future.getMaxIdleTime(),
405                             _future.getProtocol(),
406                             null,
407                             WebSocketConnectionD13.VERSION,
408                             maskGen);
409 
410                     if (header.hasContent())
411                         connection.fillBuffersFrom(header);
412                     _buffers.returnBuffer(header);
413 
414                     _future.onConnection(connection);
415 
416                     return connection;
417                 }
418             }
419 
420             _endp.close();
421             return this;
422         }
423 
424         public void onInputShutdown() throws IOException
425         {
426             _endp.close();
427         }
428 
429         public boolean isIdle()
430         {
431             return false;
432         }
433 
434         public boolean isSuspended()
435         {
436             return false;
437         }
438 
439         public void onClose()
440         {
441             if (_error!=null)
442                 _future.handshakeFailed(new ProtocolException(_error));
443             else
444                 _future.handshakeFailed(new EOFException());
445         }
446         
447         public String toString()
448         {
449             return "HS"+super.toString();
450         }
451     }
452 }