View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  // ========================================================================
17  // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18  // ------------------------------------------------------------------------
19  // All rights reserved. This program and the accompanying materials
20  // are made available under the terms of the Eclipse Public License v1.0
21  // and Apache License v2.0 which accompanies this distribution.
22  // The Eclipse Public License is available at
23  // http://www.eclipse.org/legal/epl-v10.html
24  // The Apache License v2.0 is available at
25  // http://www.opensource.org/licenses/apache2.0.php
26  // You may elect to redistribute this code under either of these licenses.
27  // ========================================================================
28  
29  package org.eclipse.jetty.websocket;
30  
31  import java.io.IOException;
32  import java.io.UnsupportedEncodingException;
33  import java.security.MessageDigest;
34  import java.util.Collections;
35  import java.util.List;
36  
37  import org.eclipse.jetty.io.AbstractConnection;
38  import org.eclipse.jetty.io.AsyncEndPoint;
39  import org.eclipse.jetty.io.Buffer;
40  import org.eclipse.jetty.io.ByteArrayBuffer;
41  import org.eclipse.jetty.io.Connection;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.util.B64Code;
44  import org.eclipse.jetty.util.StringUtil;
45  import org.eclipse.jetty.util.Utf8StringBuilder;
46  import org.eclipse.jetty.util.log.Log;
47  import org.eclipse.jetty.util.log.Logger;
48  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
49  import org.eclipse.jetty.websocket.WebSocket.OnControl;
50  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
51  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
52  
53  public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
54  {
55      private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
56  
57      final static byte OP_CONTINUATION = 0x00;
58      final static byte OP_CLOSE = 0x01;
59      final static byte OP_PING = 0x02;
60      final static byte OP_PONG = 0x03;
61      final static byte OP_TEXT = 0x04;
62      final static byte OP_BINARY = 0x05;
63  
64      final static int CLOSE_NORMAL=1000;
65      final static int CLOSE_SHUTDOWN=1001;
66      final static int CLOSE_PROTOCOL=1002;
67      final static int CLOSE_BADDATA=1003;
68      final static int CLOSE_LARGE=1004;
69  
70      static boolean isLastFrame(int flags)
71      {
72          return (flags&0x8)!=0;
73      }
74  
75      static boolean isControlFrame(int opcode)
76      {
77          switch(opcode)
78          {
79              case OP_CLOSE:
80              case OP_PING:
81              case OP_PONG:
82                  return true;
83              default:
84                  return false;
85          }
86      }
87  
88      private final static byte[] MAGIC;
89      private final WebSocketParser _parser;
90      private final WebSocketGenerator _generator;
91      private final WebSocket _webSocket;
92      private final OnFrame _onFrame;
93      private final OnBinaryMessage _onBinaryMessage;
94      private final OnTextMessage _onTextMessage;
95      private final OnControl _onControl;
96      private final String _protocol;
97      private volatile boolean _closedIn;
98      private volatile boolean _closedOut;
99      private int _maxTextMessageSize;
100     private int _maxBinaryMessageSize=-1;
101 
102     static
103     {
104         try
105         {
106             MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
107         }
108         catch (UnsupportedEncodingException e)
109         {
110             throw new RuntimeException(e);
111         }
112     }
113 
114     private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
115     private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
116 
117 
118     /* ------------------------------------------------------------ */
119     public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
120         throws IOException
121     {
122         super(endpoint,timestamp);
123 
124         _endp.setMaxIdleTime(maxIdleTime);
125 
126         _webSocket = websocket;
127         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
128         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
129         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
130         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
131         _generator = new WebSocketGeneratorD06(buffers, _endp,null);
132         _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
133         _protocol=protocol;
134 
135         _maxTextMessageSize=buffers.getBufferSize();
136         _maxBinaryMessageSize=-1;
137     }
138 
139     /* ------------------------------------------------------------ */
140     public WebSocket.Connection getConnection()
141     {
142         return _connection;
143     }
144 
145     /* ------------------------------------------------------------ */
146     public Connection handle() throws IOException
147     {
148         try
149         {
150             // handle the framing protocol
151             boolean progress=true;
152 
153             while (progress)
154             {
155                 int flushed=_generator.flush();
156                 int filled=_parser.parseNext();
157 
158                 progress = flushed>0 || filled>0;
159 
160                 if (filled<0 || flushed<0)
161                 {
162                     _endp.close();
163                     break;
164                 }
165             }
166         }
167         catch(IOException e)
168         {
169             try
170             {
171                 _endp.close();
172             }
173             catch(IOException e2)
174             {
175                 LOG.ignore(e2);
176             }
177             throw e;
178         }
179         finally
180         {
181             if (_endp.isOpen())
182             {
183                 if (_closedIn && _closedOut && _generator.isBufferEmpty())
184                     _endp.close();
185                 else if (_endp.isInputShutdown() && !_closedIn)
186                     closeIn(CLOSE_PROTOCOL,null);
187                 else
188                     checkWriteable();
189             }
190 
191         }
192         return this;
193     }
194 
195     /* ------------------------------------------------------------ */
196     public void onInputShutdown() throws IOException
197     {
198         // TODO
199     }
200 
201     /* ------------------------------------------------------------ */
202     public boolean isIdle()
203     {
204         return _parser.isBufferEmpty() && _generator.isBufferEmpty();
205     }
206 
207     /* ------------------------------------------------------------ */
208     @Override
209     public void onIdleExpired(long idleForMs)
210     {
211         closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
212     }
213 
214     /* ------------------------------------------------------------ */
215     public boolean isSuspended()
216     {
217         return false;
218     }
219 
220     /* ------------------------------------------------------------ */
221     public void onClose()
222     {
223         _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
224     }
225 
226     /* ------------------------------------------------------------ */
227     public synchronized void closeIn(int code,String message)
228     {
229         LOG.debug("ClosedIn {} {}",this,message);
230         try
231         {
232             if (_closedOut)
233                 _endp.close();
234             else
235                 closeOut(code,message);
236         }
237         catch(IOException e)
238         {
239             LOG.ignore(e);
240         }
241         finally
242         {
243             _closedIn=true;
244         }
245     }
246 
247     /* ------------------------------------------------------------ */
248     public synchronized void closeOut(int code,String message)
249     {
250         LOG.debug("ClosedOut {} {}",this,message);
251         try
252         {
253             if (_closedIn || _closedOut)
254                 _endp.close();
255             else
256             {
257                 if (code<=0)
258                     code=WebSocketConnectionD06.CLOSE_NORMAL;
259                 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
260                 bytes[0]=(byte)(code/0x100);
261                 bytes[1]=(byte)(code%0x100);
262                 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
263             }
264             _generator.flush();
265 
266         }
267         catch(IOException e)
268         {
269             LOG.ignore(e);
270         }
271         finally
272         {
273             _closedOut=true;
274         }
275     }
276 
277     public void shutdown()
278     {
279         final WebSocket.Connection connection = _connection;
280         if (connection != null)
281             connection.close(CLOSE_SHUTDOWN, null);
282     }
283 
284     /* ------------------------------------------------------------ */
285     public void fillBuffersFrom(Buffer buffer)
286     {
287         _parser.fill(buffer);
288     }
289 
290     /* ------------------------------------------------------------ */
291     private void checkWriteable()
292     {
293         if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
294         {
295             ((AsyncEndPoint)_endp).scheduleWrite();
296         }
297     }
298 
299     /* ------------------------------------------------------------ */
300     public List<Extension> getExtensions()
301     {
302         return Collections.emptyList();
303     }
304 
305     protected void onFrameHandshake()
306     {
307         if (_onFrame!=null)
308         {
309             _onFrame.onHandshake(_connection);
310         }
311     }
312 
313     protected void onWebSocketOpen()
314     {
315         _webSocket.onOpen(_connection);
316     }
317 
318     /* ------------------------------------------------------------ */
319     private class FrameConnectionD06 implements WebSocket.FrameConnection
320     {
321         volatile boolean _disconnecting;
322         int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
323         int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
324 
325         /* ------------------------------------------------------------ */
326         public synchronized void sendMessage(String content) throws IOException
327         {
328             if (_closedOut)
329                 throw new IOException("closing");
330             byte[] data = content.getBytes(StringUtil.__UTF8);
331             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
332             _generator.flush();
333             checkWriteable();
334         }
335 
336         /* ------------------------------------------------------------ */
337         public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
338         {
339             if (_closedOut)
340                 throw new IOException("closing");
341             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
342             _generator.flush();
343             checkWriteable();
344         }
345 
346         /* ------------------------------------------------------------ */
347         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
348         {
349             if (_closedOut)
350                 throw new IOException("closing");
351             _generator.addFrame(flags,opcode,content,offset,length);
352             _generator.flush();
353             checkWriteable();
354         }
355 
356         /* ------------------------------------------------------------ */
357         public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
358         {
359             if (_closedOut)
360                 throw new IOException("closing");
361             _generator.addFrame((byte)0x8,control,data,offset,length);
362             _generator.flush();
363             checkWriteable();
364         }
365 
366         /* ------------------------------------------------------------ */
367         public boolean isMessageComplete(byte flags)
368         {
369             return isLastFrame(flags);
370         }
371 
372         /* ------------------------------------------------------------ */
373         public boolean isOpen()
374         {
375             return _endp!=null&&_endp.isOpen();
376         }
377 
378         /* ------------------------------------------------------------ */
379         public void close(int code, String message)
380         {
381             if (_disconnecting)
382                 return;
383             _disconnecting=true;
384             WebSocketConnectionD06.this.closeOut(code,message);
385         }
386 
387         /* ------------------------------------------------------------ */
388         public void setMaxIdleTime(int ms)
389         {
390             try
391             {
392                 _endp.setMaxIdleTime(ms);
393             }
394             catch(IOException e)
395             {
396                 LOG.warn(e);
397             }
398         }
399 
400         /* ------------------------------------------------------------ */
401         public void setMaxTextMessageSize(int size)
402         {
403             _maxTextMessage=size;
404         }
405 
406         /* ------------------------------------------------------------ */
407         public void setMaxBinaryMessageSize(int size)
408         {
409             _maxBinaryMessage=size;
410         }
411 
412         /* ------------------------------------------------------------ */
413         public int getMaxTextMessageSize()
414         {
415             return _maxTextMessage;
416         }
417 
418         /* ------------------------------------------------------------ */
419         public int getMaxIdleTime()
420         {
421             return _endp.getMaxIdleTime();
422         }
423 
424         /* ------------------------------------------------------------ */
425         public int getMaxBinaryMessageSize()
426         {
427             return _maxBinaryMessage;
428         }
429 
430         /* ------------------------------------------------------------ */
431         public String getProtocol()
432         {
433             return _protocol;
434         }
435 
436         /* ------------------------------------------------------------ */
437         public byte binaryOpcode()
438         {
439             return OP_BINARY;
440         }
441 
442         /* ------------------------------------------------------------ */
443         public byte textOpcode()
444         {
445             return OP_TEXT;
446         }
447 
448         /* ------------------------------------------------------------ */
449         public byte continuationOpcode()
450         {
451             return OP_CONTINUATION;
452         }
453 
454         /* ------------------------------------------------------------ */
455         public byte finMask()
456         {
457             return 0x8;
458         }
459 
460         /* ------------------------------------------------------------ */
461         public boolean isControl(byte opcode)
462         {
463             return isControlFrame(opcode);
464         }
465 
466         /* ------------------------------------------------------------ */
467         public boolean isText(byte opcode)
468         {
469             return opcode==OP_TEXT;
470         }
471 
472         /* ------------------------------------------------------------ */
473         public boolean isBinary(byte opcode)
474         {
475             return opcode==OP_BINARY;
476         }
477 
478         /* ------------------------------------------------------------ */
479         public boolean isContinuation(byte opcode)
480         {
481             return opcode==OP_CONTINUATION;
482         }
483 
484         /* ------------------------------------------------------------ */
485         public boolean isClose(byte opcode)
486         {
487             return opcode==OP_CLOSE;
488         }
489 
490         /* ------------------------------------------------------------ */
491         public boolean isPing(byte opcode)
492         {
493             return opcode==OP_PING;
494         }
495 
496         /* ------------------------------------------------------------ */
497         public boolean isPong(byte opcode)
498         {
499             return opcode==OP_PONG;
500         }
501 
502         /* ------------------------------------------------------------ */
503         public void disconnect()
504         {
505             close();
506         }
507 
508         /* ------------------------------------------------------------ */
509         public void close()
510         {
511             close(CLOSE_NORMAL,null);
512         }
513 
514         /* ------------------------------------------------------------ */
515         @Override
516         public String toString()
517         {
518             return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
519         }
520 
521         public void setAllowFrameFragmentation(boolean allowFragmentation)
522         {
523         }
524 
525         public boolean isAllowFrameFragmentation()
526         {
527             return false;
528         }
529     }
530 
531     /* ------------------------------------------------------------ */
532     /* ------------------------------------------------------------ */
533     /* ------------------------------------------------------------ */
534     private class FrameHandlerD06 implements WebSocketParser.FrameHandler
535     {
536         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
537         private ByteArrayBuffer _aggregate;
538         private byte _opcode=-1;
539 
540         public void onFrame(byte flags, byte opcode, Buffer buffer)
541         {
542             boolean lastFrame = isLastFrame(flags);
543 
544             synchronized(WebSocketConnectionD06.this)
545             {
546                 // Ignore incoming after a close
547                 if (_closedIn)
548                     return;
549 
550                 try
551                 {
552                     byte[] array=buffer.array();
553 
554                     // Deliver frame if websocket is a FrameWebSocket
555                     if (_onFrame!=null)
556                     {
557                         if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
558                             return;
559                     }
560 
561                     if (_onControl!=null && isControlFrame(opcode))
562                     {
563                         if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
564                             return;
565                     }
566 
567                     switch(opcode)
568                     {
569                         case WebSocketConnectionD06.OP_CONTINUATION:
570                         {
571                             // If text, append to the message buffer
572                             if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
573                             {
574                                 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
575                                 {
576                                     // If this is the last fragment, deliver the text buffer
577                                     if (lastFrame && _onTextMessage!=null)
578                                     {
579                                         _opcode=-1;
580                                         String msg =_utf8.toString();
581                                         _utf8.reset();
582                                         _onTextMessage.onMessage(msg);
583                                     }
584                                 }
585                                 else
586                                 {
587                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
588                                     _utf8.reset();
589                                     _opcode=-1;
590                                 }
591                             }
592                             else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
593                             {
594                                 if (_aggregate.space()<_aggregate.length())
595                                 {
596                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
597                                     _aggregate.clear();
598                                     _opcode=-1;
599                                 }
600                                 else
601                                 {
602                                     _aggregate.put(buffer);
603 
604                                     // If this is the last fragment, deliver
605                                     if (lastFrame && _onBinaryMessage!=null)
606                                     {
607                                         try
608                                         {
609                                             _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
610                                         }
611                                         finally
612                                         {
613                                             _opcode=-1;
614                                             _aggregate.clear();
615                                         }
616                                     }
617                                 }
618                             }
619                             break;
620                         }
621                         case WebSocketConnectionD06.OP_PING:
622                         {
623                             LOG.debug("PING {}",this);
624                             if (!_closedOut)
625                                 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
626                             break;
627                         }
628 
629                         case WebSocketConnectionD06.OP_PONG:
630                         {
631                             LOG.debug("PONG {}",this);
632                             break;
633                         }
634 
635                         case WebSocketConnectionD06.OP_CLOSE:
636                         {
637                             int code=-1;
638                             String message=null;
639                             if (buffer.length()>=2)
640                             {
641                                 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
642                                 if (buffer.length()>2)
643                                     message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
644                             }
645                             closeIn(code,message);
646                             break;
647                         }
648 
649 
650                         case WebSocketConnectionD06.OP_TEXT:
651                         {
652                             if(_onTextMessage!=null)
653                             {
654                                 if (lastFrame)
655                                 {
656                                     // Deliver the message
657                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
658                                 }
659                                 else
660                                 {
661                                     if (_connection.getMaxTextMessageSize()>=0)
662                                     {
663                                         // If this is a text fragment, append to buffer
664                                         if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
665                                             _opcode=WebSocketConnectionD06.OP_TEXT;
666                                         else
667                                         {
668                                             _utf8.reset();
669                                             _opcode=-1;
670                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
671                                         }
672                                     }
673                                 }
674                             }
675                             break;
676                         }
677 
678                         default:
679                         {
680                             if (_onBinaryMessage!=null)
681                             {
682                                 if (lastFrame)
683                                 {
684                                     _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
685                                 }
686                                 else
687                                 {
688                                     if (_connection.getMaxBinaryMessageSize()>=0)
689                                     {
690                                         if (buffer.length()>_connection.getMaxBinaryMessageSize())
691                                         {
692                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
693                                             if (_aggregate!=null)
694                                                 _aggregate.clear();
695                                             _opcode=-1;
696                                         }
697                                         else
698                                         {
699                                             _opcode=opcode;
700                                             if (_aggregate==null)
701                                                 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
702                                             _aggregate.put(buffer);
703                                         }
704                                     }
705                                 }
706                             }
707                         }
708                     }
709                 }
710                 catch(Throwable th)
711                 {
712                     LOG.warn(th);
713                 }
714             }
715         }
716 
717         public void close(int code,String message)
718         {
719             _connection.close(code,message);
720         }
721 
722         @Override
723         public String toString()
724         {
725             return WebSocketConnectionD06.this.toString()+"FH";
726         }
727     }
728 
729     /* ------------------------------------------------------------ */
730     public static String hashKey(String key)
731     {
732         try
733         {
734             MessageDigest md = MessageDigest.getInstance("SHA1");
735             md.update(key.getBytes("UTF-8"));
736             md.update(MAGIC);
737             return new String(B64Code.encode(md.digest()));
738         }
739         catch (Exception e)
740         {
741             throw new RuntimeException(e);
742         }
743     }
744 }