1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21
22 import javax.servlet.http.HttpServletRequest;
23 import javax.servlet.http.HttpServletResponse;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.ByteArrayBuffer;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.util.B64Code;
32 import org.eclipse.jetty.util.StringUtil;
33 import org.eclipse.jetty.util.Utf8StringBuilder;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
37 import org.eclipse.jetty.websocket.WebSocket.OnControl;
38 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
39 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
40
41 public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
42 {
43 private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
44
45 final static byte OP_CONTINUATION = 0x00;
46 final static byte OP_CLOSE = 0x01;
47 final static byte OP_PING = 0x02;
48 final static byte OP_PONG = 0x03;
49 final static byte OP_TEXT = 0x04;
50 final static byte OP_BINARY = 0x05;
51
52 final static int CLOSE_NORMAL=1000;
53 final static int CLOSE_SHUTDOWN=1001;
54 final static int CLOSE_PROTOCOL=1002;
55 final static int CLOSE_BADDATA=1003;
56 final static int CLOSE_LARGE=1004;
57
58 static boolean isLastFrame(int flags)
59 {
60 return (flags&0x8)!=0;
61 }
62
63 static boolean isControlFrame(int opcode)
64 {
65 switch(opcode)
66 {
67 case OP_CLOSE:
68 case OP_PING:
69 case OP_PONG:
70 return true;
71 default:
72 return false;
73 }
74 }
75
76
77 private final static byte[] MAGIC;
78 private final WebSocketParser _parser;
79 private final WebSocketGenerator _generator;
80 private final WebSocket _webSocket;
81 private final OnFrame _onFrame;
82 private final OnBinaryMessage _onBinaryMessage;
83 private final OnTextMessage _onTextMessage;
84 private final OnControl _onControl;
85 private final String _protocol;
86 private volatile boolean _closedIn;
87 private volatile boolean _closedOut;
88 private int _maxTextMessageSize;
89 private int _maxBinaryMessageSize=-1;
90
91 static
92 {
93 try
94 {
95 MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
96 }
97 catch (UnsupportedEncodingException e)
98 {
99 throw new RuntimeException(e);
100 }
101 }
102
103 private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
104
105
106
107
108 private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
109
110
111
112 public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
113 throws IOException
114 {
115 super(endpoint,timestamp);
116
117 _endp.setMaxIdleTime(maxIdleTime);
118
119 _webSocket = websocket;
120 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
121 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
122 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
123 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
124 _generator = new WebSocketGeneratorD06(buffers, _endp,null);
125 _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
126 _protocol=protocol;
127
128 _maxTextMessageSize=buffers.getBufferSize();
129 _maxBinaryMessageSize=-1;
130 }
131
132
133 public WebSocket.Connection getConnection()
134 {
135 return _connection;
136 }
137
138
139 public Connection handle() throws IOException
140 {
141 try
142 {
143
144 boolean progress=true;
145
146 while (progress)
147 {
148 int flushed=_generator.flush();
149 int filled=_parser.parseNext();
150
151 progress = flushed>0 || filled>0;
152
153 if (filled<0 || flushed<0)
154 {
155 _endp.close();
156 break;
157 }
158 }
159 }
160 catch(IOException e)
161 {
162 try
163 {
164 _endp.close();
165 }
166 catch(IOException e2)
167 {
168 LOG.ignore(e2);
169 }
170 throw e;
171 }
172 finally
173 {
174 if (_endp.isOpen())
175 {
176 if (_closedIn && _closedOut && _generator.isBufferEmpty())
177 _endp.close();
178 else if (_endp.isInputShutdown() && !_closedIn)
179 closeIn(CLOSE_PROTOCOL,null);
180 else
181 checkWriteable();
182 }
183
184 }
185 return this;
186 }
187
188
189 public void onInputShutdown() throws IOException
190 {
191
192 }
193
194
195 public boolean isIdle()
196 {
197 return _parser.isBufferEmpty() && _generator.isBufferEmpty();
198 }
199
200
201 @Override
202 public void onIdleExpired()
203 {
204 closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
205 }
206
207
208 public boolean isSuspended()
209 {
210 return false;
211 }
212
213
214 public void onClose()
215 {
216 _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
217 }
218
219
220 public synchronized void closeIn(int code,String message)
221 {
222 LOG.debug("ClosedIn {} {}",this,message);
223 try
224 {
225 if (_closedOut)
226 _endp.close();
227 else
228 closeOut(code,message);
229 }
230 catch(IOException e)
231 {
232 LOG.ignore(e);
233 }
234 finally
235 {
236 _closedIn=true;
237 }
238 }
239
240
241 public synchronized void closeOut(int code,String message)
242 {
243 LOG.debug("ClosedOut {} {}",this,message);
244 try
245 {
246 if (_closedIn || _closedOut)
247 _endp.close();
248 else
249 {
250 if (code<=0)
251 code=WebSocketConnectionD06.CLOSE_NORMAL;
252 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
253 bytes[0]=(byte)(code/0x100);
254 bytes[1]=(byte)(code%0x100);
255 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
256 }
257 _generator.flush();
258
259 }
260 catch(IOException e)
261 {
262 LOG.ignore(e);
263 }
264 finally
265 {
266 _closedOut=true;
267 }
268 }
269
270
271 public void fillBuffersFrom(Buffer buffer)
272 {
273 _parser.fill(buffer);
274 }
275
276
277 private void checkWriteable()
278 {
279 if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
280 {
281 ((AsyncEndPoint)_endp).scheduleWrite();
282 }
283 }
284
285
286 public List<Extension> getExtensions()
287 {
288 return Collections.emptyList();
289 }
290
291
292
293
294 private class FrameConnectionD06 implements WebSocket.FrameConnection
295 {
296 volatile boolean _disconnecting;
297 int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
298 int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
299
300
301 public synchronized void sendMessage(String content) throws IOException
302 {
303 if (_closedOut)
304 throw new IOException("closing");
305 byte[] data = content.getBytes(StringUtil.__UTF8);
306 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
307 _generator.flush();
308 checkWriteable();
309 }
310
311
312 public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
313 {
314 if (_closedOut)
315 throw new IOException("closing");
316 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
317 _generator.flush();
318 checkWriteable();
319 }
320
321
322 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
323 {
324 if (_closedOut)
325 throw new IOException("closing");
326 _generator.addFrame(flags,opcode,content,offset,length);
327 _generator.flush();
328 checkWriteable();
329 }
330
331
332 public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
333 {
334 if (_closedOut)
335 throw new IOException("closing");
336 _generator.addFrame((byte)0x8,control,data,offset,length);
337 _generator.flush();
338 checkWriteable();
339 }
340
341
342 public boolean isMessageComplete(byte flags)
343 {
344 return isLastFrame(flags);
345 }
346
347
348 public boolean isOpen()
349 {
350 return _endp!=null&&_endp.isOpen();
351 }
352
353
354 public void close(int code, String message)
355 {
356 if (_disconnecting)
357 return;
358 _disconnecting=true;
359 WebSocketConnectionD06.this.closeOut(code,message);
360 }
361
362
363 public void setMaxIdleTime(int ms)
364 {
365 try
366 {
367 _endp.setMaxIdleTime(ms);
368 }
369 catch(IOException e)
370 {
371 LOG.warn(e);
372 }
373 }
374
375
376 public void setMaxTextMessageSize(int size)
377 {
378 _maxTextMessage=size;
379 }
380
381
382 public void setMaxBinaryMessageSize(int size)
383 {
384 _maxBinaryMessage=size;
385 }
386
387
388 public int getMaxTextMessageSize()
389 {
390 return _maxTextMessage;
391 }
392
393
394 public int getMaxIdleTime()
395 {
396 return _endp.getMaxIdleTime();
397 }
398
399
400 public int getMaxBinaryMessageSize()
401 {
402 return _maxBinaryMessage;
403 }
404
405
406 public String getProtocol()
407 {
408 return _protocol;
409 }
410
411
412 public byte binaryOpcode()
413 {
414 return OP_BINARY;
415 }
416
417
418 public byte textOpcode()
419 {
420 return OP_TEXT;
421 }
422
423
424 public byte continuationOpcode()
425 {
426 return OP_CONTINUATION;
427 }
428
429
430 public byte finMask()
431 {
432 return 0x8;
433 }
434
435
436 public boolean isControl(byte opcode)
437 {
438 return isControlFrame(opcode);
439 }
440
441
442 public boolean isText(byte opcode)
443 {
444 return opcode==OP_TEXT;
445 }
446
447
448 public boolean isBinary(byte opcode)
449 {
450 return opcode==OP_BINARY;
451 }
452
453
454 public boolean isContinuation(byte opcode)
455 {
456 return opcode==OP_CONTINUATION;
457 }
458
459
460 public boolean isClose(byte opcode)
461 {
462 return opcode==OP_CLOSE;
463 }
464
465
466 public boolean isPing(byte opcode)
467 {
468 return opcode==OP_PING;
469 }
470
471
472 public boolean isPong(byte opcode)
473 {
474 return opcode==OP_PONG;
475 }
476
477
478 public void disconnect()
479 {
480 close();
481 }
482
483
484 public void close()
485 {
486 close(CLOSE_NORMAL,null);
487 }
488
489
490 public String toString()
491 {
492 return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
493 }
494
495 public void setAllowFrameFragmentation(boolean allowFragmentation)
496 {
497 }
498
499 public boolean isAllowFrameFragmentation()
500 {
501 return false;
502 }
503 }
504
505
506
507
508 private class FrameHandlerD06 implements WebSocketParser.FrameHandler
509 {
510 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
511 private ByteArrayBuffer _aggregate;
512 private byte _opcode=-1;
513
514 public void onFrame(byte flags, byte opcode, Buffer buffer)
515 {
516 boolean lastFrame = isLastFrame(flags);
517
518 synchronized(WebSocketConnectionD06.this)
519 {
520
521 if (_closedIn)
522 return;
523
524 try
525 {
526 byte[] array=buffer.array();
527
528
529 if (_onFrame!=null)
530 {
531 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
532 return;
533 }
534
535 if (_onControl!=null && isControlFrame(opcode))
536 {
537 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
538 return;
539 }
540
541 switch(opcode)
542 {
543 case WebSocketConnectionD06.OP_CONTINUATION:
544 {
545
546 if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
547 {
548 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
549 {
550
551 if (lastFrame && _onTextMessage!=null)
552 {
553 _opcode=-1;
554 String msg =_utf8.toString();
555 _utf8.reset();
556 _onTextMessage.onMessage(msg);
557 }
558 }
559 else
560 {
561 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
562 _utf8.reset();
563 _opcode=-1;
564 }
565 }
566 else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
567 {
568 if (_aggregate.space()<_aggregate.length())
569 {
570 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
571 _aggregate.clear();
572 _opcode=-1;
573 }
574 else
575 {
576 _aggregate.put(buffer);
577
578
579 if (lastFrame && _onBinaryMessage!=null)
580 {
581 try
582 {
583 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
584 }
585 finally
586 {
587 _opcode=-1;
588 _aggregate.clear();
589 }
590 }
591 }
592 }
593 break;
594 }
595 case WebSocketConnectionD06.OP_PING:
596 {
597 LOG.debug("PING {}",this);
598 if (!_closedOut)
599 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
600 break;
601 }
602
603 case WebSocketConnectionD06.OP_PONG:
604 {
605 LOG.debug("PONG {}",this);
606 break;
607 }
608
609 case WebSocketConnectionD06.OP_CLOSE:
610 {
611 int code=-1;
612 String message=null;
613 if (buffer.length()>=2)
614 {
615 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
616 if (buffer.length()>2)
617 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
618 }
619 closeIn(code,message);
620 break;
621 }
622
623
624 case WebSocketConnectionD06.OP_TEXT:
625 {
626 if(_onTextMessage!=null)
627 {
628 if (lastFrame)
629 {
630
631 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
632 }
633 else
634 {
635 if (_connection.getMaxTextMessageSize()>=0)
636 {
637
638 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
639 _opcode=WebSocketConnectionD06.OP_TEXT;
640 else
641 {
642 _utf8.reset();
643 _opcode=-1;
644 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
645 }
646 }
647 }
648 }
649 break;
650 }
651
652 default:
653 {
654 if (_onBinaryMessage!=null)
655 {
656 if (lastFrame)
657 {
658 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
659 }
660 else
661 {
662 if (_connection.getMaxBinaryMessageSize()>=0)
663 {
664 if (buffer.length()>_connection.getMaxBinaryMessageSize())
665 {
666 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
667 if (_aggregate!=null)
668 _aggregate.clear();
669 _opcode=-1;
670 }
671 else
672 {
673 _opcode=opcode;
674 if (_aggregate==null)
675 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
676 _aggregate.put(buffer);
677 }
678 }
679 }
680 }
681 }
682 }
683 }
684 catch(Throwable th)
685 {
686 LOG.warn(th);
687 }
688 }
689 }
690
691 public void close(int code,String message)
692 {
693 _connection.close(code,message);
694 }
695
696 public String toString()
697 {
698 return WebSocketConnectionD06.this.toString()+"FH";
699 }
700 }
701
702
703 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
704 {
705 String key = request.getHeader("Sec-WebSocket-Key");
706
707 response.setHeader("Upgrade","WebSocket");
708 response.addHeader("Connection","Upgrade");
709 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
710 if (subprotocol!=null)
711 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
712 response.sendError(101);
713
714 if (_onFrame!=null)
715 _onFrame.onHandshake(_connection);
716 _webSocket.onOpen(_connection);
717 }
718
719
720 public static String hashKey(String key)
721 {
722 try
723 {
724 MessageDigest md = MessageDigest.getInstance("SHA1");
725 md.update(key.getBytes("UTF-8"));
726 md.update(MAGIC);
727 return new String(B64Code.encode(md.digest()));
728 }
729 catch (Exception e)
730 {
731 throw new RuntimeException(e);
732 }
733 }
734 }