1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21 import javax.servlet.http.HttpServletRequest;
22 import javax.servlet.http.HttpServletResponse;
23
24 import org.eclipse.jetty.io.AbstractConnection;
25 import org.eclipse.jetty.io.AsyncEndPoint;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.ByteArrayBuffer;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
31 import org.eclipse.jetty.util.B64Code;
32 import org.eclipse.jetty.util.StringUtil;
33 import org.eclipse.jetty.util.Utf8StringBuilder;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
37 import org.eclipse.jetty.websocket.WebSocket.OnControl;
38 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
39 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
40
41 public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
42 {
43 private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
44
45 final static byte OP_CONTINUATION = 0x00;
46 final static byte OP_CLOSE = 0x01;
47 final static byte OP_PING = 0x02;
48 final static byte OP_PONG = 0x03;
49 final static byte OP_TEXT = 0x04;
50 final static byte OP_BINARY = 0x05;
51
52 final static int CLOSE_NORMAL=1000;
53 final static int CLOSE_SHUTDOWN=1001;
54 final static int CLOSE_PROTOCOL=1002;
55 final static int CLOSE_BADDATA=1003;
56 final static int CLOSE_LARGE=1004;
57
58 static boolean isLastFrame(int flags)
59 {
60 return (flags&0x8)!=0;
61 }
62
63 static boolean isControlFrame(int opcode)
64 {
65 switch(opcode)
66 {
67 case OP_CLOSE:
68 case OP_PING:
69 case OP_PONG:
70 return true;
71 default:
72 return false;
73 }
74 }
75
76
/** Bytes of the fixed GUID that hashKey(String) appends to the client key. */
private static final byte[] MAGIC;

static
{
    final byte[] guid;
    try
    {
        guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
    }
    catch (UnsupportedEncodingException e)
    {
        throw new RuntimeException(e);
    }
    MAGIC = guid;
}

/** Notified whenever the connection is used, so an idle timeout can be rescheduled. */
private final IdleCheck _idle;
/** Parses inbound frames and hands them to the frame handler. */
private final WebSocketParser _parser;
/** Buffers and flushes outbound frames. */
private final WebSocketGenerator _generator;
/** The application websocket and its optional callback views (null when not implemented). */
private final WebSocket _webSocket;
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;
/** Sub-protocol passed to the constructor. */
private final String _protocol;
/** Set once closeIn(int,String) has run. */
private volatile boolean _closedIn;
/** Set once closeOut(int,String) has run. */
private volatile boolean _closedOut;
private int _maxTextMessageSize;
private int _maxBinaryMessageSize = -1;

/** Receives every frame produced by the parser. */
private final WebSocketParser.FrameHandler _frameHandler = new FrameHandlerD06();

/**
 * Connection facade given to the application. It is created during field
 * initialisation and copies the two size limits above as they are at that moment.
 */
private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
110
111
112
/**
 * Creates a draft-06 websocket connection over an endpoint.
 *
 * @param websocket   the application websocket; its optional OnFrame, OnTextMessage,
 *                    OnBinaryMessage and OnControl views are detected here
 * @param endpoint    the endpoint to read from and write to
 * @param buffers     buffer source for the parser and generator; its buffer size is
 *                    also used as the initial maximum text message size
 * @param timestamp   passed to the superclass constructor
 * @param maxIdleTime idle timeout applied to the endpoint
 * @param protocol    the negotiated sub-protocol reported by the connection
 * @throws IOException if the idle time cannot be set on the endpoint
 */
public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
    throws IOException
{
    super(endpoint,timestamp);

    if (endpoint instanceof AsyncEndPoint)
        ((AsyncEndPoint)endpoint).cancelIdle();

    _endp.setMaxIdleTime(maxIdleTime);

    _webSocket = websocket;
    _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
    _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
    _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
    _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
    _generator = new WebSocketGeneratorD06(buffers, _endp,null);
    _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
    _protocol=protocol;

    if (_endp instanceof SelectChannelEndPoint)
    {
        // Selector-managed endpoint: every access re-arms its idle timer.
        final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
        scep.cancelIdle();
        _idle=new IdleCheck()
        {
            public void access(EndPoint endp)
            {
                scep.scheduleIdle();
            }
        };
        scep.scheduleIdle();
    }
    else
    {
        // Other endpoints: idle tracking is a no-op.
        _idle = new IdleCheck()
        {
            public void access(EndPoint endp)
            {}
        };
    }

    _maxTextMessageSize=buffers.getBufferSize();
    _maxBinaryMessageSize=-1;

    // _connection was constructed during field initialisation, i.e. before the two
    // assignments above, so it captured a text limit of 0. Push the real limits
    // into it, otherwise every fragmented text message is rejected as too large.
    _connection.setMaxTextMessageSize(_maxTextMessageSize);
    _connection.setMaxBinaryMessageSize(_maxBinaryMessageSize);
}
157
158
159 public WebSocket.Connection getConnection()
160 {
161 return _connection;
162 }
163
164
165 public Connection handle() throws IOException
166 {
167 try
168 {
169
170 boolean progress=true;
171
172 while (progress)
173 {
174 int flushed=_generator.flush();
175 int filled=_parser.parseNext();
176
177 progress = flushed>0 || filled>0;
178
179 if (filled<0 || flushed<0)
180 {
181 _endp.close();
182 break;
183 }
184 }
185 }
186 catch(IOException e)
187 {
188 try
189 {
190 _endp.close();
191 }
192 catch(IOException e2)
193 {
194 LOG.ignore(e2);
195 }
196 throw e;
197 }
198 finally
199 {
200 if (_endp.isOpen())
201 {
202 _idle.access(_endp);
203 if (_closedIn && _closedOut && _generator.isBufferEmpty())
204 _endp.close();
205 else if (_endp.isInputShutdown() && !_closedIn)
206 closeIn(CLOSE_PROTOCOL,null);
207 else
208 checkWriteable();
209 }
210
211 }
212 return this;
213 }
214
215
216 public boolean isIdle()
217 {
218 return _parser.isBufferEmpty() && _generator.isBufferEmpty();
219 }
220
221
222 @Override
223 public void idleExpired()
224 {
225 closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
226 }
227
228
229 public boolean isSuspended()
230 {
231 return false;
232 }
233
234
235 public void closed()
236 {
237 _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
238 }
239
240
241 public synchronized void closeIn(int code,String message)
242 {
243 LOG.debug("ClosedIn {} {}",this,message);
244 try
245 {
246 if (_closedOut)
247 _endp.close();
248 else
249 closeOut(code,message);
250 }
251 catch(IOException e)
252 {
253 LOG.ignore(e);
254 }
255 finally
256 {
257 _closedIn=true;
258 }
259 }
260
261
262 public synchronized void closeOut(int code,String message)
263 {
264 LOG.debug("ClosedOut {} {}",this,message);
265 try
266 {
267 if (_closedIn || _closedOut)
268 _endp.close();
269 else
270 {
271 if (code<=0)
272 code=WebSocketConnectionD06.CLOSE_NORMAL;
273 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
274 bytes[0]=(byte)(code/0x100);
275 bytes[1]=(byte)(code%0x100);
276 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
277 }
278 _generator.flush();
279
280 }
281 catch(IOException e)
282 {
283 LOG.ignore(e);
284 }
285 finally
286 {
287 _closedOut=true;
288 }
289 }
290
291
292 public void fillBuffersFrom(Buffer buffer)
293 {
294 _parser.fill(buffer);
295 }
296
297
298 private void checkWriteable()
299 {
300 if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
301 {
302 ((AsyncEndPoint)_endp).scheduleWrite();
303 }
304 }
305
306
307 public List<Extension> getExtensions()
308 {
309 return Collections.emptyList();
310 }
311
312
313
314
315 private class FrameConnectionD06 implements WebSocket.FrameConnection
316 {
317 volatile boolean _disconnecting;
318 int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
319 int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
320
321
322 public synchronized void sendMessage(String content) throws IOException
323 {
324 if (_closedOut)
325 throw new IOException("closing");
326 byte[] data = content.getBytes(StringUtil.__UTF8);
327 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
328 _generator.flush();
329 checkWriteable();
330 _idle.access(_endp);
331 }
332
333
334 public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
335 {
336 if (_closedOut)
337 throw new IOException("closing");
338 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
339 _generator.flush();
340 checkWriteable();
341 _idle.access(_endp);
342 }
343
344
345 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
346 {
347 if (_closedOut)
348 throw new IOException("closing");
349 _generator.addFrame(flags,opcode,content,offset,length);
350 _generator.flush();
351 checkWriteable();
352 _idle.access(_endp);
353 }
354
355
356 public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
357 {
358 if (_closedOut)
359 throw new IOException("closing");
360 _generator.addFrame((byte)0x8,control,data,offset,length);
361 _generator.flush();
362 checkWriteable();
363 _idle.access(_endp);
364 }
365
366
367 public boolean isMessageComplete(byte flags)
368 {
369 return isLastFrame(flags);
370 }
371
372
373 public boolean isOpen()
374 {
375 return _endp!=null&&_endp.isOpen();
376 }
377
378
379 public void close(int code, String message)
380 {
381 if (_disconnecting)
382 return;
383 _disconnecting=true;
384 WebSocketConnectionD06.this.closeOut(code,message);
385 }
386
387
388 public void setMaxIdleTime(int ms)
389 {
390 try
391 {
392 _endp.setMaxIdleTime(ms);
393 }
394 catch(IOException e)
395 {
396 LOG.warn(e);
397 }
398 }
399
400
401 public void setMaxTextMessageSize(int size)
402 {
403 _maxTextMessage=size;
404 }
405
406
407 public void setMaxBinaryMessageSize(int size)
408 {
409 _maxBinaryMessage=size;
410 }
411
412
413 public int getMaxTextMessageSize()
414 {
415 return _maxTextMessage;
416 }
417
418
419 public int getMaxIdleTime()
420 {
421 return _endp.getMaxIdleTime();
422 }
423
424
425 public int getMaxBinaryMessageSize()
426 {
427 return _maxBinaryMessage;
428 }
429
430
431 public String getProtocol()
432 {
433 return _protocol;
434 }
435
436
437 public byte binaryOpcode()
438 {
439 return OP_BINARY;
440 }
441
442
443 public byte textOpcode()
444 {
445 return OP_TEXT;
446 }
447
448
449 public byte continuationOpcode()
450 {
451 return OP_CONTINUATION;
452 }
453
454
455 public byte finMask()
456 {
457 return 0x8;
458 }
459
460
461 public boolean isControl(byte opcode)
462 {
463 return isControlFrame(opcode);
464 }
465
466
467 public boolean isText(byte opcode)
468 {
469 return opcode==OP_TEXT;
470 }
471
472
473 public boolean isBinary(byte opcode)
474 {
475 return opcode==OP_BINARY;
476 }
477
478
479 public boolean isContinuation(byte opcode)
480 {
481 return opcode==OP_CONTINUATION;
482 }
483
484
485 public boolean isClose(byte opcode)
486 {
487 return opcode==OP_CLOSE;
488 }
489
490
491 public boolean isPing(byte opcode)
492 {
493 return opcode==OP_PING;
494 }
495
496
497 public boolean isPong(byte opcode)
498 {
499 return opcode==OP_PONG;
500 }
501
502
503 public void disconnect()
504 {
505 close(CLOSE_NORMAL,null);
506 }
507
508
509 public String toString()
510 {
511 return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
512 }
513
514 public void setAllowFrameFragmentation(boolean allowFragmentation)
515 {
516 }
517
518 public boolean isAllowFrameFragmentation()
519 {
520 return false;
521 }
522 }
523
524
525
526
527 private class FrameHandlerD06 implements WebSocketParser.FrameHandler
528 {
529 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
530 private ByteArrayBuffer _aggregate;
531 private byte _opcode=-1;
532
533 public void onFrame(byte flags, byte opcode, Buffer buffer)
534 {
535 boolean lastFrame = isLastFrame(flags);
536
537 synchronized(WebSocketConnectionD06.this)
538 {
539
540 if (_closedIn)
541 return;
542
543 try
544 {
545 byte[] array=buffer.array();
546
547
548 if (_onFrame!=null)
549 {
550 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
551 return;
552 }
553
554 if (_onControl!=null && isControlFrame(opcode))
555 {
556 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
557 return;
558 }
559
560 switch(opcode)
561 {
562 case WebSocketConnectionD06.OP_CONTINUATION:
563 {
564
565 if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
566 {
567 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
568 {
569
570 if (lastFrame && _onTextMessage!=null)
571 {
572 _opcode=-1;
573 String msg =_utf8.toString();
574 _utf8.reset();
575 _onTextMessage.onMessage(msg);
576 }
577 }
578 else
579 {
580 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
581 _utf8.reset();
582 _opcode=-1;
583 }
584 }
585 else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
586 {
587 if (_aggregate.space()<_aggregate.length())
588 {
589 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
590 _aggregate.clear();
591 _opcode=-1;
592 }
593 else
594 {
595 _aggregate.put(buffer);
596
597
598 if (lastFrame && _onBinaryMessage!=null)
599 {
600 try
601 {
602 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
603 }
604 finally
605 {
606 _opcode=-1;
607 _aggregate.clear();
608 }
609 }
610 }
611 }
612 break;
613 }
614 case WebSocketConnectionD06.OP_PING:
615 {
616 LOG.debug("PING {}",this);
617 if (!_closedOut)
618 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
619 break;
620 }
621
622 case WebSocketConnectionD06.OP_PONG:
623 {
624 LOG.debug("PONG {}",this);
625 break;
626 }
627
628 case WebSocketConnectionD06.OP_CLOSE:
629 {
630 int code=-1;
631 String message=null;
632 if (buffer.length()>=2)
633 {
634 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
635 if (buffer.length()>2)
636 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
637 }
638 closeIn(code,message);
639 break;
640 }
641
642
643 case WebSocketConnectionD06.OP_TEXT:
644 {
645 if(_onTextMessage!=null)
646 {
647 if (lastFrame)
648 {
649
650 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
651 }
652 else
653 {
654 if (_connection.getMaxTextMessageSize()>=0)
655 {
656
657 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
658 _opcode=WebSocketConnectionD06.OP_TEXT;
659 else
660 {
661 _utf8.reset();
662 _opcode=-1;
663 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
664 }
665 }
666 }
667 }
668 break;
669 }
670
671 default:
672 {
673 if (_onBinaryMessage!=null)
674 {
675 if (lastFrame)
676 {
677 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
678 }
679 else
680 {
681 if (_connection.getMaxBinaryMessageSize()>=0)
682 {
683 if (buffer.length()>_connection.getMaxBinaryMessageSize())
684 {
685 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
686 if (_aggregate!=null)
687 _aggregate.clear();
688 _opcode=-1;
689 }
690 else
691 {
692 _opcode=opcode;
693 if (_aggregate==null)
694 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
695 _aggregate.put(buffer);
696 }
697 }
698 }
699 }
700 }
701 }
702 }
703 catch(ThreadDeath th)
704 {
705 throw th;
706 }
707 catch(Throwable th)
708 {
709 LOG.warn(th);
710 }
711 }
712 }
713
714 public void close(int code,String message)
715 {
716 _connection.close(code,message);
717 }
718
719 public String toString()
720 {
721 return WebSocketConnectionD06.this.toString()+"FH";
722 }
723 }
724
725
726 private interface IdleCheck
727 {
728 void access(EndPoint endp);
729 }
730
731
732 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
733 {
734 String key = request.getHeader("Sec-WebSocket-Key");
735
736 response.setHeader("Upgrade","WebSocket");
737 response.addHeader("Connection","Upgrade");
738 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
739 if (subprotocol!=null)
740 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
741 response.sendError(101);
742
743 if (_onFrame!=null)
744 _onFrame.onHandshake(_connection);
745 _webSocket.onOpen(_connection);
746 }
747
748
749 public static String hashKey(String key)
750 {
751 try
752 {
753 MessageDigest md = MessageDigest.getInstance("SHA1");
754 md.update(key.getBytes("UTF-8"));
755 md.update(MAGIC);
756 return new String(B64Code.encode(md.digest()));
757 }
758 catch (Exception e)
759 {
760 throw new RuntimeException(e);
761 }
762 }
763 }