1 package org.eclipse.jetty.websocket;
2
3 import java.io.EOFException;
4 import java.io.IOException;
5 import java.net.ProtocolException;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.SocketChannel;
8 import java.util.Random;
9
10 import org.eclipse.jetty.http.HttpFields;
11 import org.eclipse.jetty.http.HttpParser;
12 import org.eclipse.jetty.io.AbstractConnection;
13 import org.eclipse.jetty.io.AsyncEndPoint;
14 import org.eclipse.jetty.io.Buffer;
15 import org.eclipse.jetty.io.Buffers;
16 import org.eclipse.jetty.io.ByteArrayBuffer;
17 import org.eclipse.jetty.io.ConnectedEndPoint;
18 import org.eclipse.jetty.io.Connection;
19 import org.eclipse.jetty.io.SimpleBuffers;
20 import org.eclipse.jetty.io.nio.AsyncConnection;
21 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
22 import org.eclipse.jetty.io.nio.SelectorManager;
23 import org.eclipse.jetty.util.B64Code;
24 import org.eclipse.jetty.util.QuotedStringTokenizer;
25 import org.eclipse.jetty.util.component.AggregateLifeCycle;
26 import org.eclipse.jetty.util.component.LifeCycle;
27 import org.eclipse.jetty.util.log.Logger;
28 import org.eclipse.jetty.util.thread.QueuedThreadPool;
29 import org.eclipse.jetty.util.thread.ThreadPool;
30
31
32
33
34
35
36
37
38
39
40
41 public class WebSocketClientFactory extends AggregateLifeCycle
42 {
43 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
44 private final static Random __random = new Random();
45 private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
46
47 private final ThreadPool _threadPool;
48 private final WebSocketClientSelector _selector;
49 private MaskGen _maskGen;
50 private WebSocketBuffers _buffers;
51
52
53
54
55
56 public WebSocketClientFactory()
57 {
58 _threadPool=new QueuedThreadPool();
59 addBean(_threadPool);
60 _buffers=new WebSocketBuffers(8*1024);
61 addBean(_buffers);
62 _maskGen=new RandomMaskGen();
63 addBean(_maskGen);
64 _selector=new WebSocketClientSelector();
65 addBean(_selector);
66 }
67
68
69
70
71
72
73 public WebSocketClientFactory(ThreadPool threadPool)
74 {
75 _threadPool=threadPool;
76 addBean(threadPool);
77 _buffers=new WebSocketBuffers(8*1024);
78 addBean(_buffers);
79 _maskGen=new RandomMaskGen();
80 addBean(_maskGen);
81 _selector=new WebSocketClientSelector();
82 addBean(_selector);
83 }
84
85
86
87
88
89
90
91
92 public WebSocketClientFactory(ThreadPool threadPool,MaskGen maskGen,int bufferSize)
93 {
94 _threadPool=threadPool;
95 addBean(threadPool);
96 _buffers=new WebSocketBuffers(bufferSize);
97 addBean(_buffers);
98 _maskGen=maskGen;
99 _selector=new WebSocketClientSelector();
100 addBean(_selector);
101 }
102
103
104
105
106
107
108 public SelectorManager getSelectorManager()
109 {
110 return _selector;
111 }
112
113
114
115
116
117
118 public ThreadPool getThreadPool()
119 {
120 return _threadPool;
121 }
122
123
124
125
126
127
128 public MaskGen getMaskGen()
129 {
130 return _maskGen;
131 }
132
133
134
135
136
137
138 public void setMaskGen(MaskGen maskGen)
139 {
140 if (isRunning())
141 throw new IllegalStateException(getState());
142 if (removeBean(_maskGen))
143 addBean(maskGen);
144 _maskGen=maskGen;
145 }
146
147
148
149
150
151
152 public void setBufferSize(int bufferSize)
153 {
154 if (isRunning())
155 throw new IllegalStateException(getState());
156 removeBean(_buffers);
157 _buffers=new WebSocketBuffers(bufferSize);
158 addBean(_buffers);
159 }
160
161
162
163
164
165 public int getBufferSize()
166 {
167 return _buffers.getBufferSize();
168 }
169
170
171
172
173
174
175
176
177 public WebSocketClient newWebSocketClient()
178 {
179 return new WebSocketClient(this);
180 }
181
182
183 @Override
184 protected void doStart() throws Exception
185 {
186 super.doStart();
187 if (getThreadPool() instanceof LifeCycle && !((LifeCycle)getThreadPool()).isStarted())
188 ((LifeCycle)getThreadPool()).start();
189 }
190
191
192 @Override
193 protected void doStop() throws Exception
194 {
195 super.doStop();
196 }
197
198
199
200
201 class WebSocketClientSelector extends SelectorManager
202 {
203 @Override
204 public boolean dispatch(Runnable task)
205 {
206 return _threadPool.dispatch(task);
207 }
208
209 @Override
210 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
211 {
212 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key,channel.socket().getSoTimeout());
213 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
214 return endp;
215 }
216
217 @Override
218 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
219 {
220 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) attachment;
221 return new HandshakeConnection(endpoint,holder);
222 }
223
224 @Override
225 protected void endPointOpened(SelectChannelEndPoint endpoint)
226 {
227
228 }
229
230 @Override
231 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
232 {
233 LOG.debug("upgrade {} -> {}",oldConnection,endpoint.getConnection());
234 }
235
236 @Override
237 protected void endPointClosed(SelectChannelEndPoint endpoint)
238 {
239 endpoint.getConnection().onClose();
240 }
241
242 @Override
243 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
244 {
245 if (!(attachment instanceof WebSocketClient.WebSocketFuture))
246 super.connectionFailed(channel,ex,attachment);
247 else
248 {
249 __log.debug(ex);
250 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
251
252 future.handshakeFailed(ex);
253 }
254 }
255 }
256
257
258
259
260
261
262 class HandshakeConnection extends AbstractConnection implements AsyncConnection
263 {
264 private final AsyncEndPoint _endp;
265 private final WebSocketClient.WebSocketFuture _future;
266 private final String _key;
267 private final HttpParser _parser;
268 private String _accept;
269 private String _error;
270
271 public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
272 {
273 super(endpoint,System.currentTimeMillis());
274 _endp=endpoint;
275 _future=future;
276
277 byte[] bytes=new byte[16];
278 __random.nextBytes(bytes);
279 _key=new String(B64Code.encode(bytes));
280
281
282 Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
283 _parser=new HttpParser(buffers,_endp,
284
285 new HttpParser.EventHandler()
286 {
287 @Override
288 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
289 {
290 if (status!=101)
291 {
292 _error="Bad response status "+status+" "+reason;
293 _endp.close();
294 }
295 }
296
297 @Override
298 public void parsedHeader(Buffer name, Buffer value) throws IOException
299 {
300 if (__ACCEPT.equals(name))
301 _accept=value.toString();
302 }
303
304 @Override
305 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
306 {
307 if (_error==null)
308 _error="Bad response: "+method+" "+url+" "+version;
309 _endp.close();
310 }
311
312 @Override
313 public void content(Buffer ref) throws IOException
314 {
315 if (_error==null)
316 _error="Bad response. "+ref.length()+"B of content?";
317 _endp.close();
318 }
319 });
320
321 String path=_future.getURI().getPath();
322 if (path==null || path.length()==0)
323 {
324 path="/";
325 }
326
327 if(_future.getURI().getRawQuery() != null)
328 {
329 path += "?" + _future.getURI().getRawQuery();
330 }
331
332 String origin = future.getOrigin();
333
334 StringBuilder request = new StringBuilder(512);
335 request
336 .append("GET ").append(path).append(" HTTP/1.1\r\n")
337 .append("Host: ").append(future.getURI().getHost()).append(":").append(_future.getURI().getPort()).append("\r\n")
338 .append("Upgrade: websocket\r\n")
339 .append("Connection: Upgrade\r\n")
340 .append("Sec-WebSocket-Key: ")
341 .append(_key).append("\r\n");
342
343 if(origin!=null)
344 request.append("Origin: ").append(origin).append("\r\n");
345
346 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionD13.VERSION).append("\r\n");
347
348 if (future.getProtocol()!=null)
349 request.append("Sec-WebSocket-Protocol: ").append(future.getProtocol()).append("\r\n");
350
351 if (future.getCookies()!=null && future.getCookies().size()>0)
352 {
353 for (String cookie : future.getCookies().keySet())
354 request
355 .append("Cookie: ")
356 .append(QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM))
357 .append("=")
358 .append(QuotedStringTokenizer.quoteIfNeeded(future.getCookies().get(cookie),HttpFields.__COOKIE_DELIM))
359 .append("\r\n");
360 }
361
362 request.append("\r\n");
363
364
365
366 try
367 {
368 Buffer handshake = new ByteArrayBuffer(request.toString(),false);
369 int len=handshake.length();
370 if (len!=_endp.flush(handshake))
371 throw new IOException("incomplete");
372 }
373 catch(IOException e)
374 {
375 future.handshakeFailed(e);
376 }
377 }
378
379 public Connection handle() throws IOException
380 {
381 while (_endp.isOpen() && !_parser.isComplete())
382 {
383 if (!_parser.parseAvailable())
384 {
385 if (_endp.isInputShutdown())
386 _future.handshakeFailed(new IOException("Incomplete handshake response"));
387 return this;
388 }
389 }
390 if (_error==null)
391 {
392 if (_accept==null)
393 _error="No Sec-WebSocket-Accept";
394 else if (!WebSocketConnectionD13.hashKey(_key).equals(_accept))
395 _error="Bad Sec-WebSocket-Accept";
396 else
397 {
398 Buffer header=_parser.getHeaderBuffer();
399 MaskGen maskGen=_future.getMaskGen();
400 WebSocketConnectionD13 connection =
401 new WebSocketConnectionD13(_future.getWebSocket(),
402 _endp,
403 _buffers,System.currentTimeMillis(),
404 _future.getMaxIdleTime(),
405 _future.getProtocol(),
406 null,
407 WebSocketConnectionD13.VERSION,
408 maskGen);
409
410 if (header.hasContent())
411 connection.fillBuffersFrom(header);
412 _buffers.returnBuffer(header);
413
414 _future.onConnection(connection);
415
416 return connection;
417 }
418 }
419
420 _endp.close();
421 return this;
422 }
423
424 public void onInputShutdown() throws IOException
425 {
426 _endp.close();
427 }
428
429 public boolean isIdle()
430 {
431 return false;
432 }
433
434 public boolean isSuspended()
435 {
436 return false;
437 }
438
439 public void onClose()
440 {
441 if (_error!=null)
442 _future.handshakeFailed(new ProtocolException(_error));
443 else
444 _future.handshakeFailed(new EOFException());
445 }
446
447 public String toString()
448 {
449 return "HS"+super.toString();
450 }
451 }
452 }