1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.ProtocolException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.SocketChannel;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.Random;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31
32 import javax.net.ssl.SSLEngine;
33
34 import org.eclipse.jetty.http.HttpFields;
35 import org.eclipse.jetty.http.HttpParser;
36 import org.eclipse.jetty.io.AbstractConnection;
37 import org.eclipse.jetty.io.AsyncEndPoint;
38 import org.eclipse.jetty.io.Buffer;
39 import org.eclipse.jetty.io.Buffers;
40 import org.eclipse.jetty.io.ByteArrayBuffer;
41 import org.eclipse.jetty.io.ConnectedEndPoint;
42 import org.eclipse.jetty.io.Connection;
43 import org.eclipse.jetty.io.EndPoint;
44 import org.eclipse.jetty.io.SimpleBuffers;
45 import org.eclipse.jetty.io.nio.AsyncConnection;
46 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
47 import org.eclipse.jetty.io.nio.SelectorManager;
48 import org.eclipse.jetty.io.nio.SslConnection;
49 import org.eclipse.jetty.util.B64Code;
50 import org.eclipse.jetty.util.QuotedStringTokenizer;
51 import org.eclipse.jetty.util.component.AggregateLifeCycle;
52 import org.eclipse.jetty.util.log.Logger;
53 import org.eclipse.jetty.util.ssl.SslContextFactory;
54 import org.eclipse.jetty.util.thread.QueuedThreadPool;
55 import org.eclipse.jetty.util.thread.ThreadPool;
56
57
58
59
60
61
62
63
64
65
66
67 public class WebSocketClientFactory extends AggregateLifeCycle
68 {
69 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
70 private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
71 private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
72 private final SslContextFactory _sslContextFactory = new SslContextFactory();
73 private final ThreadPool _threadPool;
74 private final WebSocketClientSelector _selector;
75 private MaskGen _maskGen;
76 private WebSocketBuffers _buffers;
77
78
79
80
81
/**
 * Creates a factory with every setting defaulted.
 * Delegates to the single-argument constructor with a {@code null} thread pool.
 */
public WebSocketClientFactory()
{
    this((ThreadPool)null);
}
86
87
88
89
90
91
92
/**
 * Creates a factory that uses the given thread pool and a new {@link RandomMaskGen}.
 *
 * @param threadPool the pool to dispatch to; {@code null} is passed through to the
 *                   delegated constructor unchanged
 */
public WebSocketClientFactory(ThreadPool threadPool)
{
    this(threadPool, new RandomMaskGen());
}
97
98
99
100
101
102
103
104
/**
 * Creates a factory with the given thread pool and mask generator and a
 * buffer size of 8192.
 *
 * @param threadPool the pool to dispatch to
 * @param maskGen    the mask generator to use
 */
public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
{
    this(threadPool, maskGen, 8192);
}
109
110
111
112
113
114
115
116
117
118
/**
 * Creates a fully specified factory and registers each collaborator as a bean
 * of this aggregate.
 *
 * @param threadPool the pool to dispatch to; when {@code null} a new
 *                   {@link QueuedThreadPool} is created
 * @param maskGen    the mask generator to use
 * @param bufferSize the size passed to the {@link WebSocketBuffers} created here
 */
public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
{
    _threadPool = (threadPool == null) ? new QueuedThreadPool() : threadPool;
    addBean(_threadPool);

    _buffers = new WebSocketBuffers(bufferSize);
    addBean(_buffers);

    _maskGen = maskGen;
    addBean(_maskGen);

    _selector = new WebSocketClientSelector();
    addBean(_selector);

    addBean(_sslContextFactory);
}
137
138
139
140
141
142 public SslContextFactory getSslContextFactory()
143 {
144 return _sslContextFactory;
145 }
146
147
148
149
150
151
152
153 public SelectorManager getSelectorManager()
154 {
155 return _selector;
156 }
157
158
159
160
161
162
163
164
165 public ThreadPool getThreadPool()
166 {
167 return _threadPool;
168 }
169
170
171
172
173
174
175 public MaskGen getMaskGen()
176 {
177 return _maskGen;
178 }
179
180
181
182
183
184
185 public void setMaskGen(MaskGen maskGen)
186 {
187 if (isRunning())
188 throw new IllegalStateException(getState());
189 removeBean(_maskGen);
190 _maskGen = maskGen;
191 addBean(maskGen);
192 }
193
194
195
196
197
198
199 public void setBufferSize(int bufferSize)
200 {
201 if (isRunning())
202 throw new IllegalStateException(getState());
203 removeBean(_buffers);
204 _buffers = new WebSocketBuffers(bufferSize);
205 addBean(_buffers);
206 }
207
208
209
210
211
212 public int getBufferSize()
213 {
214 return _buffers.getBufferSize();
215 }
216
217 @Override
218 protected void doStop() throws Exception
219 {
220 closeConnections();
221 super.doStop();
222 }
223
224
225
226
227
228
229
230
231 public WebSocketClient newWebSocketClient()
232 {
233 return new WebSocketClient(this);
234 }
235
236 protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
237 {
238 SSLEngine sslEngine;
239 if (channel != null)
240 {
241 String peerHost = channel.socket().getInetAddress().getHostAddress();
242 int peerPort = channel.socket().getPort();
243 sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
244 }
245 else
246 {
247 sslEngine = _sslContextFactory.newSslEngine();
248 }
249 sslEngine.setUseClientMode(true);
250 sslEngine.beginHandshake();
251
252 return sslEngine;
253 }
254
255 protected boolean addConnection(WebSocketConnection connection)
256 {
257 return isRunning() && connections.add(connection);
258 }
259
260 protected boolean removeConnection(WebSocketConnection connection)
261 {
262 return connections.remove(connection);
263 }
264
265 protected void closeConnections()
266 {
267 for (WebSocketConnection connection : connections)
268 connection.shutdown();
269 }
270
271
272
273
274
275 class WebSocketClientSelector extends SelectorManager
276 {
277 @Override
278 public boolean dispatch(Runnable task)
279 {
280 return _threadPool.dispatch(task);
281 }
282
283 @Override
284 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
285 {
286 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
287 int maxIdleTime = holder.getMaxIdleTime();
288 if (maxIdleTime < 0)
289 maxIdleTime = (int)getMaxIdleTime();
290 SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
291 AsyncEndPoint endPoint = result;
292
293
294 if ("wss".equals(holder.getURI().getScheme()))
295 {
296 SSLEngine sslEngine = newSslEngine(channel);
297 SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
298 endPoint.setConnection(sslConnection);
299 endPoint = sslConnection.getSslEndPoint();
300 }
301
302 AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
303 endPoint.setConnection(connection);
304
305 return result;
306 }
307
308 @Override
309 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
310 {
311 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
312 return new HandshakeConnection(endpoint, holder);
313 }
314
315 @Override
316 protected void endPointOpened(SelectChannelEndPoint endpoint)
317 {
318
319 }
320
321 @Override
322 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
323 {
324 LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
325 }
326
327 @Override
328 protected void endPointClosed(SelectChannelEndPoint endpoint)
329 {
330 endpoint.getConnection().onClose();
331 }
332
333 @Override
334 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
335 {
336 if (!(attachment instanceof WebSocketClient.WebSocketFuture))
337 super.connectionFailed(channel, ex, attachment);
338 else
339 {
340 __log.debug(ex);
341 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
342
343 future.handshakeFailed(ex);
344 }
345 }
346 }
347
348
349
350
351
352
353 class HandshakeConnection extends AbstractConnection implements AsyncConnection
354 {
// End point the handshake is written to and read from.
private final AsyncEndPoint _endp;
// Pending connection request; notified of success or failure.
private final WebSocketClient.WebSocketFuture _future;
// Base64 key generated in the constructor and sent as Sec-WebSocket-Key.
private final String _key;
// Parser for the server's HTTP response.
private final HttpParser _parser;
// Value of the Sec-WebSocket-Accept response header; null until parsed.
private String _accept;
// Description of a handshake problem; null while none has been detected.
private String _error;
// Upgrade request bytes still to be flushed; null until the request is first built.
private ByteArrayBuffer _handshake;
362
/**
 * Prepares a handshake over the given end point: generates the random
 * Sec-WebSocket-Key and sets up the parser for the server's response.
 *
 * @param endpoint the end point to perform the handshake on
 * @param future   the pending connection to notify of the outcome
 */
public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
{
    super(endpoint, System.currentTimeMillis());
    _endp = endpoint;
    _future = future;

    // The key is 16 random bytes, base64 encoded.
    final byte[] nonce = new byte[16];
    new Random().nextBytes(nonce);
    _key = new String(B64Code.encode(nonce));

    final Buffers parserBuffers = new SimpleBuffers(_buffers.getBuffer(), null);

    // Records the first problem seen in the response and closes the end point on any of them.
    final HttpParser.EventHandler responseHandler = new HttpParser.EventHandler()
    {
        @Override
        public void startResponse(Buffer version, int status, Buffer reason) throws IOException
        {
            // Only 101 is accepted as the response status.
            if (status == 101)
            {
                return;
            }
            _error = "Bad response status " + status + " " + reason;
            _endp.close();
        }

        @Override
        public void parsedHeader(Buffer name, Buffer value) throws IOException
        {
            if (__ACCEPT.equals(name))
            {
                _accept = value.toString();
            }
        }

        @Override
        public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
        {
            // A request line where a response was expected: keep any earlier error, always close.
            if (_error == null)
            {
                _error = "Bad response: " + method + " " + url + " " + version;
            }
            _endp.close();
        }

        @Override
        public void content(Buffer ref) throws IOException
        {
            // Response content is treated as an error: keep any earlier error, always close.
            if (_error == null)
            {
                _error = "Bad response. " + ref.length() + "B of content?";
            }
            _endp.close();
        }
    };

    _parser = new HttpParser(parserBuffers, _endp, responseHandler);
}
410
411 private boolean handshake()
412 {
413 if (_handshake==null)
414 {
415 String path = _future.getURI().getPath();
416 if (path == null || path.length() == 0)
417 path = "/";
418
419 if (_future.getURI().getRawQuery() != null)
420 path += "?" + _future.getURI().getRawQuery();
421
422 String origin = _future.getOrigin();
423
424 StringBuilder request = new StringBuilder(512);
425 request.append("GET ").append(path).append(" HTTP/1.1\r\n")
426 .append("Host: ").append(_future.getURI().getHost()).append(":")
427 .append(_future.getURI().getPort()).append("\r\n")
428 .append("Upgrade: websocket\r\n")
429 .append("Connection: Upgrade\r\n")
430 .append("Sec-WebSocket-Key: ")
431 .append(_key).append("\r\n");
432
433 if (origin != null)
434 request.append("Origin: ").append(origin).append("\r\n");
435
436 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
437
438 if (_future.getProtocol() != null)
439 request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
440
441 Map<String, String> cookies = _future.getCookies();
442 if (cookies != null && cookies.size() > 0)
443 {
444 for (String cookie : cookies.keySet())
445 request.append("Cookie: ")
446 .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
447 .append("=")
448 .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
449 .append("\r\n");
450 }
451
452 request.append("\r\n");
453
454 _handshake=new ByteArrayBuffer(request.toString(), false);
455 }
456
457
458
459 try
460 {
461 int len = _handshake.length();
462 int flushed = _endp.flush(_handshake);
463 if (flushed<0)
464 throw new IOException("incomplete handshake");
465 }
466 catch (IOException e)
467 {
468 _future.handshakeFailed(e);
469 }
470 return _handshake.length()==0;
471 }
472
473 public Connection handle() throws IOException
474 {
475 while (_endp.isOpen() && !_parser.isComplete())
476 {
477 if (_handshake==null || _handshake.length()>0)
478 if (!handshake())
479 return this;
480
481 if (!_parser.parseAvailable())
482 {
483 if (_endp.isInputShutdown())
484 _future.handshakeFailed(new IOException("Incomplete handshake response"));
485 return this;
486 }
487 }
488 if (_error == null)
489 {
490 if (_accept == null)
491 {
492 _error = "No Sec-WebSocket-Accept";
493 }
494 else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
495 {
496 _error = "Bad Sec-WebSocket-Accept";
497 }
498 else
499 {
500 WebSocketConnection connection = newWebSocketConnection();
501
502 Buffer header = _parser.getHeaderBuffer();
503 if (header.hasContent())
504 connection.fillBuffersFrom(header);
505 _buffers.returnBuffer(header);
506
507 _future.onConnection(connection);
508
509 return connection;
510 }
511 }
512
513 _endp.close();
514 return this;
515 }
516
517 private WebSocketConnection newWebSocketConnection() throws IOException
518 {
519 __log.debug("newWebSocketConnection()");
520 return new WebSocketClientConnection(
521 _future._client.getFactory(),
522 _future.getWebSocket(),
523 _endp,
524 _buffers,
525 System.currentTimeMillis(),
526 _future.getMaxIdleTime(),
527 _future.getProtocol(),
528 null,
529 WebSocketConnectionRFC6455.VERSION,
530 _future.getMaskGen());
531 }
532
533 public void onInputShutdown() throws IOException
534 {
535 _endp.close();
536 }
537
538 public boolean isIdle()
539 {
540 return false;
541 }
542
543 public boolean isSuspended()
544 {
545 return false;
546 }
547
548 public void onClose()
549 {
550 if (_error != null)
551 _future.handshakeFailed(new ProtocolException(_error));
552 else
553 _future.handshakeFailed(new EOFException());
554 }
555 }
556
557 private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
558 {
559 private final WebSocketClientFactory factory;
560
561 public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
562 {
563 super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
564 this.factory = factory;
565 }
566
567 @Override
568 public void onClose()
569 {
570 super.onClose();
571 factory.removeConnection(this);
572 }
573 }
574 }