1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.eclipse.jetty.websocket;
17
18 import java.io.EOFException;
19 import java.io.IOException;
20 import java.net.ProtocolException;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.SocketChannel;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Queue;
26 import java.util.Random;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import javax.net.ssl.SSLEngine;
29
30 import org.eclipse.jetty.http.HttpFields;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.io.AbstractConnection;
33 import org.eclipse.jetty.io.AsyncEndPoint;
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.Buffers;
36 import org.eclipse.jetty.io.ByteArrayBuffer;
37 import org.eclipse.jetty.io.ConnectedEndPoint;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.SimpleBuffers;
41 import org.eclipse.jetty.io.nio.AsyncConnection;
42 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
43 import org.eclipse.jetty.io.nio.SelectorManager;
44 import org.eclipse.jetty.io.nio.SslConnection;
45 import org.eclipse.jetty.util.B64Code;
46 import org.eclipse.jetty.util.QuotedStringTokenizer;
47 import org.eclipse.jetty.util.component.AggregateLifeCycle;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.ssl.SslContextFactory;
50 import org.eclipse.jetty.util.thread.QueuedThreadPool;
51 import org.eclipse.jetty.util.thread.ThreadPool;
52
53
54
55
56
57
58
59
60
61
62
63 public class WebSocketClientFactory extends AggregateLifeCycle
64 {
65 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
66 private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
67 private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
68 private final SslContextFactory _sslContextFactory = new SslContextFactory();
69 private final ThreadPool _threadPool;
70 private final WebSocketClientSelector _selector;
71 private MaskGen _maskGen;
72 private WebSocketBuffers _buffers;
73
74
75
76
77
78 public WebSocketClientFactory()
79 {
80 this(null);
81 }
82
83
84
85
86
87
88
89 public WebSocketClientFactory(ThreadPool threadPool)
90 {
91 this(threadPool, new RandomMaskGen());
92 }
93
94
95
96
97
98
99
100
101 public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
102 {
103 this(threadPool, maskGen, 8192);
104 }
105
106
107
108
109
110
111
112
113
114
115 public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
116 {
117 if (threadPool == null)
118 threadPool = new QueuedThreadPool();
119 _threadPool = threadPool;
120 addBean(_threadPool);
121
122 _buffers = new WebSocketBuffers(bufferSize);
123 addBean(_buffers);
124
125 _maskGen = maskGen;
126 addBean(_maskGen);
127
128 _selector = new WebSocketClientSelector();
129 addBean(_selector);
130
131 addBean(_sslContextFactory);
132 }
133
134
135
136
137
138 public SslContextFactory getSslContextFactory()
139 {
140 return _sslContextFactory;
141 }
142
143
144
145
146
147
148
149 public SelectorManager getSelectorManager()
150 {
151 return _selector;
152 }
153
154
155
156
157
158
159
160
161 public ThreadPool getThreadPool()
162 {
163 return _threadPool;
164 }
165
166
167
168
169
170
171 public MaskGen getMaskGen()
172 {
173 return _maskGen;
174 }
175
176
177
178
179
180
181 public void setMaskGen(MaskGen maskGen)
182 {
183 if (isRunning())
184 throw new IllegalStateException(getState());
185 removeBean(_maskGen);
186 _maskGen = maskGen;
187 addBean(maskGen);
188 }
189
190
191
192
193
194
195 public void setBufferSize(int bufferSize)
196 {
197 if (isRunning())
198 throw new IllegalStateException(getState());
199 removeBean(_buffers);
200 _buffers = new WebSocketBuffers(bufferSize);
201 addBean(_buffers);
202 }
203
204
205
206
207
208 public int getBufferSize()
209 {
210 return _buffers.getBufferSize();
211 }
212
213 @Override
214 protected void doStop() throws Exception
215 {
216 closeConnections();
217 super.doStop();
218 }
219
220
221
222
223
224
225
226
227 public WebSocketClient newWebSocketClient()
228 {
229 return new WebSocketClient(this);
230 }
231
232 protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
233 {
234 SSLEngine sslEngine;
235 if (channel != null)
236 {
237 String peerHost = channel.socket().getInetAddress().getHostAddress();
238 int peerPort = channel.socket().getPort();
239 sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
240 }
241 else
242 {
243 sslEngine = _sslContextFactory.newSslEngine();
244 }
245 sslEngine.setUseClientMode(true);
246 sslEngine.beginHandshake();
247
248 return sslEngine;
249 }
250
251 protected boolean addConnection(WebSocketConnection connection)
252 {
253 return isRunning() && connections.add(connection);
254 }
255
256 protected boolean removeConnection(WebSocketConnection connection)
257 {
258 return connections.remove(connection);
259 }
260
261 protected void closeConnections()
262 {
263 for (WebSocketConnection connection : connections)
264 connection.shutdown();
265 }
266
267
268
269
270
271 class WebSocketClientSelector extends SelectorManager
272 {
273 @Override
274 public boolean dispatch(Runnable task)
275 {
276 return _threadPool.dispatch(task);
277 }
278
279 @Override
280 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
281 {
282 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
283 int maxIdleTime = holder.getMaxIdleTime();
284 if (maxIdleTime < 0)
285 maxIdleTime = (int)getMaxIdleTime();
286 SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
287 AsyncEndPoint endPoint = result;
288
289
290 if ("wss".equals(holder.getURI().getScheme()))
291 {
292 SSLEngine sslEngine = newSslEngine(channel);
293 SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
294 endPoint.setConnection(sslConnection);
295 endPoint = sslConnection.getSslEndPoint();
296 }
297
298 AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
299 endPoint.setConnection(connection);
300
301 return result;
302 }
303
304 @Override
305 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
306 {
307 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
308 return new HandshakeConnection(endpoint, holder);
309 }
310
311 @Override
312 protected void endPointOpened(SelectChannelEndPoint endpoint)
313 {
314
315 }
316
317 @Override
318 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
319 {
320 LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
321 }
322
323 @Override
324 protected void endPointClosed(SelectChannelEndPoint endpoint)
325 {
326 endpoint.getConnection().onClose();
327 }
328
329 @Override
330 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
331 {
332 if (!(attachment instanceof WebSocketClient.WebSocketFuture))
333 super.connectionFailed(channel, ex, attachment);
334 else
335 {
336 __log.debug(ex);
337 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
338
339 future.handshakeFailed(ex);
340 }
341 }
342 }
343
344
345
346
347
348
349 class HandshakeConnection extends AbstractConnection implements AsyncConnection
350 {
351 private final AsyncEndPoint _endp;
352 private final WebSocketClient.WebSocketFuture _future;
353 private final String _key;
354 private final HttpParser _parser;
355 private String _accept;
356 private String _error;
357 private ByteArrayBuffer _handshake;
358
359 public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
360 {
361 super(endpoint, System.currentTimeMillis());
362 _endp = endpoint;
363 _future = future;
364
365 byte[] bytes = new byte[16];
366 new Random().nextBytes(bytes);
367 _key = new String(B64Code.encode(bytes));
368
369 Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
370 _parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
371 {
372 @Override
373 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
374 {
375 if (status != 101)
376 {
377 _error = "Bad response status " + status + " " + reason;
378 _endp.close();
379 }
380 }
381
382 @Override
383 public void parsedHeader(Buffer name, Buffer value) throws IOException
384 {
385 if (__ACCEPT.equals(name))
386 _accept = value.toString();
387 }
388
389 @Override
390 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
391 {
392 if (_error == null)
393 _error = "Bad response: " + method + " " + url + " " + version;
394 _endp.close();
395 }
396
397 @Override
398 public void content(Buffer ref) throws IOException
399 {
400 if (_error == null)
401 _error = "Bad response. " + ref.length() + "B of content?";
402 _endp.close();
403 }
404 });
405 }
406
407 private boolean handshake()
408 {
409 if (_handshake==null)
410 {
411 String path = _future.getURI().getPath();
412 if (path == null || path.length() == 0)
413 path = "/";
414
415 if (_future.getURI().getRawQuery() != null)
416 path += "?" + _future.getURI().getRawQuery();
417
418 String origin = _future.getOrigin();
419
420 StringBuilder request = new StringBuilder(512);
421 request.append("GET ").append(path).append(" HTTP/1.1\r\n")
422 .append("Host: ").append(_future.getURI().getHost()).append(":")
423 .append(_future.getURI().getPort()).append("\r\n")
424 .append("Upgrade: websocket\r\n")
425 .append("Connection: Upgrade\r\n")
426 .append("Sec-WebSocket-Key: ")
427 .append(_key).append("\r\n");
428
429 if (origin != null)
430 request.append("Origin: ").append(origin).append("\r\n");
431
432 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
433
434 if (_future.getProtocol() != null)
435 request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
436
437 Map<String, String> cookies = _future.getCookies();
438 if (cookies != null && cookies.size() > 0)
439 {
440 for (String cookie : cookies.keySet())
441 request.append("Cookie: ")
442 .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
443 .append("=")
444 .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
445 .append("\r\n");
446 }
447
448 request.append("\r\n");
449
450 _handshake=new ByteArrayBuffer(request.toString(), false);
451 }
452
453
454
455 try
456 {
457 int len = _handshake.length();
458 int flushed = _endp.flush(_handshake);
459 if (flushed<0)
460 throw new IOException("incomplete handshake");
461 }
462 catch (IOException e)
463 {
464 _future.handshakeFailed(e);
465 }
466 return _handshake.length()==0;
467 }
468
469 public Connection handle() throws IOException
470 {
471 while (_endp.isOpen() && !_parser.isComplete())
472 {
473 if (_handshake==null || _handshake.length()>0)
474 if (!handshake())
475 return this;
476
477 if (!_parser.parseAvailable())
478 {
479 if (_endp.isInputShutdown())
480 _future.handshakeFailed(new IOException("Incomplete handshake response"));
481 return this;
482 }
483 }
484 if (_error == null)
485 {
486 if (_accept == null)
487 {
488 _error = "No Sec-WebSocket-Accept";
489 }
490 else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
491 {
492 _error = "Bad Sec-WebSocket-Accept";
493 }
494 else
495 {
496 WebSocketConnection connection = newWebSocketConnection();
497
498 Buffer header = _parser.getHeaderBuffer();
499 if (header.hasContent())
500 connection.fillBuffersFrom(header);
501 _buffers.returnBuffer(header);
502
503 _future.onConnection(connection);
504
505 return connection;
506 }
507 }
508
509 _endp.close();
510 return this;
511 }
512
513 private WebSocketConnection newWebSocketConnection() throws IOException
514 {
515 return new WebSocketClientConnection(
516 _future._client.getFactory(),
517 _future.getWebSocket(),
518 _endp,
519 _buffers,
520 System.currentTimeMillis(),
521 _future.getMaxIdleTime(),
522 _future.getProtocol(),
523 null,
524 WebSocketConnectionRFC6455.VERSION,
525 _future.getMaskGen());
526 }
527
528 public void onInputShutdown() throws IOException
529 {
530 _endp.close();
531 }
532
533 public boolean isIdle()
534 {
535 return false;
536 }
537
538 public boolean isSuspended()
539 {
540 return false;
541 }
542
543 public void onClose()
544 {
545 if (_error != null)
546 _future.handshakeFailed(new ProtocolException(_error));
547 else
548 _future.handshakeFailed(new EOFException());
549 }
550 }
551
552 private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
553 {
554 private final WebSocketClientFactory factory;
555
556 public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
557 {
558 super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
559 this.factory = factory;
560 }
561
562 @Override
563 public void onClose()
564 {
565 super.onClose();
566 factory.removeConnection(this);
567 }
568 }
569 }