1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.eclipse.jetty.websocket;
17
18 import java.io.EOFException;
19 import java.io.IOException;
20 import java.net.ProtocolException;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.SocketChannel;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Queue;
26 import java.util.Random;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import javax.net.ssl.SSLEngine;
29
30 import org.eclipse.jetty.http.HttpFields;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.io.AbstractConnection;
33 import org.eclipse.jetty.io.AsyncEndPoint;
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.Buffers;
36 import org.eclipse.jetty.io.ByteArrayBuffer;
37 import org.eclipse.jetty.io.ConnectedEndPoint;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.SimpleBuffers;
41 import org.eclipse.jetty.io.nio.AsyncConnection;
42 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
43 import org.eclipse.jetty.io.nio.SelectorManager;
44 import org.eclipse.jetty.io.nio.SslConnection;
45 import org.eclipse.jetty.util.B64Code;
46 import org.eclipse.jetty.util.QuotedStringTokenizer;
47 import org.eclipse.jetty.util.component.AggregateLifeCycle;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.ssl.SslContextFactory;
50 import org.eclipse.jetty.util.thread.QueuedThreadPool;
51 import org.eclipse.jetty.util.thread.ThreadPool;
52
53
54
55
56
57
58
59
60
61
62
63 public class WebSocketClientFactory extends AggregateLifeCycle
64 {
65 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
66 private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
67 private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
68 private final SslContextFactory _sslContextFactory = new SslContextFactory();
69 private final ThreadPool _threadPool;
70 private final WebSocketClientSelector _selector;
71 private MaskGen _maskGen;
72 private WebSocketBuffers _buffers;
73
74
75
76
77
/**
 * Creates a factory by delegating to {@link #WebSocketClientFactory(ThreadPool)}
 * with a {@code null} thread pool.
 */
public WebSocketClientFactory()
{
    this((ThreadPool)null);
}
82
83
84
85
86
87
88
/**
 * Creates a factory that uses the given thread pool and a new {@link RandomMaskGen}.
 *
 * @param threadPool the thread pool to use; {@code null} is passed through to the
 *                   three-argument constructor, which then creates a {@link QueuedThreadPool}
 */
public WebSocketClientFactory(ThreadPool threadPool)
{
    this(threadPool, new RandomMaskGen());
}
93
94
95
96
97
98
99
100
/**
 * Creates a factory that uses the given thread pool and mask generator
 * with a buffer size of 8192.
 *
 * @param threadPool the thread pool to use, or {@code null} to have one created
 * @param maskGen the mask generator to use
 */
public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
{
    this(threadPool, maskGen, 8192);
}
105
106
107
108
109
110
111
112
113
114
/**
 * Creates a factory from its collaborators and adds each of them as a bean of this
 * lifecycle, in the order: thread pool, buffers, mask generator, selector, SSL context factory.
 *
 * @param threadPool the pool the selector dispatches to; when {@code null} a new
 *                   {@link QueuedThreadPool} is created
 * @param maskGen the mask generator returned by {@link #getMaskGen()}
 * @param bufferSize the size handed to the {@link WebSocketBuffers} created for this factory
 */
public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
{
    _threadPool = (threadPool != null) ? threadPool : new QueuedThreadPool();
    addBean(_threadPool);

    _buffers = new WebSocketBuffers(bufferSize);
    addBean(_buffers);

    _maskGen = maskGen;
    addBean(_maskGen);

    _selector = new WebSocketClientSelector();
    addBean(_selector);

    addBean(_sslContextFactory);
}
133
134
135
136
137
138 public SslContextFactory getSslContextFactory()
139 {
140 return _sslContextFactory;
141 }
142
143
144
145
146
147
148
149 public SelectorManager getSelectorManager()
150 {
151 return _selector;
152 }
153
154
155
156
157
158
159
160
161 public ThreadPool getThreadPool()
162 {
163 return _threadPool;
164 }
165
166
167
168
169
170
171 public MaskGen getMaskGen()
172 {
173 return _maskGen;
174 }
175
176
177
178
179
180
181 public void setMaskGen(MaskGen maskGen)
182 {
183 if (isRunning())
184 throw new IllegalStateException(getState());
185 removeBean(_maskGen);
186 _maskGen = maskGen;
187 addBean(maskGen);
188 }
189
190
191
192
193
194
195 public void setBufferSize(int bufferSize)
196 {
197 if (isRunning())
198 throw new IllegalStateException(getState());
199 removeBean(_buffers);
200 _buffers = new WebSocketBuffers(bufferSize);
201 addBean(_buffers);
202 }
203
204
205
206
207
208 public int getBufferSize()
209 {
210 return _buffers.getBufferSize();
211 }
212
213 @Override
214 protected void doStop() throws Exception
215 {
216 closeConnections();
217 super.doStop();
218 }
219
220
221
222
223
224
225
226
227 public WebSocketClient newWebSocketClient()
228 {
229 return new WebSocketClient(this);
230 }
231
232 protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
233 {
234 SSLEngine sslEngine;
235 if (channel != null)
236 {
237 String peerHost = channel.socket().getInetAddress().getHostAddress();
238 int peerPort = channel.socket().getPort();
239 sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
240 }
241 else
242 {
243 sslEngine = _sslContextFactory.newSslEngine();
244 }
245 sslEngine.setUseClientMode(true);
246 sslEngine.beginHandshake();
247
248 return sslEngine;
249 }
250
251 protected boolean addConnection(WebSocketConnection connection)
252 {
253 return isRunning() && connections.add(connection);
254 }
255
256 protected boolean removeConnection(WebSocketConnection connection)
257 {
258 return connections.remove(connection);
259 }
260
261 protected void closeConnections()
262 {
263 for (WebSocketConnection connection : connections)
264 connection.shutdown();
265 }
266
267
268
269
270
271 class WebSocketClientSelector extends SelectorManager
272 {
273 @Override
274 public boolean dispatch(Runnable task)
275 {
276 return _threadPool.dispatch(task);
277 }
278
279 @Override
280 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
281 {
282 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
283 int maxIdleTime = holder.getMaxIdleTime();
284 if (maxIdleTime < 0)
285 maxIdleTime = (int)getMaxIdleTime();
286 SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
287 AsyncEndPoint endPoint = result;
288
289
290 if ("wss".equals(holder.getURI().getScheme()))
291 {
292 SSLEngine sslEngine = newSslEngine(channel);
293 SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
294 endPoint.setConnection(sslConnection);
295 endPoint = sslConnection.getSslEndPoint();
296 }
297
298 AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
299 endPoint.setConnection(connection);
300
301 return result;
302 }
303
304 @Override
305 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
306 {
307 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
308 return new HandshakeConnection(endpoint, holder);
309 }
310
311 @Override
312 protected void endPointOpened(SelectChannelEndPoint endpoint)
313 {
314
315 }
316
317 @Override
318 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
319 {
320 LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
321 }
322
323 @Override
324 protected void endPointClosed(SelectChannelEndPoint endpoint)
325 {
326 endpoint.getConnection().onClose();
327 }
328
329 @Override
330 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
331 {
332 if (!(attachment instanceof WebSocketClient.WebSocketFuture))
333 super.connectionFailed(channel, ex, attachment);
334 else
335 {
336 __log.debug(ex);
337 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
338
339 future.handshakeFailed(ex);
340 }
341 }
342 }
343
344
345
346
347
348
349 class HandshakeConnection extends AbstractConnection implements AsyncConnection
350 {
351 private final AsyncEndPoint _endp;
352 private final WebSocketClient.WebSocketFuture _future;
353 private final String _key;
354 private final HttpParser _parser;
355 private String _accept;
356 private String _error;
357 private boolean _handshaken;
358
359 public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
360 {
361 super(endpoint, System.currentTimeMillis());
362 _endp = endpoint;
363 _future = future;
364
365 byte[] bytes = new byte[16];
366 new Random().nextBytes(bytes);
367 _key = new String(B64Code.encode(bytes));
368
369 Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
370 _parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
371 {
372 @Override
373 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
374 {
375 if (status != 101)
376 {
377 _error = "Bad response status " + status + " " + reason;
378 _endp.close();
379 }
380 }
381
382 @Override
383 public void parsedHeader(Buffer name, Buffer value) throws IOException
384 {
385 if (__ACCEPT.equals(name))
386 _accept = value.toString();
387 }
388
389 @Override
390 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
391 {
392 if (_error == null)
393 _error = "Bad response: " + method + " " + url + " " + version;
394 _endp.close();
395 }
396
397 @Override
398 public void content(Buffer ref) throws IOException
399 {
400 if (_error == null)
401 _error = "Bad response. " + ref.length() + "B of content?";
402 _endp.close();
403 }
404 });
405 }
406
407 private void handshake()
408 {
409 String path = _future.getURI().getPath();
410 if (path == null || path.length() == 0)
411 path = "/";
412
413 if (_future.getURI().getRawQuery() != null)
414 path += "?" + _future.getURI().getRawQuery();
415
416 String origin = _future.getOrigin();
417
418 StringBuilder request = new StringBuilder(512);
419 request.append("GET ").append(path).append(" HTTP/1.1\r\n")
420 .append("Host: ").append(_future.getURI().getHost()).append(":")
421 .append(_future.getURI().getPort()).append("\r\n")
422 .append("Upgrade: websocket\r\n")
423 .append("Connection: Upgrade\r\n")
424 .append("Sec-WebSocket-Key: ")
425 .append(_key).append("\r\n");
426
427 if (origin != null)
428 request.append("Origin: ").append(origin).append("\r\n");
429
430 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
431
432 if (_future.getProtocol() != null)
433 request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
434
435 Map<String, String> cookies = _future.getCookies();
436 if (cookies != null && cookies.size() > 0)
437 {
438 for (String cookie : cookies.keySet())
439 request.append("Cookie: ")
440 .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
441 .append("=")
442 .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
443 .append("\r\n");
444 }
445
446 request.append("\r\n");
447
448
449
450 try
451 {
452 Buffer handshake = new ByteArrayBuffer(request.toString(), false);
453 int len = handshake.length();
454 if (len != _endp.flush(handshake))
455 throw new IOException("incomplete");
456 }
457 catch (IOException e)
458 {
459 _future.handshakeFailed(e);
460 }
461 finally
462 {
463 _handshaken = true;
464 }
465 }
466
467 public Connection handle() throws IOException
468 {
469 while (_endp.isOpen() && !_parser.isComplete())
470 {
471 if (!_handshaken)
472 handshake();
473
474 if (!_parser.parseAvailable())
475 {
476 if (_endp.isInputShutdown())
477 _future.handshakeFailed(new IOException("Incomplete handshake response"));
478 return this;
479 }
480 }
481 if (_error == null)
482 {
483 if (_accept == null)
484 {
485 _error = "No Sec-WebSocket-Accept";
486 }
487 else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
488 {
489 _error = "Bad Sec-WebSocket-Accept";
490 }
491 else
492 {
493 WebSocketConnection connection = newWebSocketConnection();
494
495 Buffer header = _parser.getHeaderBuffer();
496 if (header.hasContent())
497 connection.fillBuffersFrom(header);
498 _buffers.returnBuffer(header);
499
500 _future.onConnection(connection);
501
502 return connection;
503 }
504 }
505
506 _endp.close();
507 return this;
508 }
509
510 private WebSocketConnection newWebSocketConnection() throws IOException
511 {
512 return new WebSocketClientConnection(
513 _future._client.getFactory(),
514 _future.getWebSocket(),
515 _endp,
516 _buffers,
517 System.currentTimeMillis(),
518 _future.getMaxIdleTime(),
519 _future.getProtocol(),
520 null,
521 WebSocketConnectionRFC6455.VERSION,
522 _future.getMaskGen());
523 }
524
525 public void onInputShutdown() throws IOException
526 {
527 _endp.close();
528 }
529
530 public boolean isIdle()
531 {
532 return false;
533 }
534
535 public boolean isSuspended()
536 {
537 return false;
538 }
539
540 public void onClose()
541 {
542 if (_error != null)
543 _future.handshakeFailed(new ProtocolException(_error));
544 else
545 _future.handshakeFailed(new EOFException());
546 }
547 }
548
549 private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
550 {
551 private final WebSocketClientFactory factory;
552
553 public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
554 {
555 super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
556 this.factory = factory;
557 }
558
559 @Override
560 public void onClose()
561 {
562 super.onClose();
563 factory.removeConnection(this);
564 }
565 }
566 }