View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket;
20  
21  import java.io.IOException;
22  import java.io.UnsupportedEncodingException;
23  import java.security.MessageDigest;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import org.eclipse.jetty.io.AbstractConnection;
28  import org.eclipse.jetty.io.AsyncEndPoint;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.ByteArrayBuffer;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.io.EndPoint;
33  import org.eclipse.jetty.util.B64Code;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.Utf8StringBuilder;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
39  import org.eclipse.jetty.websocket.WebSocket.OnControl;
40  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
41  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
42  
43  public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
44  {
45      private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
46  
47      final static byte OP_CONTINUATION = 0x00;
48      final static byte OP_CLOSE = 0x01;
49      final static byte OP_PING = 0x02;
50      final static byte OP_PONG = 0x03;
51      final static byte OP_TEXT = 0x04;
52      final static byte OP_BINARY = 0x05;
53  
54      final static int CLOSE_NORMAL=1000;
55      final static int CLOSE_SHUTDOWN=1001;
56      final static int CLOSE_PROTOCOL=1002;
57      final static int CLOSE_BADDATA=1003;
58      final static int CLOSE_LARGE=1004;
59  
60      static boolean isLastFrame(int flags)
61      {
62          return (flags&0x8)!=0;
63      }
64  
65      static boolean isControlFrame(int opcode)
66      {
67          switch(opcode)
68          {
69              case OP_CLOSE:
70              case OP_PING:
71              case OP_PONG:
72                  return true;
73              default:
74                  return false;
75          }
76      }
77  
78      private final static byte[] MAGIC;
79      private final WebSocketParser _parser;
80      private final WebSocketGenerator _generator;
81      private final WebSocket _webSocket;
82      private final OnFrame _onFrame;
83      private final OnBinaryMessage _onBinaryMessage;
84      private final OnTextMessage _onTextMessage;
85      private final OnControl _onControl;
86      private final String _protocol;
87      private volatile boolean _closedIn;
88      private volatile boolean _closedOut;
89      private int _maxTextMessageSize;
90      private int _maxBinaryMessageSize=-1;
91  
92      static
93      {
94          try
95          {
96              MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
97          }
98          catch (UnsupportedEncodingException e)
99          {
100             throw new RuntimeException(e);
101         }
102     }
103 
104     private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
105     private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
106 
107 
108     /* ------------------------------------------------------------ */
109     public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
110         throws IOException
111     {
112         super(endpoint,timestamp);
113 
114         _endp.setMaxIdleTime(maxIdleTime);
115 
116         _webSocket = websocket;
117         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
118         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
119         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
120         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
121         _generator = new WebSocketGeneratorD06(buffers, _endp,null);
122         _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
123         _protocol=protocol;
124 
125         _maxTextMessageSize=buffers.getBufferSize();
126         _maxBinaryMessageSize=-1;
127     }
128 
129     /* ------------------------------------------------------------ */
130     public WebSocket.Connection getConnection()
131     {
132         return _connection;
133     }
134 
135     /* ------------------------------------------------------------ */
136     public Connection handle() throws IOException
137     {
138         try
139         {
140             // handle the framing protocol
141             boolean progress=true;
142 
143             while (progress)
144             {
145                 int flushed=_generator.flush();
146                 int filled=_parser.parseNext();
147 
148                 progress = flushed>0 || filled>0;
149 
150                 if (filled<0 || flushed<0)
151                 {
152                     _endp.close();
153                     break;
154                 }
155             }
156         }
157         catch(IOException e)
158         {
159             try
160             {
161                 _endp.close();
162             }
163             catch(IOException e2)
164             {
165                 LOG.ignore(e2);
166             }
167             throw e;
168         }
169         finally
170         {
171             if (_endp.isOpen())
172             {
173                 if (_closedIn && _closedOut && _generator.isBufferEmpty())
174                     _endp.close();
175                 else if (_endp.isInputShutdown() && !_closedIn)
176                     closeIn(CLOSE_PROTOCOL,null);
177                 else
178                     checkWriteable();
179             }
180 
181         }
182         return this;
183     }
184 
185     /* ------------------------------------------------------------ */
186     public void onInputShutdown() throws IOException
187     {
188         // TODO
189     }
190 
191     /* ------------------------------------------------------------ */
192     public boolean isIdle()
193     {
194         return _parser.isBufferEmpty() && _generator.isBufferEmpty();
195     }
196 
197     /* ------------------------------------------------------------ */
198     @Override
199     public void onIdleExpired(long idleForMs)
200     {
201         closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
202     }
203 
204     /* ------------------------------------------------------------ */
205     public boolean isSuspended()
206     {
207         return false;
208     }
209 
210     /* ------------------------------------------------------------ */
211     public void onClose()
212     {
213         _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
214     }
215 
216     /* ------------------------------------------------------------ */
217     public synchronized void closeIn(int code,String message)
218     {
219         LOG.debug("ClosedIn {} {}",this,message);
220         try
221         {
222             if (_closedOut)
223                 _endp.close();
224             else
225                 closeOut(code,message);
226         }
227         catch(IOException e)
228         {
229             LOG.ignore(e);
230         }
231         finally
232         {
233             _closedIn=true;
234         }
235     }
236 
237     /* ------------------------------------------------------------ */
238     public synchronized void closeOut(int code,String message)
239     {
240         LOG.debug("ClosedOut {} {}",this,message);
241         try
242         {
243             if (_closedIn || _closedOut)
244                 _endp.close();
245             else
246             {
247                 if (code<=0)
248                     code=WebSocketConnectionD06.CLOSE_NORMAL;
249                 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
250                 bytes[0]=(byte)(code/0x100);
251                 bytes[1]=(byte)(code%0x100);
252                 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
253             }
254             _generator.flush();
255 
256         }
257         catch(IOException e)
258         {
259             LOG.ignore(e);
260         }
261         finally
262         {
263             _closedOut=true;
264         }
265     }
266 
267     public void shutdown()
268     {
269         final WebSocket.Connection connection = _connection;
270         if (connection != null)
271             connection.close(CLOSE_SHUTDOWN, null);
272     }
273 
274     /* ------------------------------------------------------------ */
275     public void fillBuffersFrom(Buffer buffer)
276     {
277         _parser.fill(buffer);
278     }
279 
280     /* ------------------------------------------------------------ */
281     private void checkWriteable()
282     {
283         if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
284         {
285             ((AsyncEndPoint)_endp).scheduleWrite();
286         }
287     }
288 
289     /* ------------------------------------------------------------ */
290     public List<Extension> getExtensions()
291     {
292         return Collections.emptyList();
293     }
294 
295     protected void onFrameHandshake()
296     {
297         if (_onFrame!=null)
298         {
299             _onFrame.onHandshake(_connection);
300         }
301     }
302 
303     protected void onWebSocketOpen()
304     {
305         _webSocket.onOpen(_connection);
306     }
307 
308     /* ------------------------------------------------------------ */
309     private class FrameConnectionD06 implements WebSocket.FrameConnection
310     {
311         volatile boolean _disconnecting;
312         int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
313         int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
314 
315         /* ------------------------------------------------------------ */
316         public synchronized void sendMessage(String content) throws IOException
317         {
318             if (_closedOut)
319                 throw new IOException("closing");
320             byte[] data = content.getBytes(StringUtil.__UTF8);
321             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
322             _generator.flush();
323             checkWriteable();
324         }
325 
326         /* ------------------------------------------------------------ */
327         public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
328         {
329             if (_closedOut)
330                 throw new IOException("closing");
331             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
332             _generator.flush();
333             checkWriteable();
334         }
335 
336         /* ------------------------------------------------------------ */
337         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
338         {
339             if (_closedOut)
340                 throw new IOException("closing");
341             _generator.addFrame(flags,opcode,content,offset,length);
342             _generator.flush();
343             checkWriteable();
344         }
345 
346         /* ------------------------------------------------------------ */
347         public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
348         {
349             if (_closedOut)
350                 throw new IOException("closing");
351             _generator.addFrame((byte)0x8,control,data,offset,length);
352             _generator.flush();
353             checkWriteable();
354         }
355 
356         /* ------------------------------------------------------------ */
357         public boolean isMessageComplete(byte flags)
358         {
359             return isLastFrame(flags);
360         }
361 
362         /* ------------------------------------------------------------ */
363         public boolean isOpen()
364         {
365             return _endp!=null&&_endp.isOpen();
366         }
367 
368         /* ------------------------------------------------------------ */
369         public void close(int code, String message)
370         {
371             if (_disconnecting)
372                 return;
373             _disconnecting=true;
374             WebSocketConnectionD06.this.closeOut(code,message);
375         }
376 
377         /* ------------------------------------------------------------ */
378         public void setMaxIdleTime(int ms)
379         {
380             try
381             {
382                 _endp.setMaxIdleTime(ms);
383             }
384             catch(IOException e)
385             {
386                 LOG.warn(e);
387             }
388         }
389 
390         /* ------------------------------------------------------------ */
391         public void setMaxTextMessageSize(int size)
392         {
393             _maxTextMessage=size;
394         }
395 
396         /* ------------------------------------------------------------ */
397         public void setMaxBinaryMessageSize(int size)
398         {
399             _maxBinaryMessage=size;
400         }
401 
402         /* ------------------------------------------------------------ */
403         public int getMaxTextMessageSize()
404         {
405             return _maxTextMessage;
406         }
407 
408         /* ------------------------------------------------------------ */
409         public int getMaxIdleTime()
410         {
411             return _endp.getMaxIdleTime();
412         }
413 
414         /* ------------------------------------------------------------ */
415         public int getMaxBinaryMessageSize()
416         {
417             return _maxBinaryMessage;
418         }
419 
420         /* ------------------------------------------------------------ */
421         public String getProtocol()
422         {
423             return _protocol;
424         }
425 
426         /* ------------------------------------------------------------ */
427         public byte binaryOpcode()
428         {
429             return OP_BINARY;
430         }
431 
432         /* ------------------------------------------------------------ */
433         public byte textOpcode()
434         {
435             return OP_TEXT;
436         }
437 
438         /* ------------------------------------------------------------ */
439         public byte continuationOpcode()
440         {
441             return OP_CONTINUATION;
442         }
443 
444         /* ------------------------------------------------------------ */
445         public byte finMask()
446         {
447             return 0x8;
448         }
449 
450         /* ------------------------------------------------------------ */
451         public boolean isControl(byte opcode)
452         {
453             return isControlFrame(opcode);
454         }
455 
456         /* ------------------------------------------------------------ */
457         public boolean isText(byte opcode)
458         {
459             return opcode==OP_TEXT;
460         }
461 
462         /* ------------------------------------------------------------ */
463         public boolean isBinary(byte opcode)
464         {
465             return opcode==OP_BINARY;
466         }
467 
468         /* ------------------------------------------------------------ */
469         public boolean isContinuation(byte opcode)
470         {
471             return opcode==OP_CONTINUATION;
472         }
473 
474         /* ------------------------------------------------------------ */
475         public boolean isClose(byte opcode)
476         {
477             return opcode==OP_CLOSE;
478         }
479 
480         /* ------------------------------------------------------------ */
481         public boolean isPing(byte opcode)
482         {
483             return opcode==OP_PING;
484         }
485 
486         /* ------------------------------------------------------------ */
487         public boolean isPong(byte opcode)
488         {
489             return opcode==OP_PONG;
490         }
491 
492         /* ------------------------------------------------------------ */
493         public void disconnect()
494         {
495             close();
496         }
497 
498         /* ------------------------------------------------------------ */
499         public void close()
500         {
501             close(CLOSE_NORMAL,null);
502         }
503 
504         /* ------------------------------------------------------------ */
505         @Override
506         public String toString()
507         {
508             return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
509         }
510 
511         public void setAllowFrameFragmentation(boolean allowFragmentation)
512         {
513         }
514 
515         public boolean isAllowFrameFragmentation()
516         {
517             return false;
518         }
519     }
520 
521     /* ------------------------------------------------------------ */
522     /* ------------------------------------------------------------ */
523     /* ------------------------------------------------------------ */
524     private class FrameHandlerD06 implements WebSocketParser.FrameHandler
525     {
526         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
527         private ByteArrayBuffer _aggregate;
528         private byte _opcode=-1;
529 
530         public void onFrame(byte flags, byte opcode, Buffer buffer)
531         {
532             boolean lastFrame = isLastFrame(flags);
533 
534             synchronized(WebSocketConnectionD06.this)
535             {
536                 // Ignore incoming after a close
537                 if (_closedIn)
538                     return;
539 
540                 try
541                 {
542                     byte[] array=buffer.array();
543 
544                     // Deliver frame if websocket is a FrameWebSocket
545                     if (_onFrame!=null)
546                     {
547                         if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
548                             return;
549                     }
550 
551                     if (_onControl!=null && isControlFrame(opcode))
552                     {
553                         if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
554                             return;
555                     }
556 
557                     switch(opcode)
558                     {
559                         case WebSocketConnectionD06.OP_CONTINUATION:
560                         {
561                             // If text, append to the message buffer
562                             if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
563                             {
564                                 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
565                                 {
566                                     // If this is the last fragment, deliver the text buffer
567                                     if (lastFrame && _onTextMessage!=null)
568                                     {
569                                         _opcode=-1;
570                                         String msg =_utf8.toString();
571                                         _utf8.reset();
572                                         _onTextMessage.onMessage(msg);
573                                     }
574                                 }
575                                 else
576                                 {
577                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
578                                     _utf8.reset();
579                                     _opcode=-1;
580                                 }
581                             }
582                             else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
583                             {
584                                 if (_aggregate.space()<_aggregate.length())
585                                 {
586                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
587                                     _aggregate.clear();
588                                     _opcode=-1;
589                                 }
590                                 else
591                                 {
592                                     _aggregate.put(buffer);
593 
594                                     // If this is the last fragment, deliver
595                                     if (lastFrame && _onBinaryMessage!=null)
596                                     {
597                                         try
598                                         {
599                                             _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
600                                         }
601                                         finally
602                                         {
603                                             _opcode=-1;
604                                             _aggregate.clear();
605                                         }
606                                     }
607                                 }
608                             }
609                             break;
610                         }
611                         case WebSocketConnectionD06.OP_PING:
612                         {
613                             LOG.debug("PING {}",this);
614                             if (!_closedOut)
615                                 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
616                             break;
617                         }
618 
619                         case WebSocketConnectionD06.OP_PONG:
620                         {
621                             LOG.debug("PONG {}",this);
622                             break;
623                         }
624 
625                         case WebSocketConnectionD06.OP_CLOSE:
626                         {
627                             int code=-1;
628                             String message=null;
629                             if (buffer.length()>=2)
630                             {
631                                 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
632                                 if (buffer.length()>2)
633                                     message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
634                             }
635                             closeIn(code,message);
636                             break;
637                         }
638 
639 
640                         case WebSocketConnectionD06.OP_TEXT:
641                         {
642                             if(_onTextMessage!=null)
643                             {
644                                 if (lastFrame)
645                                 {
646                                     // Deliver the message
647                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
648                                 }
649                                 else
650                                 {
651                                     if (_connection.getMaxTextMessageSize()>=0)
652                                     {
653                                         // If this is a text fragment, append to buffer
654                                         if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
655                                             _opcode=WebSocketConnectionD06.OP_TEXT;
656                                         else
657                                         {
658                                             _utf8.reset();
659                                             _opcode=-1;
660                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
661                                         }
662                                     }
663                                 }
664                             }
665                             break;
666                         }
667 
668                         default:
669                         {
670                             if (_onBinaryMessage!=null)
671                             {
672                                 if (lastFrame)
673                                 {
674                                     _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
675                                 }
676                                 else
677                                 {
678                                     if (_connection.getMaxBinaryMessageSize()>=0)
679                                     {
680                                         if (buffer.length()>_connection.getMaxBinaryMessageSize())
681                                         {
682                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
683                                             if (_aggregate!=null)
684                                                 _aggregate.clear();
685                                             _opcode=-1;
686                                         }
687                                         else
688                                         {
689                                             _opcode=opcode;
690                                             if (_aggregate==null)
691                                                 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
692                                             _aggregate.put(buffer);
693                                         }
694                                     }
695                                 }
696                             }
697                         }
698                     }
699                 }
700                 catch(Throwable th)
701                 {
702                     LOG.warn(th);
703                 }
704             }
705         }
706 
707         public void close(int code,String message)
708         {
709             _connection.close(code,message);
710         }
711 
712         @Override
713         public String toString()
714         {
715             return WebSocketConnectionD06.this.toString()+"FH";
716         }
717     }
718 
719     /* ------------------------------------------------------------ */
720     public static String hashKey(String key)
721     {
722         try
723         {
724             MessageDigest md = MessageDigest.getInstance("SHA1");
725             md.update(key.getBytes("UTF-8"));
726             md.update(MAGIC);
727             return new String(B64Code.encode(md.digest()));
728         }
729         catch (Exception e)
730         {
731             throw new RuntimeException(e);
732         }
733     }
734 }