View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.io.UnsupportedEncodingException;
18  import java.security.MessageDigest;
19  import java.util.Collections;
20  import java.util.List;
21  import javax.servlet.http.HttpServletRequest;
22  import javax.servlet.http.HttpServletResponse;
23  
24  import org.eclipse.jetty.io.AbstractConnection;
25  import org.eclipse.jetty.io.AsyncEndPoint;
26  import org.eclipse.jetty.io.Buffer;
27  import org.eclipse.jetty.io.ByteArrayBuffer;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
31  import org.eclipse.jetty.util.B64Code;
32  import org.eclipse.jetty.util.StringUtil;
33  import org.eclipse.jetty.util.Utf8StringBuilder;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
37  import org.eclipse.jetty.websocket.WebSocket.OnControl;
38  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
39  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
40  
41  public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
42  {
43      private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
44  
45      final static byte OP_CONTINUATION = 0x00;
46      final static byte OP_CLOSE = 0x01;
47      final static byte OP_PING = 0x02;
48      final static byte OP_PONG = 0x03;
49      final static byte OP_TEXT = 0x04;
50      final static byte OP_BINARY = 0x05;
51  
52      final static int CLOSE_NORMAL=1000;
53      final static int CLOSE_SHUTDOWN=1001;
54      final static int CLOSE_PROTOCOL=1002;
55      final static int CLOSE_BADDATA=1003;
56      final static int CLOSE_LARGE=1004;
57  
58      static boolean isLastFrame(int flags)
59      {
60          return (flags&0x8)!=0;
61      }
62  
63      static boolean isControlFrame(int opcode)
64      {
65          switch(opcode)
66          {
67              case OP_CLOSE:
68              case OP_PING:
69              case OP_PONG:
70                  return true;
71              default:
72                  return false;
73          }
74      }
75  
76  
77      private final static byte[] MAGIC;
78      private final IdleCheck _idle;
79      private final WebSocketParser _parser;
80      private final WebSocketGenerator _generator;
81      private final WebSocket _webSocket;
82      private final OnFrame _onFrame;
83      private final OnBinaryMessage _onBinaryMessage;
84      private final OnTextMessage _onTextMessage;
85      private final OnControl _onControl;
86      private final String _protocol;
87      private volatile boolean _closedIn;
88      private volatile boolean _closedOut;
89      private int _maxTextMessageSize;
90      private int _maxBinaryMessageSize=-1;
91  
92      static
93      {
94          try
95          {
96              MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
97          }
98          catch (UnsupportedEncodingException e)
99          {
100             throw new RuntimeException(e);
101         }
102     }
103 
104     private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
105 
106     /* ------------------------------------------------------------ */
107     /* ------------------------------------------------------------ */
108     /* ------------------------------------------------------------ */
109     private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
110 
111 
112     /* ------------------------------------------------------------ */
113     public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
114         throws IOException
115     {
116         super(endpoint,timestamp);
117 
118         if (endpoint instanceof AsyncEndPoint)
119             ((AsyncEndPoint)endpoint).cancelIdle();
120 
121         _endp.setMaxIdleTime(maxIdleTime);
122 
123         _webSocket = websocket;
124         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
125         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
126         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
127         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
128         _generator = new WebSocketGeneratorD06(buffers, _endp,null);
129         _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
130         _protocol=protocol;
131 
132         if (_endp instanceof SelectChannelEndPoint)
133         {
134             final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
135             scep.cancelIdle();
136             _idle=new IdleCheck()
137             {
138                 public void access(EndPoint endp)
139                 {
140                     scep.scheduleIdle();
141                 }
142             };
143             scep.scheduleIdle();
144         }
145         else
146         {
147             _idle = new IdleCheck()
148             {
149                 public void access(EndPoint endp)
150                 {}
151             };
152         }
153 
154         _maxTextMessageSize=buffers.getBufferSize();
155         _maxBinaryMessageSize=-1;
156     }
157 
158     /* ------------------------------------------------------------ */
159     public WebSocket.Connection getConnection()
160     {
161         return _connection;
162     }
163 
164     /* ------------------------------------------------------------ */
165     public Connection handle() throws IOException
166     {
167         try
168         {
169             // handle the framing protocol
170             boolean progress=true;
171 
172             while (progress)
173             {
174                 int flushed=_generator.flush();
175                 int filled=_parser.parseNext();
176 
177                 progress = flushed>0 || filled>0;
178 
179                 if (filled<0 || flushed<0)
180                 {
181                     _endp.close();
182                     break;
183                 }
184             }
185         }
186         catch(IOException e)
187         {
188             try
189             {
190                 _endp.close();
191             }
192             catch(IOException e2)
193             {
194                 LOG.ignore(e2);
195             }
196             throw e;
197         }
198         finally
199         {
200             if (_endp.isOpen())
201             {
202                 _idle.access(_endp);
203                 if (_closedIn && _closedOut && _generator.isBufferEmpty())
204                     _endp.close();
205                 else if (_endp.isInputShutdown() && !_closedIn)
206                     closeIn(CLOSE_PROTOCOL,null);
207                 else
208                     checkWriteable();
209             }
210 
211         }
212         return this;
213     }
214 
215     /* ------------------------------------------------------------ */
216     public boolean isIdle()
217     {
218         return _parser.isBufferEmpty() && _generator.isBufferEmpty();
219     }
220 
221     /* ------------------------------------------------------------ */
222     @Override
223     public void idleExpired()
224     {
225         closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
226     }
227 
228     /* ------------------------------------------------------------ */
229     public boolean isSuspended()
230     {
231         return false;
232     }
233 
234     /* ------------------------------------------------------------ */
235     public void closed()
236     {
237         _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
238     }
239 
240     /* ------------------------------------------------------------ */
241     public synchronized void closeIn(int code,String message)
242     {
243         LOG.debug("ClosedIn {} {}",this,message);
244         try
245         {
246             if (_closedOut)
247                 _endp.close();
248             else
249                 closeOut(code,message);
250         }
251         catch(IOException e)
252         {
253             LOG.ignore(e);
254         }
255         finally
256         {
257             _closedIn=true;
258         }
259     }
260 
261     /* ------------------------------------------------------------ */
262     public synchronized void closeOut(int code,String message)
263     {
264         LOG.debug("ClosedOut {} {}",this,message);
265         try
266         {
267             if (_closedIn || _closedOut)
268                 _endp.close();
269             else
270             {
271                 if (code<=0)
272                     code=WebSocketConnectionD06.CLOSE_NORMAL;
273                 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
274                 bytes[0]=(byte)(code/0x100);
275                 bytes[1]=(byte)(code%0x100);
276                 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
277             }
278             _generator.flush();
279 
280         }
281         catch(IOException e)
282         {
283             LOG.ignore(e);
284         }
285         finally
286         {
287             _closedOut=true;
288         }
289     }
290 
291     /* ------------------------------------------------------------ */
292     public void fillBuffersFrom(Buffer buffer)
293     {
294         _parser.fill(buffer);
295     }
296 
297     /* ------------------------------------------------------------ */
298     private void checkWriteable()
299     {
300         if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
301         {
302             ((AsyncEndPoint)_endp).scheduleWrite();
303         }
304     }
305 
306     /* ------------------------------------------------------------ */
307     public List<Extension> getExtensions()
308     {
309         return Collections.emptyList();
310     }
311 
312     /* ------------------------------------------------------------ */
313     /* ------------------------------------------------------------ */
314     /* ------------------------------------------------------------ */
315     private class FrameConnectionD06 implements WebSocket.FrameConnection
316     {
317         volatile boolean _disconnecting;
318         int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
319         int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
320 
321         /* ------------------------------------------------------------ */
322         public synchronized void sendMessage(String content) throws IOException
323         {
324             if (_closedOut)
325                 throw new IOException("closing");
326             byte[] data = content.getBytes(StringUtil.__UTF8);
327             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
328             _generator.flush();
329             checkWriteable();
330             _idle.access(_endp);
331         }
332 
333         /* ------------------------------------------------------------ */
334         public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
335         {
336             if (_closedOut)
337                 throw new IOException("closing");
338             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
339             _generator.flush();
340             checkWriteable();
341             _idle.access(_endp);
342         }
343 
344         /* ------------------------------------------------------------ */
345         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
346         {
347             if (_closedOut)
348                 throw new IOException("closing");
349             _generator.addFrame(flags,opcode,content,offset,length);
350             _generator.flush();
351             checkWriteable();
352             _idle.access(_endp);
353         }
354 
355         /* ------------------------------------------------------------ */
356         public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
357         {
358             if (_closedOut)
359                 throw new IOException("closing");
360             _generator.addFrame((byte)0x8,control,data,offset,length);
361             _generator.flush();
362             checkWriteable();
363             _idle.access(_endp);
364         }
365 
366         /* ------------------------------------------------------------ */
367         public boolean isMessageComplete(byte flags)
368         {
369             return isLastFrame(flags);
370         }
371 
372         /* ------------------------------------------------------------ */
373         public boolean isOpen()
374         {
375             return _endp!=null&&_endp.isOpen();
376         }
377 
378         /* ------------------------------------------------------------ */
379         public void close(int code, String message)
380         {
381             if (_disconnecting)
382                 return;
383             _disconnecting=true;
384             WebSocketConnectionD06.this.closeOut(code,message);
385         }
386 
387         /* ------------------------------------------------------------ */
388         public void setMaxIdleTime(int ms)
389         {
390             try
391             {
392                 _endp.setMaxIdleTime(ms);
393             }
394             catch(IOException e)
395             {
396                 LOG.warn(e);
397             }
398         }
399 
400         /* ------------------------------------------------------------ */
401         public void setMaxTextMessageSize(int size)
402         {
403             _maxTextMessage=size;
404         }
405 
406         /* ------------------------------------------------------------ */
407         public void setMaxBinaryMessageSize(int size)
408         {
409             _maxBinaryMessage=size;
410         }
411 
412         /* ------------------------------------------------------------ */
413         public int getMaxTextMessageSize()
414         {
415             return _maxTextMessage;
416         }
417 
418         /* ------------------------------------------------------------ */
419         public int getMaxIdleTime()
420         {
421             return _endp.getMaxIdleTime();
422         }
423 
424         /* ------------------------------------------------------------ */
425         public int getMaxBinaryMessageSize()
426         {
427             return _maxBinaryMessage;
428         }
429 
430         /* ------------------------------------------------------------ */
431         public String getProtocol()
432         {
433             return _protocol;
434         }
435 
436         /* ------------------------------------------------------------ */
437         public byte binaryOpcode()
438         {
439             return OP_BINARY;
440         }
441 
442         /* ------------------------------------------------------------ */
443         public byte textOpcode()
444         {
445             return OP_TEXT;
446         }
447 
448         /* ------------------------------------------------------------ */
449         public byte continuationOpcode()
450         {
451             return OP_CONTINUATION;
452         }
453 
454         /* ------------------------------------------------------------ */
455         public byte finMask()
456         {
457             return 0x8;
458         }
459 
460         /* ------------------------------------------------------------ */
461         public boolean isControl(byte opcode)
462         {
463             return isControlFrame(opcode);
464         }
465 
466         /* ------------------------------------------------------------ */
467         public boolean isText(byte opcode)
468         {
469             return opcode==OP_TEXT;
470         }
471 
472         /* ------------------------------------------------------------ */
473         public boolean isBinary(byte opcode)
474         {
475             return opcode==OP_BINARY;
476         }
477 
478         /* ------------------------------------------------------------ */
479         public boolean isContinuation(byte opcode)
480         {
481             return opcode==OP_CONTINUATION;
482         }
483 
484         /* ------------------------------------------------------------ */
485         public boolean isClose(byte opcode)
486         {
487             return opcode==OP_CLOSE;
488         }
489 
490         /* ------------------------------------------------------------ */
491         public boolean isPing(byte opcode)
492         {
493             return opcode==OP_PING;
494         }
495 
496         /* ------------------------------------------------------------ */
497         public boolean isPong(byte opcode)
498         {
499             return opcode==OP_PONG;
500         }
501 
502         /* ------------------------------------------------------------ */
503         public void disconnect()
504         {
505             close(CLOSE_NORMAL,null);
506         }
507 
508         /* ------------------------------------------------------------ */
509         public String toString()
510         {
511             return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
512         }
513 
514         public void setAllowFrameFragmentation(boolean allowFragmentation)
515         {
516         }
517 
518         public boolean isAllowFrameFragmentation()
519         {
520             return false;
521         }
522     }
523 
524     /* ------------------------------------------------------------ */
525     /* ------------------------------------------------------------ */
526     /* ------------------------------------------------------------ */
527     private class FrameHandlerD06 implements WebSocketParser.FrameHandler
528     {
529         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
530         private ByteArrayBuffer _aggregate;
531         private byte _opcode=-1;
532 
533         public void onFrame(byte flags, byte opcode, Buffer buffer)
534         {
535             boolean lastFrame = isLastFrame(flags);
536 
537             synchronized(WebSocketConnectionD06.this)
538             {
539                 // Ignore incoming after a close
540                 if (_closedIn)
541                     return;
542 
543                 try
544                 {
545                     byte[] array=buffer.array();
546 
547                     // Deliver frame if websocket is a FrameWebSocket
548                     if (_onFrame!=null)
549                     {
550                         if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
551                             return;
552                     }
553 
554                     if (_onControl!=null && isControlFrame(opcode))
555                     {
556                         if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
557                             return;
558                     }
559 
560                     switch(opcode)
561                     {
562                         case WebSocketConnectionD06.OP_CONTINUATION:
563                         {
564                             // If text, append to the message buffer
565                             if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
566                             {
567                                 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
568                                 {
569                                     // If this is the last fragment, deliver the text buffer
570                                     if (lastFrame && _onTextMessage!=null)
571                                     {
572                                         _opcode=-1;
573                                         String msg =_utf8.toString();
574                                         _utf8.reset();
575                                         _onTextMessage.onMessage(msg);
576                                     }
577                                 }
578                                 else
579                                 {
580                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
581                                     _utf8.reset();
582                                     _opcode=-1;
583                                 }
584                             }
585                             else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
586                             {
587                                 if (_aggregate.space()<_aggregate.length())
588                                 {
589                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
590                                     _aggregate.clear();
591                                     _opcode=-1;
592                                 }
593                                 else
594                                 {
595                                     _aggregate.put(buffer);
596 
597                                     // If this is the last fragment, deliver
598                                     if (lastFrame && _onBinaryMessage!=null)
599                                     {
600                                         try
601                                         {
602                                             _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
603                                         }
604                                         finally
605                                         {
606                                             _opcode=-1;
607                                             _aggregate.clear();
608                                         }
609                                     }
610                                 }
611                             }
612                             break;
613                         }
614                         case WebSocketConnectionD06.OP_PING:
615                         {
616                             LOG.debug("PING {}",this);
617                             if (!_closedOut)
618                                 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
619                             break;
620                         }
621 
622                         case WebSocketConnectionD06.OP_PONG:
623                         {
624                             LOG.debug("PONG {}",this);
625                             break;
626                         }
627 
628                         case WebSocketConnectionD06.OP_CLOSE:
629                         {
630                             int code=-1;
631                             String message=null;
632                             if (buffer.length()>=2)
633                             {
634                                 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
635                                 if (buffer.length()>2)
636                                     message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
637                             }
638                             closeIn(code,message);
639                             break;
640                         }
641 
642 
643                         case WebSocketConnectionD06.OP_TEXT:
644                         {
645                             if(_onTextMessage!=null)
646                             {
647                                 if (lastFrame)
648                                 {
649                                     // Deliver the message
650                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
651                                 }
652                                 else
653                                 {
654                                     if (_connection.getMaxTextMessageSize()>=0)
655                                     {
656                                         // If this is a text fragment, append to buffer
657                                         if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
658                                             _opcode=WebSocketConnectionD06.OP_TEXT;
659                                         else
660                                         {
661                                             _utf8.reset();
662                                             _opcode=-1;
663                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
664                                         }
665                                     }
666                                 }
667                             }
668                             break;
669                         }
670 
671                         default:
672                         {
673                             if (_onBinaryMessage!=null)
674                             {
675                                 if (lastFrame)
676                                 {
677                                     _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
678                                 }
679                                 else
680                                 {
681                                     if (_connection.getMaxBinaryMessageSize()>=0)
682                                     {
683                                         if (buffer.length()>_connection.getMaxBinaryMessageSize())
684                                         {
685                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
686                                             if (_aggregate!=null)
687                                                 _aggregate.clear();
688                                             _opcode=-1;
689                                         }
690                                         else
691                                         {
692                                             _opcode=opcode;
693                                             if (_aggregate==null)
694                                                 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
695                                             _aggregate.put(buffer);
696                                         }
697                                     }
698                                 }
699                             }
700                         }
701                     }
702                 }
703                 catch(ThreadDeath th)
704                 {
705                     throw th;
706                 }
707                 catch(Throwable th)
708                 {
709                     LOG.warn(th);
710                 }
711             }
712         }
713 
714         public void close(int code,String message)
715         {
716             _connection.close(code,message);
717         }
718 
719         public String toString()
720         {
721             return WebSocketConnectionD06.this.toString()+"FH";
722         }
723     }
724 
725     /* ------------------------------------------------------------ */
726     private interface IdleCheck
727     {
728         void access(EndPoint endp);
729     }
730 
731     /* ------------------------------------------------------------ */
732     public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
733     {
734         String key = request.getHeader("Sec-WebSocket-Key");
735 
736         response.setHeader("Upgrade","WebSocket");
737         response.addHeader("Connection","Upgrade");
738         response.addHeader("Sec-WebSocket-Accept",hashKey(key));
739         if (subprotocol!=null)
740             response.addHeader("Sec-WebSocket-Protocol",subprotocol);
741         response.sendError(101);
742 
743         if (_onFrame!=null)
744             _onFrame.onHandshake(_connection);
745         _webSocket.onOpen(_connection);
746     }
747 
748     /* ------------------------------------------------------------ */
749     public static String hashKey(String key)
750     {
751         try
752         {
753             MessageDigest md = MessageDigest.getInstance("SHA1");
754             md.update(key.getBytes("UTF-8"));
755             md.update(MAGIC);
756             return new String(B64Code.encode(md.digest()));
757         }
758         catch (Exception e)
759         {
760             throw new RuntimeException(e);
761         }
762     }
763 }