1 package org.eclipse.jetty.websocket;
2
3 import java.io.EOFException;
4 import java.io.IOException;
5 import java.net.InetSocketAddress;
6 import java.net.ProtocolException;
7 import java.net.URI;
8 import java.nio.channels.ByteChannel;
9 import java.nio.channels.SelectionKey;
10 import java.nio.channels.SocketChannel;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Random;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.CopyOnWriteArrayList;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21
22 import org.eclipse.jetty.http.HttpFields;
23 import org.eclipse.jetty.http.HttpParser;
24 import org.eclipse.jetty.io.AbstractConnection;
25 import org.eclipse.jetty.io.Buffer;
26 import org.eclipse.jetty.io.Buffers;
27 import org.eclipse.jetty.io.ByteArrayBuffer;
28 import org.eclipse.jetty.io.ConnectedEndPoint;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.SimpleBuffers;
31 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32 import org.eclipse.jetty.io.nio.SelectorManager;
33 import org.eclipse.jetty.util.B64Code;
34 import org.eclipse.jetty.util.QuotedStringTokenizer;
35 import org.eclipse.jetty.util.component.AggregateLifeCycle;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.thread.QueuedThreadPool;
38 import org.eclipse.jetty.util.thread.ThreadPool;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class WebSocketClient extends AggregateLifeCycle
72 {
73 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getCanonicalName());
74 private final static Random __random = new Random();
75 private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
76
77 private final WebSocketClient _root;
78 private final WebSocketClient _parent;
79 private final ThreadPool _threadPool;
80 private final WebSocketClientSelector _selector;
81
82 private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
83 private final List<String> _extensions=new CopyOnWriteArrayList<String>();
84
85 private int _bufferSize=64*1024;
86 private String _protocol;
87 private int _maxIdleTime=-1;
88
89 private WebSocketBuffers _buffers;
90
91
92
93
94
95 public WebSocketClient()
96 {
97 this(new QueuedThreadPool());
98 }
99
100
101
102
103
104 public WebSocketClient(ThreadPool threadpool)
105 {
106 _root=this;
107 _parent=null;
108 _threadPool=threadpool;
109 _selector=new WebSocketClientSelector();
110 addBean(_selector);
111 addBean(_threadPool);
112 }
113
114
115
116
117
118
119
120 public WebSocketClient(WebSocketClient parent)
121 {
122 _root=parent._root;
123 _parent=parent;
124 _threadPool=parent._threadPool;
125 _selector=parent._selector;
126 _parent.addBean(this);
127 }
128
129
130
131
132
133
134 public SelectorManager getSelectorManager()
135 {
136 return _selector;
137 }
138
139
140
141
142
143
144 public ThreadPool getThreadPool()
145 {
146 return _threadPool;
147 }
148
149
150
151
152
153 public int getMaxIdleTime()
154 {
155 return _maxIdleTime;
156 }
157
158
159
160
161
162 public void setMaxIdleTime(int maxIdleTime)
163 {
164 _maxIdleTime=maxIdleTime;
165 }
166
167
168
169
170
171 public int getBufferSize()
172 {
173 return _bufferSize;
174 }
175
176
177
178
179
180 public void setBufferSize(int bufferSize)
181 {
182 if (isRunning())
183 throw new IllegalStateException(getState());
184 _bufferSize = bufferSize;
185 }
186
187
188
189
190
191 public String getProtocol()
192 {
193 return _protocol;
194 }
195
196
197
198
199
200 public void setProtocol(String protocol)
201 {
202 _protocol = protocol;
203 }
204
205
206 public Map<String,String> getCookies()
207 {
208 return _cookies;
209 }
210
211
212 public List<String> getExtensions()
213 {
214 return _extensions;
215 }
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230 public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
231 {
232 try
233 {
234 return open(uri,websocket).get(maxConnectTime,units);
235 }
236 catch (ExecutionException e)
237 {
238 Throwable cause = e.getCause();
239 if (cause instanceof IOException)
240 throw (IOException)cause;
241 if (cause instanceof Error)
242 throw (Error)cause;
243 if (cause instanceof RuntimeException)
244 throw (RuntimeException)cause;
245 throw new RuntimeException(cause);
246 }
247 }
248
249
250
251
252
253
254
255
256
257
258
259 public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
260 {
261 if (!isStarted())
262 throw new IllegalStateException("!started");
263 String scheme=uri.getScheme();
264 if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
265 throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
266 if ("wss".equalsIgnoreCase(scheme))
267 throw new IOException("wss not supported");
268
269 SocketChannel channel = SocketChannel.open();
270 channel.socket().setTcpNoDelay(true);
271 int maxIdleTime = getMaxIdleTime();
272 if (maxIdleTime<0)
273 maxIdleTime=(int)_selector.getMaxIdleTime();
274 if (maxIdleTime>0)
275 channel.socket().setSoTimeout(maxIdleTime);
276
277 InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
278
279 final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
280
281 channel.configureBlocking(false);
282 channel.connect(address);
283 _selector.register( channel, holder);
284
285 return holder;
286 }
287
288
289 @Override
290 protected void doStart() throws Exception
291 {
292 if (_parent!=null && !_parent.isRunning())
293 throw new IllegalStateException("parent:"+getState());
294
295 _buffers = new WebSocketBuffers(_bufferSize);
296
297 super.doStart();
298
299
300 if (_parent==null)
301 {
302 for (int i=0;i<_selector.getSelectSets();i++)
303 {
304 final int id=i;
305 _threadPool.dispatch(new Runnable()
306 {
307 public void run()
308 {
309 while(isRunning())
310 {
311 try
312 {
313 _selector.doSelect(id);
314 }
315 catch (IOException e)
316 {
317 __log.warn(e);
318 }
319 }
320 }
321 });
322 }
323 }
324 }
325
326
327
328
329
330 class WebSocketClientSelector extends SelectorManager
331 {
332 @Override
333 public boolean dispatch(Runnable task)
334 {
335 return _threadPool.dispatch(task);
336 }
337
338 @Override
339 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
340 {
341 return new SelectChannelEndPoint(channel,selectSet,sKey);
342 }
343
344 @Override
345 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
346 {
347 WebSocketFuture holder = (WebSocketFuture) endpoint.getSelectionKey().attachment();
348 return new HandshakeConnection(endpoint,holder);
349 }
350
351 @Override
352 protected void endPointOpened(SelectChannelEndPoint endpoint)
353 {
354
355 }
356
357 @Override
358 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
359 {
360 throw new IllegalStateException();
361 }
362
363 @Override
364 protected void endPointClosed(SelectChannelEndPoint endpoint)
365 {
366 endpoint.getConnection().closed();
367 }
368
369 @Override
370 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
371 {
372 if (!(attachment instanceof WebSocketFuture))
373 super.connectionFailed(channel,ex,attachment);
374 else
375 {
376 __log.debug(ex);
377 WebSocketFuture holder = (WebSocketFuture)attachment;
378
379 holder.handshakeFailed(ex);
380 }
381 }
382 }
383
384
385
386
387
388
389 class HandshakeConnection extends AbstractConnection
390 {
391 private final SelectChannelEndPoint _endp;
392 private final WebSocketFuture _holder;
393 private final String _key;
394 private final HttpParser _parser;
395 private String _accept;
396 private String _error;
397
398
399 public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketFuture holder)
400 {
401 super(endpoint,System.currentTimeMillis());
402 _endp=endpoint;
403 _holder=holder;
404
405 byte[] bytes=new byte[16];
406 __random.nextBytes(bytes);
407 _key=new String(B64Code.encode(bytes));
408
409 Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
410 _parser=new HttpParser(buffers,_endp,
411
412 new HttpParser.EventHandler()
413 {
414 @Override
415 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
416 {
417 if (status!=101)
418 {
419 _error="Bad response status "+status+" "+reason;
420 _endp.close();
421 }
422 }
423
424 @Override
425 public void parsedHeader(Buffer name, Buffer value) throws IOException
426 {
427 if (__ACCEPT.equals(name))
428 _accept=value.toString();
429 }
430
431 @Override
432 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
433 {
434 if (_error==null)
435 _error="Bad response: "+method+" "+url+" "+version;
436 _endp.close();
437 }
438
439 @Override
440 public void content(Buffer ref) throws IOException
441 {
442 if (_error==null)
443 _error="Bad response. "+ref.length()+"B of content?";
444 _endp.close();
445 }
446 });
447
448 String path=_holder.getURI().getPath();
449 if (path==null || path.length()==0)
450 path="/";
451
452 String request=
453 "GET "+path+" HTTP/1.1\r\n"+
454 "Host: "+holder.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
455 "Upgrade: websocket\r\n"+
456 "Connection: Upgrade\r\n"+
457 "Sec-WebSocket-Key: "+_key+"\r\n"+
458 "Sec-WebSocket-Origin: http://example.com\r\n"+
459 "Sec-WebSocket-Version: "+WebSocketConnectionD10.VERSION+"\r\n";
460
461 if (holder.getProtocol()!=null)
462 request+="Sec-WebSocket-Protocol: "+holder.getProtocol()+"\r\n";
463
464 if (holder.getCookies()!=null && holder.getCookies().size()>0)
465 {
466 for (String cookie : holder.getCookies().keySet())
467 request+="Cookie: "+QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM)+
468 "="+
469 QuotedStringTokenizer.quoteIfNeeded(holder.getCookies().get(cookie),HttpFields.__COOKIE_DELIM)+
470 "\r\n";
471 }
472
473 request+="\r\n";
474
475
476
477 try
478 {
479 Buffer handshake = new ByteArrayBuffer(request,false);
480 int len=handshake.length();
481 if (len!=_endp.flush(handshake))
482 throw new IOException("incomplete");
483 }
484 catch(IOException e)
485 {
486 holder.handshakeFailed(e);
487 }
488
489 }
490
491 public Connection handle() throws IOException
492 {
493 while (_endp.isOpen() && !_parser.isComplete())
494 {
495 switch (_parser.parseAvailable())
496 {
497 case -1:
498 _holder.handshakeFailed(new IOException("Incomplete handshake response"));
499 return this;
500 case 0:
501 return this;
502 default:
503 break;
504 }
505 }
506 if (_error==null)
507 {
508 if (_accept==null)
509 _error="No Sec-WebSocket-Accept";
510 else if (!WebSocketConnectionD10.hashKey(_key).equals(_accept))
511 _error="Bad Sec-WebSocket-Accept";
512 else
513 {
514 Buffer header=_parser.getHeaderBuffer();
515 WebSocketConnectionD10 connection = new WebSocketConnectionD10(_holder.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_holder.getMaxIdleTime(),_holder.getProtocol(),null,10, new WebSocketGeneratorD10.RandomMaskGen());
516
517 if (header.hasContent())
518 connection.fillBuffersFrom(header);
519 _buffers.returnBuffer(header);
520
521 _holder.onConnection(connection);
522
523 return connection;
524 }
525 }
526
527 _endp.close();
528 return this;
529 }
530
531 public boolean isIdle()
532 {
533 return false;
534 }
535
536 public boolean isSuspended()
537 {
538 return false;
539 }
540
541 public void closed()
542 {
543 if (_error!=null)
544 _holder.handshakeFailed(new ProtocolException(_error));
545 else
546 _holder.handshakeFailed(new EOFException());
547 }
548 }
549
550
551
552
553
554 class WebSocketFuture implements Future<WebSocket.Connection>
555 {
556 final WebSocket _websocket;;
557 final URI _uri;
558 final String _protocol;
559 final int _maxIdleTime;
560 final Map<String,String> _cookies;
561 final List<String> _extensions;
562 final CountDownLatch _done = new CountDownLatch(1);
563
564 ByteChannel _channel;
565 WebSocketConnection _connection;
566 Throwable _exception;
567
568 public WebSocketFuture(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
569 {
570 _websocket=websocket;
571 _uri=uri;
572 _protocol=protocol;
573 _maxIdleTime=maxIdleTime;
574 _cookies=cookies;
575 _extensions=extensions;
576 _channel=channel;
577 }
578
579 public void onConnection(WebSocketConnection connection)
580 {
581 try
582 {
583 synchronized (this)
584 {
585 if (_channel!=null)
586 _connection=connection;
587 }
588
589 if (_connection!=null)
590 {
591 if (_websocket instanceof WebSocket.OnFrame)
592 ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)connection.getConnection());
593
594 _websocket.onOpen(connection.getConnection());
595
596 }
597 }
598 finally
599 {
600 _done.countDown();
601 }
602 }
603
604 public void handshakeFailed(Throwable ex)
605 {
606 try
607 {
608 ByteChannel channel=null;
609 synchronized (this)
610 {
611 if (_channel!=null)
612 {
613 channel=_channel;
614 _channel=null;
615 _exception=ex;
616 }
617 }
618
619 if (channel!=null)
620 {
621 if (ex instanceof ProtocolException)
622 closeChannel(channel,WebSocketConnectionD10.CLOSE_PROTOCOL,ex.getMessage());
623 else
624 closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,ex.getMessage());
625 }
626 }
627 finally
628 {
629 _done.countDown();
630 }
631 }
632
633 public Map<String,String> getCookies()
634 {
635 return _cookies;
636 }
637
638 public String getProtocol()
639 {
640 return _protocol;
641 }
642
643 public WebSocket getWebSocket()
644 {
645 return _websocket;
646 }
647
648 public URI getURI()
649 {
650 return _uri;
651 }
652
653 public int getMaxIdleTime()
654 {
655 return _maxIdleTime;
656 }
657
658 public String toString()
659 {
660 return "[" + _uri + ","+_websocket+"]@"+hashCode();
661 }
662
663 public boolean cancel(boolean mayInterruptIfRunning)
664 {
665 try
666 {
667 ByteChannel channel=null;
668 synchronized (this)
669 {
670 if (_connection==null && _exception==null && _channel!=null)
671 {
672 channel=_channel;
673 _channel=null;
674 }
675 }
676
677 if (channel!=null)
678 {
679 closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"cancelled");
680 return true;
681 }
682 return false;
683 }
684 finally
685 {
686 _done.countDown();
687 }
688 }
689
690 public boolean isCancelled()
691 {
692 synchronized (this)
693 {
694 return _channel==null && _connection==null;
695 }
696 }
697
698 public boolean isDone()
699 {
700 synchronized (this)
701 {
702 return _connection!=null && _exception==null;
703 }
704 }
705
706 public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
707 {
708 try
709 {
710 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
711 }
712 catch(TimeoutException e)
713 {
714 throw new IllegalStateException("The universe has ended",e);
715 }
716 }
717
718 public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
719 TimeoutException
720 {
721 _done.await(timeout,unit);
722 ByteChannel channel=null;
723 org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
724 Throwable exception=null;
725 synchronized (this)
726 {
727 exception=_exception;
728 if (_connection==null)
729 {
730 exception=_exception;
731 channel=_channel;
732 _channel=null;
733 }
734 else
735 connection=_connection.getConnection();
736 }
737
738 if (channel!=null)
739 closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"timeout");
740 if (exception!=null)
741 throw new ExecutionException(exception);
742 if (connection!=null)
743 return connection;
744 throw new TimeoutException();
745 }
746
747 private void closeChannel(ByteChannel channel,int code, String message)
748 {
749 try
750 {
751 _websocket.onClose(code,message);
752 }
753 catch(Exception e)
754 {
755 __log.warn(e);
756 }
757
758 try
759 {
760 channel.close();
761 }
762 catch(IOException e)
763 {
764 __log.debug(e);
765 }
766 }
767 }
768
769 }