1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package org.eclipse.jetty.websocket;
30
31 import java.io.IOException;
32 import java.io.UnsupportedEncodingException;
33 import java.security.MessageDigest;
34 import java.util.Collections;
35 import java.util.List;
36
37 import org.eclipse.jetty.io.AbstractConnection;
38 import org.eclipse.jetty.io.AsyncEndPoint;
39 import org.eclipse.jetty.io.Buffer;
40 import org.eclipse.jetty.io.ByteArrayBuffer;
41 import org.eclipse.jetty.io.Connection;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.util.B64Code;
44 import org.eclipse.jetty.util.StringUtil;
45 import org.eclipse.jetty.util.Utf8StringBuilder;
46 import org.eclipse.jetty.util.log.Log;
47 import org.eclipse.jetty.util.log.Logger;
48 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
49 import org.eclipse.jetty.websocket.WebSocket.OnControl;
50 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
51 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
52
53 public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
54 {
55 private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
56
57 final static byte OP_CONTINUATION = 0x00;
58 final static byte OP_CLOSE = 0x01;
59 final static byte OP_PING = 0x02;
60 final static byte OP_PONG = 0x03;
61 final static byte OP_TEXT = 0x04;
62 final static byte OP_BINARY = 0x05;
63
64 final static int CLOSE_NORMAL=1000;
65 final static int CLOSE_SHUTDOWN=1001;
66 final static int CLOSE_PROTOCOL=1002;
67 final static int CLOSE_BADDATA=1003;
68 final static int CLOSE_LARGE=1004;
69
70 static boolean isLastFrame(int flags)
71 {
72 return (flags&0x8)!=0;
73 }
74
75 static boolean isControlFrame(int opcode)
76 {
77 switch(opcode)
78 {
79 case OP_CLOSE:
80 case OP_PING:
81 case OP_PONG:
82 return true;
83 default:
84 return false;
85 }
86 }
87
88 private final static byte[] MAGIC;
89 private final WebSocketParser _parser;
90 private final WebSocketGenerator _generator;
91 private final WebSocket _webSocket;
92 private final OnFrame _onFrame;
93 private final OnBinaryMessage _onBinaryMessage;
94 private final OnTextMessage _onTextMessage;
95 private final OnControl _onControl;
96 private final String _protocol;
97 private volatile boolean _closedIn;
98 private volatile boolean _closedOut;
99 private int _maxTextMessageSize;
100 private int _maxBinaryMessageSize=-1;
101
102 static
103 {
104 try
105 {
106 MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
107 }
108 catch (UnsupportedEncodingException e)
109 {
110 throw new RuntimeException(e);
111 }
112 }
113
114 private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
115 private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
116
117
118
119 public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
120 throws IOException
121 {
122 super(endpoint,timestamp);
123
124 _endp.setMaxIdleTime(maxIdleTime);
125
126 _webSocket = websocket;
127 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
128 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
129 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
130 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
131 _generator = new WebSocketGeneratorD06(buffers, _endp,null);
132 _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
133 _protocol=protocol;
134
135 _maxTextMessageSize=buffers.getBufferSize();
136 _maxBinaryMessageSize=-1;
137 }
138
139
140 public WebSocket.Connection getConnection()
141 {
142 return _connection;
143 }
144
145
146 public Connection handle() throws IOException
147 {
148 try
149 {
150
151 boolean progress=true;
152
153 while (progress)
154 {
155 int flushed=_generator.flush();
156 int filled=_parser.parseNext();
157
158 progress = flushed>0 || filled>0;
159
160 if (filled<0 || flushed<0)
161 {
162 _endp.close();
163 break;
164 }
165 }
166 }
167 catch(IOException e)
168 {
169 try
170 {
171 _endp.close();
172 }
173 catch(IOException e2)
174 {
175 LOG.ignore(e2);
176 }
177 throw e;
178 }
179 finally
180 {
181 if (_endp.isOpen())
182 {
183 if (_closedIn && _closedOut && _generator.isBufferEmpty())
184 _endp.close();
185 else if (_endp.isInputShutdown() && !_closedIn)
186 closeIn(CLOSE_PROTOCOL,null);
187 else
188 checkWriteable();
189 }
190
191 }
192 return this;
193 }
194
195
196 public void onInputShutdown() throws IOException
197 {
198
199 }
200
201
202 public boolean isIdle()
203 {
204 return _parser.isBufferEmpty() && _generator.isBufferEmpty();
205 }
206
207
208 @Override
209 public void onIdleExpired(long idleForMs)
210 {
211 closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
212 }
213
214
215 public boolean isSuspended()
216 {
217 return false;
218 }
219
220
221 public void onClose()
222 {
223 _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
224 }
225
226
227 public synchronized void closeIn(int code,String message)
228 {
229 LOG.debug("ClosedIn {} {}",this,message);
230 try
231 {
232 if (_closedOut)
233 _endp.close();
234 else
235 closeOut(code,message);
236 }
237 catch(IOException e)
238 {
239 LOG.ignore(e);
240 }
241 finally
242 {
243 _closedIn=true;
244 }
245 }
246
247
248 public synchronized void closeOut(int code,String message)
249 {
250 LOG.debug("ClosedOut {} {}",this,message);
251 try
252 {
253 if (_closedIn || _closedOut)
254 _endp.close();
255 else
256 {
257 if (code<=0)
258 code=WebSocketConnectionD06.CLOSE_NORMAL;
259 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
260 bytes[0]=(byte)(code/0x100);
261 bytes[1]=(byte)(code%0x100);
262 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
263 }
264 _generator.flush();
265
266 }
267 catch(IOException e)
268 {
269 LOG.ignore(e);
270 }
271 finally
272 {
273 _closedOut=true;
274 }
275 }
276
277 public void shutdown()
278 {
279 final WebSocket.Connection connection = _connection;
280 if (connection != null)
281 connection.close(CLOSE_SHUTDOWN, null);
282 }
283
284
285 public void fillBuffersFrom(Buffer buffer)
286 {
287 _parser.fill(buffer);
288 }
289
290
291 private void checkWriteable()
292 {
293 if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
294 {
295 ((AsyncEndPoint)_endp).scheduleWrite();
296 }
297 }
298
299
300 public List<Extension> getExtensions()
301 {
302 return Collections.emptyList();
303 }
304
305 protected void onFrameHandshake()
306 {
307 if (_onFrame!=null)
308 {
309 _onFrame.onHandshake(_connection);
310 }
311 }
312
313 protected void onWebSocketOpen()
314 {
315 _webSocket.onOpen(_connection);
316 }
317
318
319 private class FrameConnectionD06 implements WebSocket.FrameConnection
320 {
321 volatile boolean _disconnecting;
322 int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
323 int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
324
325
326 public synchronized void sendMessage(String content) throws IOException
327 {
328 if (_closedOut)
329 throw new IOException("closing");
330 byte[] data = content.getBytes(StringUtil.__UTF8);
331 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
332 _generator.flush();
333 checkWriteable();
334 }
335
336
337 public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
338 {
339 if (_closedOut)
340 throw new IOException("closing");
341 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
342 _generator.flush();
343 checkWriteable();
344 }
345
346
347 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
348 {
349 if (_closedOut)
350 throw new IOException("closing");
351 _generator.addFrame(flags,opcode,content,offset,length);
352 _generator.flush();
353 checkWriteable();
354 }
355
356
357 public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
358 {
359 if (_closedOut)
360 throw new IOException("closing");
361 _generator.addFrame((byte)0x8,control,data,offset,length);
362 _generator.flush();
363 checkWriteable();
364 }
365
366
367 public boolean isMessageComplete(byte flags)
368 {
369 return isLastFrame(flags);
370 }
371
372
373 public boolean isOpen()
374 {
375 return _endp!=null&&_endp.isOpen();
376 }
377
378
379 public void close(int code, String message)
380 {
381 if (_disconnecting)
382 return;
383 _disconnecting=true;
384 WebSocketConnectionD06.this.closeOut(code,message);
385 }
386
387
388 public void setMaxIdleTime(int ms)
389 {
390 try
391 {
392 _endp.setMaxIdleTime(ms);
393 }
394 catch(IOException e)
395 {
396 LOG.warn(e);
397 }
398 }
399
400
401 public void setMaxTextMessageSize(int size)
402 {
403 _maxTextMessage=size;
404 }
405
406
407 public void setMaxBinaryMessageSize(int size)
408 {
409 _maxBinaryMessage=size;
410 }
411
412
413 public int getMaxTextMessageSize()
414 {
415 return _maxTextMessage;
416 }
417
418
419 public int getMaxIdleTime()
420 {
421 return _endp.getMaxIdleTime();
422 }
423
424
425 public int getMaxBinaryMessageSize()
426 {
427 return _maxBinaryMessage;
428 }
429
430
431 public String getProtocol()
432 {
433 return _protocol;
434 }
435
436
437 public byte binaryOpcode()
438 {
439 return OP_BINARY;
440 }
441
442
443 public byte textOpcode()
444 {
445 return OP_TEXT;
446 }
447
448
449 public byte continuationOpcode()
450 {
451 return OP_CONTINUATION;
452 }
453
454
455 public byte finMask()
456 {
457 return 0x8;
458 }
459
460
461 public boolean isControl(byte opcode)
462 {
463 return isControlFrame(opcode);
464 }
465
466
467 public boolean isText(byte opcode)
468 {
469 return opcode==OP_TEXT;
470 }
471
472
473 public boolean isBinary(byte opcode)
474 {
475 return opcode==OP_BINARY;
476 }
477
478
479 public boolean isContinuation(byte opcode)
480 {
481 return opcode==OP_CONTINUATION;
482 }
483
484
485 public boolean isClose(byte opcode)
486 {
487 return opcode==OP_CLOSE;
488 }
489
490
491 public boolean isPing(byte opcode)
492 {
493 return opcode==OP_PING;
494 }
495
496
497 public boolean isPong(byte opcode)
498 {
499 return opcode==OP_PONG;
500 }
501
502
503 public void disconnect()
504 {
505 close();
506 }
507
508
509 public void close()
510 {
511 close(CLOSE_NORMAL,null);
512 }
513
514
515 @Override
516 public String toString()
517 {
518 return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
519 }
520
521 public void setAllowFrameFragmentation(boolean allowFragmentation)
522 {
523 }
524
525 public boolean isAllowFrameFragmentation()
526 {
527 return false;
528 }
529 }
530
531
532
533
534 private class FrameHandlerD06 implements WebSocketParser.FrameHandler
535 {
536 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
537 private ByteArrayBuffer _aggregate;
538 private byte _opcode=-1;
539
540 public void onFrame(byte flags, byte opcode, Buffer buffer)
541 {
542 boolean lastFrame = isLastFrame(flags);
543
544 synchronized(WebSocketConnectionD06.this)
545 {
546
547 if (_closedIn)
548 return;
549
550 try
551 {
552 byte[] array=buffer.array();
553
554
555 if (_onFrame!=null)
556 {
557 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
558 return;
559 }
560
561 if (_onControl!=null && isControlFrame(opcode))
562 {
563 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
564 return;
565 }
566
567 switch(opcode)
568 {
569 case WebSocketConnectionD06.OP_CONTINUATION:
570 {
571
572 if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
573 {
574 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
575 {
576
577 if (lastFrame && _onTextMessage!=null)
578 {
579 _opcode=-1;
580 String msg =_utf8.toString();
581 _utf8.reset();
582 _onTextMessage.onMessage(msg);
583 }
584 }
585 else
586 {
587 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
588 _utf8.reset();
589 _opcode=-1;
590 }
591 }
592 else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
593 {
594 if (_aggregate.space()<_aggregate.length())
595 {
596 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
597 _aggregate.clear();
598 _opcode=-1;
599 }
600 else
601 {
602 _aggregate.put(buffer);
603
604
605 if (lastFrame && _onBinaryMessage!=null)
606 {
607 try
608 {
609 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
610 }
611 finally
612 {
613 _opcode=-1;
614 _aggregate.clear();
615 }
616 }
617 }
618 }
619 break;
620 }
621 case WebSocketConnectionD06.OP_PING:
622 {
623 LOG.debug("PING {}",this);
624 if (!_closedOut)
625 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
626 break;
627 }
628
629 case WebSocketConnectionD06.OP_PONG:
630 {
631 LOG.debug("PONG {}",this);
632 break;
633 }
634
635 case WebSocketConnectionD06.OP_CLOSE:
636 {
637 int code=-1;
638 String message=null;
639 if (buffer.length()>=2)
640 {
641 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
642 if (buffer.length()>2)
643 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
644 }
645 closeIn(code,message);
646 break;
647 }
648
649
650 case WebSocketConnectionD06.OP_TEXT:
651 {
652 if(_onTextMessage!=null)
653 {
654 if (lastFrame)
655 {
656
657 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
658 }
659 else
660 {
661 if (_connection.getMaxTextMessageSize()>=0)
662 {
663
664 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
665 _opcode=WebSocketConnectionD06.OP_TEXT;
666 else
667 {
668 _utf8.reset();
669 _opcode=-1;
670 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
671 }
672 }
673 }
674 }
675 break;
676 }
677
678 default:
679 {
680 if (_onBinaryMessage!=null)
681 {
682 if (lastFrame)
683 {
684 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
685 }
686 else
687 {
688 if (_connection.getMaxBinaryMessageSize()>=0)
689 {
690 if (buffer.length()>_connection.getMaxBinaryMessageSize())
691 {
692 _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
693 if (_aggregate!=null)
694 _aggregate.clear();
695 _opcode=-1;
696 }
697 else
698 {
699 _opcode=opcode;
700 if (_aggregate==null)
701 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
702 _aggregate.put(buffer);
703 }
704 }
705 }
706 }
707 }
708 }
709 }
710 catch(Throwable th)
711 {
712 LOG.warn(th);
713 }
714 }
715 }
716
717 public void close(int code,String message)
718 {
719 _connection.close(code,message);
720 }
721
722 @Override
723 public String toString()
724 {
725 return WebSocketConnectionD06.this.toString()+"FH";
726 }
727 }
728
729
730 public static String hashKey(String key)
731 {
732 try
733 {
734 MessageDigest md = MessageDigest.getInstance("SHA1");
735 md.update(key.getBytes("UTF-8"));
736 md.update(MAGIC);
737 return new String(B64Code.encode(md.digest()));
738 }
739 catch (Exception e)
740 {
741 throw new RuntimeException(e);
742 }
743 }
744 }