1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21
22 import javax.servlet.http.HttpServletRequest;
23 import javax.servlet.http.HttpServletResponse;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.ByteArrayBuffer;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32 import org.eclipse.jetty.util.B64Code;
33 import org.eclipse.jetty.util.StringUtil;
34 import org.eclipse.jetty.util.Utf8StringBuilder;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
38 import org.eclipse.jetty.websocket.WebSocket.OnControl;
39 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
40 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
41
42 public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
43 {
44 private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
45
46 final static byte OP_CONTINUATION = 0x00;
47 final static byte OP_CLOSE = 0x01;
48 final static byte OP_PING = 0x02;
49 final static byte OP_PONG = 0x03;
50 final static byte OP_TEXT = 0x04;
51 final static byte OP_BINARY = 0x05;
52
53 final static int CLOSE_NORMAL=1000;
54 final static int CLOSE_SHUTDOWN=1001;
55 final static int CLOSE_PROTOCOL=1002;
56 final static int CLOSE_BADDATA=1003;
57 final static int CLOSE_LARGE=1004;
58
59 static boolean isLastFrame(int flags)
60 {
61 return (flags&0x8)!=0;
62 }
63
64 static boolean isControlFrame(int opcode)
65 {
66 switch(opcode)
67 {
68 case OP_CLOSE:
69 case OP_PING:
70 case OP_PONG:
71 return true;
72 default:
73 return false;
74 }
75 }
76
77
78 private final static byte[] MAGIC;
79 private final IdleCheck _idle;
80 private final WebSocketParser _parser;
81 private final WebSocketGenerator _generator;
82 private final WebSocket _webSocket;
83 private final OnFrame _onFrame;
84 private final OnBinaryMessage _onBinaryMessage;
85 private final OnTextMessage _onTextMessage;
86 private final OnControl _onControl;
87 private final String _protocol;
88 private boolean _closedIn;
89 private boolean _closedOut;
90 private int _maxTextMessageSize;
91 private int _maxBinaryMessageSize=-1;
92
93 static
94 {
95 try
96 {
97 MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
98 }
99 catch (UnsupportedEncodingException e)
100 {
101 throw new RuntimeException(e);
102 }
103 }
104
105 private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
106
107
108
109
110 private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
111
112
113
114 public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
115 throws IOException
116 {
117 super(endpoint,timestamp);
118
119
120 if (endpoint instanceof AsyncEndPoint)
121 ((AsyncEndPoint)endpoint).cancelIdle();
122
123 _endp.setMaxIdleTime(maxIdleTime);
124
125 _webSocket = websocket;
126 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
127 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
128 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
129 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
130 _generator = new WebSocketGeneratorD06(buffers, _endp,null);
131 _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
132 _protocol=protocol;
133
134
135 if (_endp instanceof SelectChannelEndPoint)
136 {
137 final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
138 scep.cancelIdle();
139 _idle=new IdleCheck()
140 {
141 public void access(EndPoint endp)
142 {
143 scep.scheduleIdle();
144 }
145 };
146 scep.scheduleIdle();
147 }
148 else
149 {
150 _idle = new IdleCheck()
151 {
152 public void access(EndPoint endp)
153 {}
154 };
155 }
156
157 _maxTextMessageSize=buffers.getBufferSize();
158 _maxBinaryMessageSize=-1;
159 }
160
161
162 public WebSocket.Connection getConnection()
163 {
164 return _connection;
165 }
166
167
168 public Connection handle() throws IOException
169 {
170 try
171 {
172
173 boolean progress=true;
174
175 while (progress)
176 {
177 int flushed=_generator.flush();
178 int filled=_parser.parseNext();
179
180 progress = flushed>0 || filled>0;
181
182 if (filled<0 || flushed<0)
183 {
184 _endp.close();
185 break;
186 }
187 }
188 }
189 catch(IOException e)
190 {
191 try
192 {
193 _endp.close();
194 }
195 catch(IOException e2)
196 {
197 LOG.ignore(e2);
198 }
199 throw e;
200 }
201 finally
202 {
203 if (_endp.isOpen())
204 {
205 _idle.access(_endp);
206 if (_closedIn && _closedOut && _generator.isBufferEmpty())
207 _endp.close();
208 else if (_endp.isInputShutdown() && !_closedIn)
209 closeIn(CLOSE_PROTOCOL,null);
210 else
211 checkWriteable();
212 }
213
214 }
215 return this;
216 }
217
218
219 public boolean isIdle()
220 {
221 return _parser.isBufferEmpty() && _generator.isBufferEmpty();
222 }
223
224
225 @Override
226 public void idleExpired()
227 {
228 closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
229 }
230
231
232 public boolean isSuspended()
233 {
234 return false;
235 }
236
237
238 public void closed()
239 {
240 _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
241 }
242
243
244 public synchronized void closeIn(int code,String message)
245 {
246 LOG.debug("ClosedIn {} {}",this,message);
247 try
248 {
249 if (_closedOut)
250 _endp.close();
251 else
252 closeOut(code,message);
253 }
254 catch(IOException e)
255 {
256 LOG.ignore(e);
257 }
258 finally
259 {
260 _closedIn=true;
261 }
262 }
263
264
265 public synchronized void closeOut(int code,String message)
266 {
267 LOG.debug("ClosedOut {} {}",this,message);
268 try
269 {
270 if (_closedIn || _closedOut)
271 _endp.close();
272 else
273 {
274 if (code<=0)
275 code=WebSocketConnectionD06.CLOSE_NORMAL;
276 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
277 bytes[0]=(byte)(code/0x100);
278 bytes[1]=(byte)(code%0x100);
279 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
280 }
281 _generator.flush();
282
283 }
284 catch(IOException e)
285 {
286 LOG.ignore(e);
287 }
288 finally
289 {
290 _closedOut=true;
291 }
292 }
293
294
295 public void fillBuffersFrom(Buffer buffer)
296 {
297 _parser.fill(buffer);
298 }
299
300
301 private void checkWriteable()
302 {
303 if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
304 {
305 ((AsyncEndPoint)_endp).scheduleWrite();
306 }
307 }
308
309
310 public List<Extension> getExtensions()
311 {
312 return Collections.emptyList();
313 }
314
315
316
317
318 private class FrameConnectionD06 implements WebSocket.FrameConnection
319 {
320 volatile boolean _disconnecting;
321 int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
322 int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
323
324
325 public synchronized void sendMessage(String content) throws IOException
326 {
327 if (_closedOut)
328 throw new IOException("closing");
329 byte[] data = content.getBytes(StringUtil.__UTF8);
330 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
331 _generator.flush();
332 checkWriteable();
333 _idle.access(_endp);
334 }
335
336
337 public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
338 {
339 if (_closedOut)
340 throw new IOException("closing");
341 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
342 _generator.flush();
343 checkWriteable();
344 _idle.access(_endp);
345 }
346
347
348 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
349 {
350 if (_closedOut)
351 throw new IOException("closing");
352 _generator.addFrame(flags,opcode,content,offset,length);
353 _generator.flush();
354 checkWriteable();
355 _idle.access(_endp);
356 }
357
358
359 public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
360 {
361 if (_closedOut)
362 throw new IOException("closing");
363 _generator.addFrame((byte)0x8,control,data,offset,length);
364 _generator.flush();
365 checkWriteable();
366 _idle.access(_endp);
367 }
368
369
370 public boolean isMessageComplete(byte flags)
371 {
372 return isLastFrame(flags);
373 }
374
375
376 public boolean isOpen()
377 {
378 return _endp!=null&&_endp.isOpen();
379 }
380
381
382 public void close(int code, String message)
383 {
384 if (_disconnecting)
385 return;
386 _disconnecting=true;
387 WebSocketConnectionD06.this.closeOut(code,message);
388 }
389
390
391 public void setMaxIdleTime(int ms)
392 {
393 try
394 {
395 _endp.setMaxIdleTime(ms);
396 }
397 catch(IOException e)
398 {
399 LOG.warn(e);
400 }
401 }
402
403
404 public void setMaxTextMessageSize(int size)
405 {
406 _maxTextMessage=size;
407 }
408
409
410 public void setMaxBinaryMessageSize(int size)
411 {
412 _maxBinaryMessage=size;
413 }
414
415
416 public int getMaxTextMessageSize()
417 {
418 return _maxTextMessage;
419 }
420
421
422 public int getMaxIdleTime()
423 {
424 return _endp.getMaxIdleTime();
425 }
426
427
428 public int getMaxBinaryMessageSize()
429 {
430 return _maxBinaryMessage;
431 }
432
433
434 public String getProtocol()
435 {
436 return _protocol;
437 }
438
439
440 public byte binaryOpcode()
441 {
442 return OP_BINARY;
443 }
444
445
446 public byte textOpcode()
447 {
448 return OP_TEXT;
449 }
450
451
452 public byte continuationOpcode()
453 {
454 return OP_CONTINUATION;
455 }
456
457
458 public byte finMask()
459 {
460 return 0x8;
461 }
462
463
464 public boolean isControl(byte opcode)
465 {
466 return isControlFrame(opcode);
467 }
468
469
470 public boolean isText(byte opcode)
471 {
472 return opcode==OP_TEXT;
473 }
474
475
476 public boolean isBinary(byte opcode)
477 {
478 return opcode==OP_BINARY;
479 }
480
481
482 public boolean isContinuation(byte opcode)
483 {
484 return opcode==OP_CONTINUATION;
485 }
486
487
488 public boolean isClose(byte opcode)
489 {
490 return opcode==OP_CLOSE;
491 }
492
493
494 public boolean isPing(byte opcode)
495 {
496 return opcode==OP_PING;
497 }
498
499
500 public boolean isPong(byte opcode)
501 {
502 return opcode==OP_PONG;
503 }
504
505
506 public void disconnect()
507 {
508 close(CLOSE_NORMAL,null);
509 }
510
511
512 public String toString()
513 {
514 return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
515 }
516
517 public void setAllowFrameFragmentation(boolean allowFragmentation)
518 {
519 }
520
521 public boolean isAllowFrameFragmentation()
522 {
523 return false;
524 }
525 }
526
527
528
529
530 private class FrameHandlerD06 implements WebSocketParser.FrameHandler
531 {
532 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
533 private ByteArrayBuffer _aggregate;
534 private byte _opcode=-1;
535
536 public void onFrame(byte flags, byte opcode, Buffer buffer)
537 {
538 boolean lastFrame = isLastFrame(flags);
539
540 synchronized(WebSocketConnectionD06.this)
541 {
542
543 if (_closedIn)
544 return;
545
546 try
547 {
548 byte[] array=buffer.array();
549
550
551 if (_onFrame!=null)
552 {
553 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
554 return;
555 }
556
557 if (_onControl!=null && isControlFrame(opcode))
558 {
559 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
560 return;
561 }
562
563 switch(opcode)
564 {
565 case WebSocketConnectionD06.OP_CONTINUATION:
566 {
567
568 if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
569 {
570 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
571 {
572
573 if (lastFrame && _onTextMessage!=null)
574 {
575 _opcode=-1;
576 String msg =_utf8.toString();
577 _utf8.reset();
578 _onTextMessage.onMessage(msg);
579 }
580 }
581 else
582 {
583 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
584 _utf8.reset();
585 _opcode=-1;
586 }
587 }
588 else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
589 {
590 if (_aggregate.space()<_aggregate.length())
591 {
592 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
593 _aggregate.clear();
594 _opcode=-1;
595 }
596 else
597 {
598 _aggregate.put(buffer);
599
600
601 if (lastFrame && _onBinaryMessage!=null)
602 {
603 try
604 {
605 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
606 }
607 finally
608 {
609 _opcode=-1;
610 _aggregate.clear();
611 }
612 }
613 }
614 }
615 break;
616 }
617 case WebSocketConnectionD06.OP_PING:
618 {
619 LOG.debug("PING {}",this);
620 if (!_closedOut)
621 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
622 break;
623 }
624
625 case WebSocketConnectionD06.OP_PONG:
626 {
627 LOG.debug("PONG {}",this);
628 break;
629 }
630
631 case WebSocketConnectionD06.OP_CLOSE:
632 {
633 int code=-1;
634 String message=null;
635 if (buffer.length()>=2)
636 {
637 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
638 if (buffer.length()>2)
639 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
640 }
641 closeIn(code,message);
642 break;
643 }
644
645
646 case WebSocketConnectionD06.OP_TEXT:
647 {
648 if(_onTextMessage!=null)
649 {
650 if (lastFrame)
651 {
652
653 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
654 }
655 else
656 {
657 if (_connection.getMaxTextMessageSize()>=0)
658 {
659
660 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
661 _opcode=WebSocketConnectionD06.OP_TEXT;
662 else
663 {
664 _utf8.reset();
665 _opcode=-1;
666 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
667 }
668 }
669 }
670 }
671 break;
672 }
673
674 default:
675 {
676 if (_onBinaryMessage!=null)
677 {
678 if (lastFrame)
679 {
680 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
681 }
682 else
683 {
684 if (_connection.getMaxBinaryMessageSize()>=0)
685 {
686 if (buffer.length()>_connection.getMaxBinaryMessageSize())
687 {
688 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
689 if (_aggregate!=null)
690 _aggregate.clear();
691 _opcode=-1;
692 }
693 else
694 {
695 _opcode=opcode;
696 if (_aggregate==null)
697 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
698 _aggregate.put(buffer);
699 }
700 }
701 }
702 }
703 }
704 }
705 }
706 catch(ThreadDeath th)
707 {
708 throw th;
709 }
710 catch(Throwable th)
711 {
712 LOG.warn(th);
713 }
714 }
715 }
716
717 public void close(int code,String message)
718 {
719 _connection.close(code,message);
720 }
721
722 public String toString()
723 {
724 return WebSocketConnectionD06.this.toString()+"FH";
725 }
726 }
727
728
729 private interface IdleCheck
730 {
731 void access(EndPoint endp);
732 }
733
734
735 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
736 {
737 String uri=request.getRequestURI();
738 String query=request.getQueryString();
739 if (query!=null && query.length()>0)
740 uri+="?"+query;
741 String key = request.getHeader("Sec-WebSocket-Key");
742
743 response.setHeader("Upgrade","WebSocket");
744 response.addHeader("Connection","Upgrade");
745 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
746 if (subprotocol!=null)
747 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
748 response.sendError(101);
749
750 if (_onFrame!=null)
751 _onFrame.onHandshake(_connection);
752 _webSocket.onOpen(_connection);
753 }
754
755
756 public static String hashKey(String key)
757 {
758 try
759 {
760 MessageDigest md = MessageDigest.getInstance("SHA1");
761 md.update(key.getBytes("UTF-8"));
762 md.update(MAGIC);
763 return new String(B64Code.encode(md.digest()));
764 }
765 catch (Exception e)
766 {
767 throw new RuntimeException(e);
768 }
769 }
770 }