View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.io.UnsupportedEncodingException;
18  import java.security.MessageDigest;
19  import java.util.Collections;
20  import java.util.List;
21  
22  import javax.servlet.http.HttpServletRequest;
23  import javax.servlet.http.HttpServletResponse;
24  
25  import org.eclipse.jetty.io.AbstractConnection;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.Buffer;
28  import org.eclipse.jetty.io.ByteArrayBuffer;
29  import org.eclipse.jetty.io.Connection;
30  import org.eclipse.jetty.io.EndPoint;
31  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32  import org.eclipse.jetty.util.B64Code;
33  import org.eclipse.jetty.util.StringUtil;
34  import org.eclipse.jetty.util.Utf8StringBuilder;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
38  import org.eclipse.jetty.websocket.WebSocket.OnControl;
39  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
40  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
41  
42  public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
43  {
44      private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
45  
46      final static byte OP_CONTINUATION = 0x00;
47      final static byte OP_CLOSE = 0x01;
48      final static byte OP_PING = 0x02;
49      final static byte OP_PONG = 0x03;
50      final static byte OP_TEXT = 0x04;
51      final static byte OP_BINARY = 0x05;
52      
53      final static int CLOSE_NORMAL=1000;
54      final static int CLOSE_SHUTDOWN=1001;
55      final static int CLOSE_PROTOCOL=1002;
56      final static int CLOSE_BADDATA=1003;
57      final static int CLOSE_LARGE=1004;
58      
59      static boolean isLastFrame(int flags)
60      {
61          return (flags&0x8)!=0;
62      }
63      
64      static boolean isControlFrame(int opcode)
65      {
66          switch(opcode)
67          {
68              case OP_CLOSE:
69              case OP_PING:
70              case OP_PONG:
71                  return true;
72              default:
73                  return false;
74          }
75      }
76      
77      
78      private final static byte[] MAGIC;
79      private final IdleCheck _idle;
80      private final WebSocketParser _parser;
81      private final WebSocketGenerator _generator;
82      private final WebSocket _webSocket;
83      private final OnFrame _onFrame;
84      private final OnBinaryMessage _onBinaryMessage;
85      private final OnTextMessage _onTextMessage;
86      private final OnControl _onControl;
87      private final String _protocol;
88      private boolean _closedIn;
89      private boolean _closedOut;
90      private int _maxTextMessageSize;
91      private int _maxBinaryMessageSize=-1;
92  
93      static
94      {
95          try
96          {
97              MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
98          }
99          catch (UnsupportedEncodingException e)
100         {
101             throw new RuntimeException(e);
102         }
103     }
104     
105     private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
106 
107     /* ------------------------------------------------------------ */
108     /* ------------------------------------------------------------ */
109     /* ------------------------------------------------------------ */
110     private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
111     
112 
113     /* ------------------------------------------------------------ */
114     public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
115         throws IOException
116     {
117         super(endpoint,timestamp);
118         
119         // TODO - can we use the endpoint idle mechanism?
120         if (endpoint instanceof AsyncEndPoint)
121             ((AsyncEndPoint)endpoint).cancelIdle();
122         
123         _endp.setMaxIdleTime(maxIdleTime);
124         
125         _webSocket = websocket;
126         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
127         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
128         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
129         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
130         _generator = new WebSocketGeneratorD06(buffers, _endp,null);
131         _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
132         _protocol=protocol;
133 
134         // TODO should these be AsyncEndPoint checks/calls?
135         if (_endp instanceof SelectChannelEndPoint)
136         {
137             final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
138             scep.cancelIdle();
139             _idle=new IdleCheck()
140             {
141                 public void access(EndPoint endp)
142                 {
143                     scep.scheduleIdle();
144                 }
145             };
146             scep.scheduleIdle();
147         }
148         else
149         {
150             _idle = new IdleCheck()
151             {
152                 public void access(EndPoint endp)
153                 {}
154             };
155         }
156         
157         _maxTextMessageSize=buffers.getBufferSize(); 
158         _maxBinaryMessageSize=-1;
159     }
160 
161     /* ------------------------------------------------------------ */
162     public WebSocket.Connection getConnection()
163     {
164         return _connection;
165     }
166     
167     /* ------------------------------------------------------------ */
168     public Connection handle() throws IOException
169     {
170         try
171         {
172             // handle the framing protocol
173             boolean progress=true;
174 
175             while (progress)
176             {
177                 int flushed=_generator.flush();
178                 int filled=_parser.parseNext();
179 
180                 progress = flushed>0 || filled>0;
181                 
182                 if (filled<0 || flushed<0)
183                 {
184                     _endp.close();
185                     break;
186                 }
187             }
188         }
189         catch(IOException e)
190         {
191             try
192             {
193                 _endp.close();
194             }
195             catch(IOException e2)
196             {
197                 LOG.ignore(e2);
198             }
199             throw e;
200         }
201         finally
202         {
203             if (_endp.isOpen())
204             {
205                 _idle.access(_endp);
206                 if (_closedIn && _closedOut && _generator.isBufferEmpty())
207                     _endp.close();
208                 else if (_endp.isInputShutdown() && !_closedIn)
209                     closeIn(CLOSE_PROTOCOL,null);
210                 else
211                     checkWriteable();
212             }
213            
214         }
215         return this;
216     }
217 
218     /* ------------------------------------------------------------ */
219     public boolean isIdle()
220     {
221         return _parser.isBufferEmpty() && _generator.isBufferEmpty();
222     }
223 
224     /* ------------------------------------------------------------ */
225     @Override
226     public void idleExpired()
227     {
228         closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
229     }
230 
231     /* ------------------------------------------------------------ */
232     public boolean isSuspended()
233     {
234         return false;
235     }
236 
237     /* ------------------------------------------------------------ */
238     public void closed()
239     {
240         _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
241     }
242 
243     /* ------------------------------------------------------------ */
244     public synchronized void closeIn(int code,String message)
245     {
246         LOG.debug("ClosedIn {} {}",this,message);
247         try
248         {
249             if (_closedOut)
250                 _endp.close();
251             else 
252                 closeOut(code,message);
253         }
254         catch(IOException e)
255         {
256             LOG.ignore(e);
257         }
258         finally
259         {
260             _closedIn=true;
261         }
262     }
263 
264     /* ------------------------------------------------------------ */
265     public synchronized void closeOut(int code,String message)
266     {
267         LOG.debug("ClosedOut {} {}",this,message);
268         try
269         {
270             if (_closedIn || _closedOut)
271                 _endp.close();
272             else 
273             {
274                 if (code<=0)
275                     code=WebSocketConnectionD06.CLOSE_NORMAL;
276                 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
277                 bytes[0]=(byte)(code/0x100);
278                 bytes[1]=(byte)(code%0x100);
279                 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
280             }
281             _generator.flush();
282             
283         }
284         catch(IOException e)
285         {
286             LOG.ignore(e);
287         }
288         finally
289         {
290             _closedOut=true;
291         }
292     }
293 
294     /* ------------------------------------------------------------ */
295     public void fillBuffersFrom(Buffer buffer)
296     {
297         _parser.fill(buffer);
298     }
299 
300     /* ------------------------------------------------------------ */
301     private void checkWriteable()
302     {
303         if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
304         {
305             ((AsyncEndPoint)_endp).scheduleWrite();
306         }
307     }
308 
309     /* ------------------------------------------------------------ */
310     public List<Extension> getExtensions()
311     {
312         return Collections.emptyList();
313     }
314 
315     /* ------------------------------------------------------------ */
316     /* ------------------------------------------------------------ */
317     /* ------------------------------------------------------------ */
318     private class FrameConnectionD06 implements WebSocket.FrameConnection
319     {
320         volatile boolean _disconnecting;
321         int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
322         int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
323 
324         /* ------------------------------------------------------------ */
325         public synchronized void sendMessage(String content) throws IOException
326         {
327             if (_closedOut)
328                 throw new IOException("closing");
329             byte[] data = content.getBytes(StringUtil.__UTF8);
330             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
331             _generator.flush();
332             checkWriteable();
333             _idle.access(_endp);
334         }
335 
336         /* ------------------------------------------------------------ */
337         public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
338         {
339             if (_closedOut)
340                 throw new IOException("closing");
341             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
342             _generator.flush();
343             checkWriteable();
344             _idle.access(_endp);
345         }
346 
347         /* ------------------------------------------------------------ */
348         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
349         {
350             if (_closedOut)
351                 throw new IOException("closing");
352             _generator.addFrame(flags,opcode,content,offset,length);
353             _generator.flush();
354             checkWriteable();
355             _idle.access(_endp);
356         }
357 
358         /* ------------------------------------------------------------ */
359         public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
360         {
361             if (_closedOut)
362                 throw new IOException("closing");
363             _generator.addFrame((byte)0x8,control,data,offset,length);
364             _generator.flush();
365             checkWriteable();
366             _idle.access(_endp);
367         }
368 
369         /* ------------------------------------------------------------ */
370         public boolean isMessageComplete(byte flags)
371         {
372             return isLastFrame(flags);
373         }
374 
375         /* ------------------------------------------------------------ */
376         public boolean isOpen()
377         {
378             return _endp!=null&&_endp.isOpen();
379         }
380 
381         /* ------------------------------------------------------------ */
382         public void close(int code, String message)
383         {
384             if (_disconnecting)
385                 return;
386             _disconnecting=true;
387             WebSocketConnectionD06.this.closeOut(code,message);
388         }
389 
390         /* ------------------------------------------------------------ */
391         public void setMaxIdleTime(int ms) 
392         {
393             try
394             {
395                 _endp.setMaxIdleTime(ms);
396             }
397             catch(IOException e)
398             {
399                 LOG.warn(e);
400             }
401         }
402 
403         /* ------------------------------------------------------------ */
404         public void setMaxTextMessageSize(int size)
405         {
406             _maxTextMessage=size;
407         }
408 
409         /* ------------------------------------------------------------ */
410         public void setMaxBinaryMessageSize(int size)
411         {
412             _maxBinaryMessage=size;
413         }
414 
415         /* ------------------------------------------------------------ */
416         public int getMaxTextMessageSize()
417         {
418             return _maxTextMessage;
419         }
420 
421         /* ------------------------------------------------------------ */
422         public int getMaxIdleTime()
423         {
424             return _endp.getMaxIdleTime();
425         }
426 
427         /* ------------------------------------------------------------ */
428         public int getMaxBinaryMessageSize()
429         {
430             return _maxBinaryMessage;
431         }
432 
433         /* ------------------------------------------------------------ */
434         public String getProtocol()
435         {
436             return _protocol;
437         }
438 
439         /* ------------------------------------------------------------ */
440         public byte binaryOpcode()
441         {
442             return OP_BINARY;
443         }
444 
445         /* ------------------------------------------------------------ */
446         public byte textOpcode()
447         {
448             return OP_TEXT;
449         }
450 
451         /* ------------------------------------------------------------ */
452         public byte continuationOpcode()
453         {
454             return OP_CONTINUATION;
455         }
456 
457         /* ------------------------------------------------------------ */
458         public byte finMask()
459         {
460             return 0x8;
461         }
462         
463         /* ------------------------------------------------------------ */
464         public boolean isControl(byte opcode)
465         {
466             return isControlFrame(opcode);
467         }
468 
469         /* ------------------------------------------------------------ */
470         public boolean isText(byte opcode)
471         {
472             return opcode==OP_TEXT;
473         }
474 
475         /* ------------------------------------------------------------ */
476         public boolean isBinary(byte opcode)
477         {
478             return opcode==OP_BINARY;
479         }
480 
481         /* ------------------------------------------------------------ */
482         public boolean isContinuation(byte opcode)
483         {
484             return opcode==OP_CONTINUATION;
485         }
486 
487         /* ------------------------------------------------------------ */
488         public boolean isClose(byte opcode)
489         {
490             return opcode==OP_CLOSE;
491         }
492 
493         /* ------------------------------------------------------------ */
494         public boolean isPing(byte opcode)
495         {
496             return opcode==OP_PING;
497         }
498 
499         /* ------------------------------------------------------------ */
500         public boolean isPong(byte opcode)
501         {
502             return opcode==OP_PONG;
503         }
504 
505         /* ------------------------------------------------------------ */
506         public void disconnect()
507         {
508             close(CLOSE_NORMAL,null);
509         }
510 
511         /* ------------------------------------------------------------ */
512         public String toString()
513         {
514             return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
515         }
516 
517         public void setAllowFrameFragmentation(boolean allowFragmentation)
518         {
519         }
520 
521         public boolean isAllowFrameFragmentation()
522         {
523             return false;
524         }
525     }
526 
527     /* ------------------------------------------------------------ */
528     /* ------------------------------------------------------------ */
529     /* ------------------------------------------------------------ */
530     private class FrameHandlerD06 implements WebSocketParser.FrameHandler
531     {
532         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
533         private ByteArrayBuffer _aggregate;
534         private byte _opcode=-1;
535 
536         public void onFrame(byte flags, byte opcode, Buffer buffer)
537         {
538             boolean lastFrame = isLastFrame(flags); 
539             
540             synchronized(WebSocketConnectionD06.this)
541             {
542                 // Ignore incoming after a close
543                 if (_closedIn)
544                     return;
545                 
546                 try
547                 {
548                     byte[] array=buffer.array();
549 
550                     // Deliver frame if websocket is a FrameWebSocket
551                     if (_onFrame!=null)
552                     {
553                         if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
554                             return;
555                     }
556                     
557                     if (_onControl!=null && isControlFrame(opcode))
558                     {
559                         if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
560                             return;
561                     }
562                     
563                     switch(opcode)
564                     {
565                         case WebSocketConnectionD06.OP_CONTINUATION:
566                         {
567                             // If text, append to the message buffer
568                             if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
569                             {
570                                 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
571                                 {
572                                     // If this is the last fragment, deliver the text buffer
573                                     if (lastFrame && _onTextMessage!=null)
574                                     {
575                                         _opcode=-1;
576                                         String msg =_utf8.toString();
577                                         _utf8.reset();
578                                         _onTextMessage.onMessage(msg);
579                                     }
580                                 }
581                                 else
582                                 {
583                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
584                                     _utf8.reset();
585                                     _opcode=-1;
586                                 }    
587                             }
588                             else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
589                             {
590                                 if (_aggregate.space()<_aggregate.length())
591                                 {
592                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
593                                     _aggregate.clear();
594                                     _opcode=-1;
595                                 }
596                                 else
597                                 {
598                                     _aggregate.put(buffer);
599 
600                                     // If this is the last fragment, deliver
601                                     if (lastFrame && _onBinaryMessage!=null)
602                                     {
603                                         try
604                                         {
605                                             _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
606                                         }
607                                         finally
608                                         {
609                                             _opcode=-1;
610                                             _aggregate.clear();
611                                         }
612                                     }
613                                 }
614                             }
615                             break;
616                         }
617                         case WebSocketConnectionD06.OP_PING:
618                         {
619                             LOG.debug("PING {}",this);
620                             if (!_closedOut)
621                                 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
622                             break;
623                         }
624 
625                         case WebSocketConnectionD06.OP_PONG:
626                         {
627                             LOG.debug("PONG {}",this);
628                             break;
629                         }
630 
631                         case WebSocketConnectionD06.OP_CLOSE:
632                         {
633                             int code=-1;
634                             String message=null;
635                             if (buffer.length()>=2)
636                             {
637                                 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
638                                 if (buffer.length()>2)
639                                     message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
640                             }
641                             closeIn(code,message);
642                             break;
643                         }
644 
645 
646                         case WebSocketConnectionD06.OP_TEXT:
647                         {
648                             if(_onTextMessage!=null)
649                             {
650                                 if (lastFrame)
651                                 {
652                                     // Deliver the message
653                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
654                                 }
655                                 else 
656                                 {
657                                     if (_connection.getMaxTextMessageSize()>=0)
658                                     {
659                                         // If this is a text fragment, append to buffer
660                                         if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
661                                             _opcode=WebSocketConnectionD06.OP_TEXT;
662                                         else
663                                         {
664                                             _utf8.reset();
665                                             _opcode=-1;                                    
666                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
667                                         }
668                                     }
669                                 }
670                             }
671                             break;
672                         }
673 
674                         default:
675                         {
676                             if (_onBinaryMessage!=null)
677                             {
678                                 if (lastFrame)
679                                 {
680                                     _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
681                                 }
682                                 else   
683                                 {
684                                     if (_connection.getMaxBinaryMessageSize()>=0)
685                                     {
686                                         if (buffer.length()>_connection.getMaxBinaryMessageSize())
687                                         {
688                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
689                                             if (_aggregate!=null)
690                                                 _aggregate.clear();
691                                             _opcode=-1;
692                                         }
693                                         else
694                                         {
695                                             _opcode=opcode;
696                                             if (_aggregate==null)
697                                                 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
698                                             _aggregate.put(buffer);
699                                         }
700                                     }
701                                 }
702                             }
703                         }      
704                     }
705                 }
706                 catch(ThreadDeath th)
707                 {
708                     throw th;
709                 }
710                 catch(Throwable th)
711                 {
712                     LOG.warn(th);
713                 }
714             }
715         }
716 
717         public void close(int code,String message)
718         {
719             _connection.close(code,message);
720         }
721 
722         public String toString()
723         {
724             return WebSocketConnectionD06.this.toString()+"FH";
725         }
726     }
727 
728     /* ------------------------------------------------------------ */
729     private interface IdleCheck
730     {
731         void access(EndPoint endp);
732     }
733 
734     /* ------------------------------------------------------------ */
735     public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
736     {
737         String uri=request.getRequestURI();
738         String query=request.getQueryString();
739         if (query!=null && query.length()>0)
740             uri+="?"+query;
741         String key = request.getHeader("Sec-WebSocket-Key");
742         
743         response.setHeader("Upgrade","WebSocket");
744         response.addHeader("Connection","Upgrade");
745         response.addHeader("Sec-WebSocket-Accept",hashKey(key));
746         if (subprotocol!=null)
747             response.addHeader("Sec-WebSocket-Protocol",subprotocol);
748         response.sendError(101);
749 
750         if (_onFrame!=null)
751             _onFrame.onHandshake(_connection);
752         _webSocket.onOpen(_connection);
753     }
754 
755     /* ------------------------------------------------------------ */
756     public static String hashKey(String key)
757     {
758         try
759         {
760             MessageDigest md = MessageDigest.getInstance("SHA1");
761             md.update(key.getBytes("UTF-8"));
762             md.update(MAGIC);
763             return new String(B64Code.encode(md.digest()));
764         }
765         catch (Exception e)
766         {
767             throw new RuntimeException(e);
768         }
769     }
770 }