View Javadoc

1   package org.eclipse.jetty.websocket;
2   
3   import java.io.EOFException;
4   import java.io.IOException;
5   import java.net.ProtocolException;
6   import java.nio.channels.SelectionKey;
7   import java.nio.channels.SocketChannel;
8   import java.util.Random;
9   
10  import org.eclipse.jetty.http.HttpFields;
11  import org.eclipse.jetty.http.HttpParser;
12  import org.eclipse.jetty.io.AbstractConnection;
13  import org.eclipse.jetty.io.Buffer;
14  import org.eclipse.jetty.io.Buffers;
15  import org.eclipse.jetty.io.ByteArrayBuffer;
16  import org.eclipse.jetty.io.ConnectedEndPoint;
17  import org.eclipse.jetty.io.Connection;
18  import org.eclipse.jetty.io.SimpleBuffers;
19  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
20  import org.eclipse.jetty.io.nio.SelectorManager;
21  import org.eclipse.jetty.util.B64Code;
22  import org.eclipse.jetty.util.QuotedStringTokenizer;
23  import org.eclipse.jetty.util.component.AggregateLifeCycle;
24  import org.eclipse.jetty.util.component.LifeCycle;
25  import org.eclipse.jetty.util.log.Logger;
26  import org.eclipse.jetty.util.thread.QueuedThreadPool;
27  import org.eclipse.jetty.util.thread.ThreadPool;
28  
29  
30  /* ------------------------------------------------------------ */
31  /**
32   * <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
33   * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
34   * <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
35   * <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
36   * so it's lifecycle must be controlled externally.  
37   * @see WebSocketClient
38   */
39  public class WebSocketClientFactory extends AggregateLifeCycle
40  {
41      private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
42      private final static Random __random = new Random();
43      private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
44  
45      private final ThreadPool _threadPool;
46      private final WebSocketClientSelector _selector;
47      private MaskGen _maskGen;
48      private WebSocketBuffers _buffers;
49  
50      /* ------------------------------------------------------------ */
51      /**
52       * <p>Creates a WebSocketClientFactory with the default configuration.</p>
53       */
54      public WebSocketClientFactory()
55      {
56          _threadPool=new QueuedThreadPool();
57          addBean(_threadPool);
58          _buffers=new WebSocketBuffers(8*1024);
59          addBean(_buffers);
60          _maskGen=new RandomMaskGen();
61          addBean(_maskGen);
62          _selector=new WebSocketClientSelector();
63          addBean(_selector);
64      }
65  
66      /* ------------------------------------------------------------ */
67      /**
68       * <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
69       * @param threadPool the ThreadPool instance to use
70       */
71      public WebSocketClientFactory(ThreadPool threadPool)
72      {
73          _threadPool=threadPool;
74          addBean(threadPool);
75          _buffers=new WebSocketBuffers(8*1024);
76          addBean(_buffers);
77          _maskGen=new RandomMaskGen();
78          addBean(_maskGen);
79          _selector=new WebSocketClientSelector();
80          addBean(_selector);
81      }
82  
83      /* ------------------------------------------------------------ */
84      /**
85       * <p>Creates a WebSocketClientFactory with the specified configuration.</p>
86       * @param threadPool the ThreadPool instance to use
87       * @param maskGen the mask generator to use
88       * @param bufferSize the read buffer size
89       */
90      public WebSocketClientFactory(ThreadPool threadPool,MaskGen maskGen,int bufferSize)
91      {
92          _threadPool=threadPool;
93          addBean(threadPool);
94          _buffers=new WebSocketBuffers(bufferSize);
95          addBean(_buffers);
96          _maskGen=maskGen;
97          _selector=new WebSocketClientSelector();
98          addBean(_selector);
99      }
100 
101     /* ------------------------------------------------------------ */
102     /**
103      * Get the selectorManager. Used to configure the manager.
104      * @return The {@link SelectorManager} instance.
105      */
106     public SelectorManager getSelectorManager()
107     {
108         return _selector;
109     }
110 
111     /* ------------------------------------------------------------ */
112     /** Get the ThreadPool.
113      * Used to set/query the thread pool configuration.
114      * @return The {@link ThreadPool}
115      */
116     public ThreadPool getThreadPool()
117     {
118         return _threadPool;
119     }
120 
121     /* ------------------------------------------------------------ */
122     /**
123      * @return the shared mask generator, or null if no shared mask generator is used
124      * @see {@link WebSocketClient#getMaskGen()}
125      */
126     public MaskGen getMaskGen()
127     {
128         return _maskGen;
129     }
130 
131     /* ------------------------------------------------------------ */
132     /**
133      * @param maskGen the shared mask generator, or null if no shared mask generator is used
134      * @see {@link WebSocketClient#setMaskGen(MaskGen)}
135      */
136     public void setMaskGen(MaskGen maskGen)
137     {
138         if (isRunning())
139             throw new IllegalStateException(getState());
140         if (removeBean(_maskGen))
141             addBean(maskGen);
142         _maskGen=maskGen;
143     }
144 
145     /* ------------------------------------------------------------ */
146     /**
147      * @param bufferSize the read buffer size
148      * @see #getBufferSize()
149      */
150     public void setBufferSize(int bufferSize)
151     {
152         if (isRunning())
153             throw new IllegalStateException(getState());
154         removeBean(_buffers);
155         _buffers=new WebSocketBuffers(bufferSize);
156         addBean(_buffers);
157     }
158 
159     /* ------------------------------------------------------------ */
160     /**
161      * @return the read buffer size
162      */
163     public int getBufferSize()
164     {
165         return _buffers.getBufferSize();
166     }
167 
168     /* ------------------------------------------------------------ */
169     /**
170      * <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
171      * WebSocketClientFactory instance.</p>
172      *
173      * @return a new {@link WebSocketClient} instance
174      */
175     public WebSocketClient newWebSocketClient()
176     {
177         return new WebSocketClient(this);
178     }
179 
180     /* ------------------------------------------------------------ */
181     @Override
182     protected void doStart() throws Exception
183     {
184         super.doStart();
185         if (getThreadPool() instanceof LifeCycle && !((LifeCycle)getThreadPool()).isStarted())
186             ((LifeCycle)getThreadPool()).start();
187     }
188 
189     /* ------------------------------------------------------------ */
190     @Override
191     protected void doStop() throws Exception
192     {
193         super.doStop();
194     }
195 
196     /* ------------------------------------------------------------ */
197     /** WebSocket Client Selector Manager
198      */
199     class WebSocketClientSelector extends SelectorManager
200     {
201         @Override
202         public boolean dispatch(Runnable task)
203         {
204             return _threadPool.dispatch(task);
205         }
206 
207         @Override
208         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
209         {
210             return new SelectChannelEndPoint(channel,selectSet,sKey);
211         }
212 
213         @Override
214         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
215         {
216             WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) endpoint.getSelectionKey().attachment();
217             return new HandshakeConnection(endpoint,holder);
218         }
219 
220         @Override
221         protected void endPointOpened(SelectChannelEndPoint endpoint)
222         {
223             // TODO expose on outer class ??
224         }
225 
226         @Override
227         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
228         {
229             throw new IllegalStateException();
230         }
231 
232         @Override
233         protected void endPointClosed(SelectChannelEndPoint endpoint)
234         {
235             endpoint.getConnection().closed();
236         }
237 
238         @Override
239         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
240         {
241             if (!(attachment instanceof WebSocketClient.WebSocketFuture))
242                 super.connectionFailed(channel,ex,attachment);
243             else
244             {
245                 __log.debug(ex);
246                 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
247 
248                 future.handshakeFailed(ex);
249             }
250         }
251     }
252 
253 
254     /* ------------------------------------------------------------ */
255     /** Handshake Connection.
256      * Handles the connection until the handshake succeeds or fails.
257      */
258     class HandshakeConnection extends AbstractConnection
259     {
260         private final SelectChannelEndPoint _endp;
261         private final WebSocketClient.WebSocketFuture _future;
262         private final String _key;
263         private final HttpParser _parser;
264         private String _accept;
265         private String _error;
266 
267         public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketClient.WebSocketFuture future)
268         {
269             super(endpoint,System.currentTimeMillis());
270             _endp=endpoint;
271             _future=future;
272 
273             byte[] bytes=new byte[16];
274             __random.nextBytes(bytes);
275             _key=new String(B64Code.encode(bytes));
276 
277 
278             Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
279             _parser=new HttpParser(buffers,_endp,
280 
281             new HttpParser.EventHandler()
282             {
283                 @Override
284                 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
285                 {
286                     if (status!=101)
287                     {
288                         _error="Bad response status "+status+" "+reason;
289                         _endp.close();
290                     }
291                 }
292 
293                 @Override
294                 public void parsedHeader(Buffer name, Buffer value) throws IOException
295                 {
296                     if (__ACCEPT.equals(name))
297                         _accept=value.toString();
298                 }
299 
300                 @Override
301                 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
302                 {
303                     if (_error==null)
304                         _error="Bad response: "+method+" "+url+" "+version;
305                     _endp.close();
306                 }
307 
308                 @Override
309                 public void content(Buffer ref) throws IOException
310                 {
311                     if (_error==null)
312                         _error="Bad response. "+ref.length()+"B of content?";
313                     _endp.close();
314                 }
315             });
316 
317             String path=_future.getURI().getPath();
318             if (path==null || path.length()==0)
319                 path="/";
320 
321             String origin = future.getOrigin();
322 
323             StringBuilder request = new StringBuilder(512);
324             request
325                 .append("GET ").append(path).append(" HTTP/1.1\r\n")
326                 .append("Host: ").append(future.getURI().getHost()).append(":").append(_future.getURI().getPort()).append("\r\n")
327                 .append("Upgrade: websocket\r\n")
328                 .append("Connection: Upgrade\r\n")
329                 .append("Sec-WebSocket-Key: ")
330                 .append(_key).append("\r\n");
331             
332             if(origin!=null)
333                 request.append("Origin: ").append(origin).append("\r\n");
334                 
335             request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionD13.VERSION).append("\r\n");
336 
337             if (future.getProtocol()!=null)
338                 request.append("Sec-WebSocket-Protocol: ").append(future.getProtocol()).append("\r\n");
339 
340             if (future.getCookies()!=null && future.getCookies().size()>0)
341             {
342                 for (String cookie : future.getCookies().keySet())
343                     request
344                         .append("Cookie: ")
345                         .append(QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM))
346                         .append("=")
347                         .append(QuotedStringTokenizer.quoteIfNeeded(future.getCookies().get(cookie),HttpFields.__COOKIE_DELIM))
348                         .append("\r\n");
349             }
350 
351             request.append("\r\n");
352 
353             // TODO extensions
354 
355             try
356             {
357                 Buffer handshake = new ByteArrayBuffer(request.toString(),false);
358                 int len=handshake.length();
359                 if (len!=_endp.flush(handshake))
360                     throw new IOException("incomplete");
361             }
362             catch(IOException e)
363             {
364                 future.handshakeFailed(e);
365             }
366 
367         }
368 
369         public Connection handle() throws IOException
370         {
371             while (_endp.isOpen() && !_parser.isComplete())
372             {
373                 switch (_parser.parseAvailable())
374                 {
375                     case -1:
376                         _future.handshakeFailed(new IOException("Incomplete handshake response"));
377                         return this;
378                     case 0:
379                         return this;
380                     default:
381                         break;
382                 }
383             }
384             if (_error==null)
385             {
386                 if (_accept==null)
387                     _error="No Sec-WebSocket-Accept";
388                 else if (!WebSocketConnectionD13.hashKey(_key).equals(_accept))
389                     _error="Bad Sec-WebSocket-Accept";
390                 else
391                 {
392                     Buffer header=_parser.getHeaderBuffer();
393                     MaskGen maskGen=_future.getMaskGen();
394                     WebSocketConnectionD13 connection = new WebSocketConnectionD13(_future.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_future.getMaxIdleTime(),_future.getProtocol(),null,10,maskGen);
395 
396                     if (header.hasContent())
397                         connection.fillBuffersFrom(header);
398                     _buffers.returnBuffer(header);
399 
400                     _future.onConnection(connection);
401 
402                     return connection;
403                 }
404             }
405 
406             _endp.close();
407             return this;
408         }
409 
410         public boolean isIdle()
411         {
412             return false;
413         }
414 
415         public boolean isSuspended()
416         {
417             return false;
418         }
419 
420         public void closed()
421         {
422             if (_error!=null)
423                 _future.handshakeFailed(new ProtocolException(_error));
424             else
425                 _future.handshakeFailed(new EOFException());
426         }
427     }
428 }