1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.eclipse.jetty.websocket;
17
18 import java.io.EOFException;
19 import java.io.IOException;
20 import java.net.ProtocolException;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.SocketChannel;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Queue;
26 import java.util.Random;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import javax.net.ssl.SSLEngine;
29
30 import org.eclipse.jetty.http.HttpFields;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.io.AbstractConnection;
33 import org.eclipse.jetty.io.AsyncEndPoint;
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.Buffers;
36 import org.eclipse.jetty.io.ByteArrayBuffer;
37 import org.eclipse.jetty.io.ConnectedEndPoint;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.SimpleBuffers;
41 import org.eclipse.jetty.io.nio.AsyncConnection;
42 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
43 import org.eclipse.jetty.io.nio.SelectorManager;
44 import org.eclipse.jetty.io.nio.SslConnection;
45 import org.eclipse.jetty.util.B64Code;
46 import org.eclipse.jetty.util.QuotedStringTokenizer;
47 import org.eclipse.jetty.util.component.AggregateLifeCycle;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.ssl.SslContextFactory;
50 import org.eclipse.jetty.util.thread.QueuedThreadPool;
51 import org.eclipse.jetty.util.thread.ThreadPool;
52
53
54
55
56
57
58
59
60
61
62
63 public class WebSocketClientFactory extends AggregateLifeCycle
64 {
65 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
66 private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
67 private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
68 private final SslContextFactory _sslContextFactory = new SslContextFactory();
69 private final ThreadPool _threadPool;
70 private final WebSocketClientSelector _selector;
71 private MaskGen _maskGen;
72 private WebSocketBuffers _buffers;
73
74
75
76
77
78 public WebSocketClientFactory()
79 {
80 this(new QueuedThreadPool());
81 }
82
83
84
85
86
87
88
89 public WebSocketClientFactory(ThreadPool threadPool)
90 {
91 this(threadPool, new RandomMaskGen());
92 }
93
94
95
96
97
98
99
100
101 public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
102 {
103 this(threadPool, maskGen, 8192);
104 }
105
106
107
108
109
110
111
112
113
114
115 public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
116 {
117 _threadPool = threadPool;
118 addBean(threadPool);
119 _buffers = new WebSocketBuffers(bufferSize);
120 addBean(_buffers);
121 _maskGen = maskGen;
122 addBean(_maskGen);
123 _selector = new WebSocketClientSelector();
124 addBean(_selector);
125 addBean(_sslContextFactory);
126 }
127
128
129
130
131
132 public SslContextFactory getSslContextFactory()
133 {
134 return _sslContextFactory;
135 }
136
137
138
139
140
141
142
143 public SelectorManager getSelectorManager()
144 {
145 return _selector;
146 }
147
148
149
150
151
152
153
154
155 public ThreadPool getThreadPool()
156 {
157 return _threadPool;
158 }
159
160
161
162
163
164
165 public MaskGen getMaskGen()
166 {
167 return _maskGen;
168 }
169
170
171
172
173
174
175 public void setMaskGen(MaskGen maskGen)
176 {
177 if (isRunning())
178 throw new IllegalStateException(getState());
179 removeBean(_maskGen);
180 _maskGen = maskGen;
181 addBean(maskGen);
182 }
183
184
185
186
187
188
189 public void setBufferSize(int bufferSize)
190 {
191 if (isRunning())
192 throw new IllegalStateException(getState());
193 removeBean(_buffers);
194 _buffers = new WebSocketBuffers(bufferSize);
195 addBean(_buffers);
196 }
197
198
199
200
201
202 public int getBufferSize()
203 {
204 return _buffers.getBufferSize();
205 }
206
207 @Override
208 protected void doStop() throws Exception
209 {
210 closeConnections();
211 }
212
213
214
215
216
217
218
219
220 public WebSocketClient newWebSocketClient()
221 {
222 return new WebSocketClient(this);
223 }
224
225 protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
226 {
227 SSLEngine sslEngine;
228 if (channel != null)
229 {
230 String peerHost = channel.socket().getInetAddress().getHostAddress();
231 int peerPort = channel.socket().getPort();
232 sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
233 }
234 else
235 {
236 sslEngine = _sslContextFactory.newSslEngine();
237 }
238 sslEngine.setUseClientMode(true);
239 sslEngine.beginHandshake();
240
241 return sslEngine;
242 }
243
244 protected boolean addConnection(WebSocketConnection connection)
245 {
246 return isRunning() && connections.add(connection);
247 }
248
249 protected boolean removeConnection(WebSocketConnection connection)
250 {
251 return connections.remove(connection);
252 }
253
254 protected void closeConnections()
255 {
256 for (WebSocketConnection connection : connections)
257 connection.shutdown();
258 }
259
260
261
262
263
264 class WebSocketClientSelector extends SelectorManager
265 {
266 @Override
267 public boolean dispatch(Runnable task)
268 {
269 return _threadPool.dispatch(task);
270 }
271
272 @Override
273 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
274 {
275 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
276 int maxIdleTime = holder.getMaxIdleTime();
277 if (maxIdleTime < 0)
278 maxIdleTime = (int)getMaxIdleTime();
279 SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
280 AsyncEndPoint endPoint = result;
281
282
283 if ("wss".equals(holder.getURI().getScheme()))
284 {
285 SSLEngine sslEngine = newSslEngine(channel);
286 SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
287 endPoint.setConnection(sslConnection);
288 endPoint = sslConnection.getSslEndPoint();
289 }
290
291 AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
292 endPoint.setConnection(connection);
293
294 return result;
295 }
296
297 @Override
298 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
299 {
300 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
301 return new HandshakeConnection(endpoint, holder);
302 }
303
304 @Override
305 protected void endPointOpened(SelectChannelEndPoint endpoint)
306 {
307
308 }
309
310 @Override
311 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
312 {
313 LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
314 }
315
316 @Override
317 protected void endPointClosed(SelectChannelEndPoint endpoint)
318 {
319 endpoint.getConnection().onClose();
320 }
321
322 @Override
323 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
324 {
325 if (!(attachment instanceof WebSocketClient.WebSocketFuture))
326 super.connectionFailed(channel, ex, attachment);
327 else
328 {
329 __log.debug(ex);
330 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
331
332 future.handshakeFailed(ex);
333 }
334 }
335 }
336
337
338
339
340
341
342 class HandshakeConnection extends AbstractConnection implements AsyncConnection
343 {
344 private final AsyncEndPoint _endp;
345 private final WebSocketClient.WebSocketFuture _future;
346 private final String _key;
347 private final HttpParser _parser;
348 private String _accept;
349 private String _error;
350 private boolean _handshaken;
351
352 public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
353 {
354 super(endpoint, System.currentTimeMillis());
355 _endp = endpoint;
356 _future = future;
357
358 byte[] bytes = new byte[16];
359 new Random().nextBytes(bytes);
360 _key = new String(B64Code.encode(bytes));
361
362 Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
363 _parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
364 {
365 @Override
366 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
367 {
368 if (status != 101)
369 {
370 _error = "Bad response status " + status + " " + reason;
371 _endp.close();
372 }
373 }
374
375 @Override
376 public void parsedHeader(Buffer name, Buffer value) throws IOException
377 {
378 if (__ACCEPT.equals(name))
379 _accept = value.toString();
380 }
381
382 @Override
383 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
384 {
385 if (_error == null)
386 _error = "Bad response: " + method + " " + url + " " + version;
387 _endp.close();
388 }
389
390 @Override
391 public void content(Buffer ref) throws IOException
392 {
393 if (_error == null)
394 _error = "Bad response. " + ref.length() + "B of content?";
395 _endp.close();
396 }
397 });
398 }
399
400 private void handshake()
401 {
402 String path = _future.getURI().getPath();
403 if (path == null || path.length() == 0)
404 path = "/";
405
406 if (_future.getURI().getRawQuery() != null)
407 path += "?" + _future.getURI().getRawQuery();
408
409 String origin = _future.getOrigin();
410
411 StringBuilder request = new StringBuilder(512);
412 request.append("GET ").append(path).append(" HTTP/1.1\r\n")
413 .append("Host: ").append(_future.getURI().getHost()).append(":")
414 .append(_future.getURI().getPort()).append("\r\n")
415 .append("Upgrade: websocket\r\n")
416 .append("Connection: Upgrade\r\n")
417 .append("Sec-WebSocket-Key: ")
418 .append(_key).append("\r\n");
419
420 if (origin != null)
421 request.append("Origin: ").append(origin).append("\r\n");
422
423 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
424
425 if (_future.getProtocol() != null)
426 request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
427
428 Map<String, String> cookies = _future.getCookies();
429 if (cookies != null && cookies.size() > 0)
430 {
431 for (String cookie : cookies.keySet())
432 request.append("Cookie: ")
433 .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
434 .append("=")
435 .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
436 .append("\r\n");
437 }
438
439 request.append("\r\n");
440
441
442
443 try
444 {
445 Buffer handshake = new ByteArrayBuffer(request.toString(), false);
446 int len = handshake.length();
447 if (len != _endp.flush(handshake))
448 throw new IOException("incomplete");
449 }
450 catch (IOException e)
451 {
452 _future.handshakeFailed(e);
453 }
454 finally
455 {
456 _handshaken = true;
457 }
458 }
459
460 public Connection handle() throws IOException
461 {
462 while (_endp.isOpen() && !_parser.isComplete())
463 {
464 if (!_handshaken)
465 handshake();
466
467 if (!_parser.parseAvailable())
468 {
469 if (_endp.isInputShutdown())
470 _future.handshakeFailed(new IOException("Incomplete handshake response"));
471 return this;
472 }
473 }
474 if (_error == null)
475 {
476 if (_accept == null)
477 {
478 _error = "No Sec-WebSocket-Accept";
479 }
480 else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
481 {
482 _error = "Bad Sec-WebSocket-Accept";
483 }
484 else
485 {
486 WebSocketConnection connection = newWebSocketConnection();
487
488 Buffer header = _parser.getHeaderBuffer();
489 if (header.hasContent())
490 connection.fillBuffersFrom(header);
491 _buffers.returnBuffer(header);
492
493 _future.onConnection(connection);
494
495 return connection;
496 }
497 }
498
499 _endp.close();
500 return this;
501 }
502
503 private WebSocketConnection newWebSocketConnection() throws IOException
504 {
505 return new WebSocketClientConnection(
506 _future._client.getFactory(),
507 _future.getWebSocket(),
508 _endp,
509 _buffers,
510 System.currentTimeMillis(),
511 _future.getMaxIdleTime(),
512 _future.getProtocol(),
513 null,
514 WebSocketConnectionRFC6455.VERSION,
515 _future.getMaskGen());
516 }
517
518 public void onInputShutdown() throws IOException
519 {
520 _endp.close();
521 }
522
523 public boolean isIdle()
524 {
525 return false;
526 }
527
528 public boolean isSuspended()
529 {
530 return false;
531 }
532
533 public void onClose()
534 {
535 if (_error != null)
536 _future.handshakeFailed(new ProtocolException(_error));
537 else
538 _future.handshakeFailed(new EOFException());
539 }
540 }
541
542 private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
543 {
544 private final WebSocketClientFactory factory;
545
546 public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
547 {
548 super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
549 this.factory = factory;
550 }
551
552 @Override
553 public void onClose()
554 {
555 super.onClose();
556 factory.removeConnection(this);
557 }
558 }
559 }