View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.io.UnsupportedEncodingException;
18  import java.security.MessageDigest;
19  import java.util.Collections;
20  import java.util.List;
21  
22  import javax.servlet.http.HttpServletRequest;
23  import javax.servlet.http.HttpServletResponse;
24  
25  import org.eclipse.jetty.io.AbstractConnection;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.Buffer;
28  import org.eclipse.jetty.io.ByteArrayBuffer;
29  import org.eclipse.jetty.io.Connection;
30  import org.eclipse.jetty.io.EndPoint;
31  import org.eclipse.jetty.util.B64Code;
32  import org.eclipse.jetty.util.StringUtil;
33  import org.eclipse.jetty.util.Utf8StringBuilder;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
37  import org.eclipse.jetty.websocket.WebSocket.OnControl;
38  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
39  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
40  
41  public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
42  {
43      private static final Logger LOG = Log.getLogger(WebSocketConnectionD06.class);
44  
45      final static byte OP_CONTINUATION = 0x00;
46      final static byte OP_CLOSE = 0x01;
47      final static byte OP_PING = 0x02;
48      final static byte OP_PONG = 0x03;
49      final static byte OP_TEXT = 0x04;
50      final static byte OP_BINARY = 0x05;
51  
52      final static int CLOSE_NORMAL=1000;
53      final static int CLOSE_SHUTDOWN=1001;
54      final static int CLOSE_PROTOCOL=1002;
55      final static int CLOSE_BADDATA=1003;
56      final static int CLOSE_LARGE=1004;
57  
58      static boolean isLastFrame(int flags)
59      {
60          return (flags&0x8)!=0;
61      }
62  
63      static boolean isControlFrame(int opcode)
64      {
65          switch(opcode)
66          {
67              case OP_CLOSE:
68              case OP_PING:
69              case OP_PONG:
70                  return true;
71              default:
72                  return false;
73          }
74      }
75  
76  
77      private final static byte[] MAGIC;
78      private final WebSocketParser _parser;
79      private final WebSocketGenerator _generator;
80      private final WebSocket _webSocket;
81      private final OnFrame _onFrame;
82      private final OnBinaryMessage _onBinaryMessage;
83      private final OnTextMessage _onTextMessage;
84      private final OnControl _onControl;
85      private final String _protocol;
86      private volatile boolean _closedIn;
87      private volatile boolean _closedOut;
88      private int _maxTextMessageSize;
89      private int _maxBinaryMessageSize=-1;
90  
91      static
92      {
93          try
94          {
95              MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
96          }
97          catch (UnsupportedEncodingException e)
98          {
99              throw new RuntimeException(e);
100         }
101     }
102 
103     private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
104 
105     /* ------------------------------------------------------------ */
106     /* ------------------------------------------------------------ */
107     /* ------------------------------------------------------------ */
108     private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
109 
110 
111     /* ------------------------------------------------------------ */
112     public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
113         throws IOException
114     {
115         super(endpoint,timestamp);
116 
117         _endp.setMaxIdleTime(maxIdleTime);
118 
119         _webSocket = websocket;
120         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
121         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
122         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
123         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
124         _generator = new WebSocketGeneratorD06(buffers, _endp,null);
125         _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
126         _protocol=protocol;
127 
128         _maxTextMessageSize=buffers.getBufferSize();
129         _maxBinaryMessageSize=-1;
130     }
131 
132     /* ------------------------------------------------------------ */
133     public WebSocket.Connection getConnection()
134     {
135         return _connection;
136     }
137 
138     /* ------------------------------------------------------------ */
139     public Connection handle() throws IOException
140     {
141         try
142         {
143             // handle the framing protocol
144             boolean progress=true;
145 
146             while (progress)
147             {
148                 int flushed=_generator.flush();
149                 int filled=_parser.parseNext();
150 
151                 progress = flushed>0 || filled>0;
152 
153                 if (filled<0 || flushed<0)
154                 {
155                     _endp.close();
156                     break;
157                 }
158             }
159         }
160         catch(IOException e)
161         {
162             try
163             {
164                 _endp.close();
165             }
166             catch(IOException e2)
167             {
168                 LOG.ignore(e2);
169             }
170             throw e;
171         }
172         finally
173         {
174             if (_endp.isOpen())
175             {
176                 if (_closedIn && _closedOut && _generator.isBufferEmpty())
177                     _endp.close();
178                 else if (_endp.isInputShutdown() && !_closedIn)
179                     closeIn(CLOSE_PROTOCOL,null);
180                 else
181                     checkWriteable();
182             }
183 
184         }
185         return this;
186     }
187 
188     /* ------------------------------------------------------------ */
189     public void onInputShutdown() throws IOException
190     {
191         // TODO
192     }
193 
194     /* ------------------------------------------------------------ */
195     public boolean isIdle()
196     {
197         return _parser.isBufferEmpty() && _generator.isBufferEmpty();
198     }
199 
200     /* ------------------------------------------------------------ */
201     @Override
202     public void onIdleExpired()
203     {
204         closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
205     }
206 
207     /* ------------------------------------------------------------ */
208     public boolean isSuspended()
209     {
210         return false;
211     }
212 
213     /* ------------------------------------------------------------ */
214     public void onClose()
215     {
216         _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
217     }
218 
219     /* ------------------------------------------------------------ */
220     public synchronized void closeIn(int code,String message)
221     {
222         LOG.debug("ClosedIn {} {}",this,message);
223         try
224         {
225             if (_closedOut)
226                 _endp.close();
227             else
228                 closeOut(code,message);
229         }
230         catch(IOException e)
231         {
232             LOG.ignore(e);
233         }
234         finally
235         {
236             _closedIn=true;
237         }
238     }
239 
240     /* ------------------------------------------------------------ */
241     public synchronized void closeOut(int code,String message)
242     {
243         LOG.debug("ClosedOut {} {}",this,message);
244         try
245         {
246             if (_closedIn || _closedOut)
247                 _endp.close();
248             else
249             {
250                 if (code<=0)
251                     code=WebSocketConnectionD06.CLOSE_NORMAL;
252                 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
253                 bytes[0]=(byte)(code/0x100);
254                 bytes[1]=(byte)(code%0x100);
255                 _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length);
256             }
257             _generator.flush();
258 
259         }
260         catch(IOException e)
261         {
262             LOG.ignore(e);
263         }
264         finally
265         {
266             _closedOut=true;
267         }
268     }
269 
270     /* ------------------------------------------------------------ */
271     public void fillBuffersFrom(Buffer buffer)
272     {
273         _parser.fill(buffer);
274     }
275 
276     /* ------------------------------------------------------------ */
277     private void checkWriteable()
278     {
279         if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
280         {
281             ((AsyncEndPoint)_endp).scheduleWrite();
282         }
283     }
284 
285     /* ------------------------------------------------------------ */
286     public List<Extension> getExtensions()
287     {
288         return Collections.emptyList();
289     }
290 
291     /* ------------------------------------------------------------ */
292     /* ------------------------------------------------------------ */
293     /* ------------------------------------------------------------ */
294     private class FrameConnectionD06 implements WebSocket.FrameConnection
295     {
296         volatile boolean _disconnecting;
297         int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
298         int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
299 
300         /* ------------------------------------------------------------ */
301         public synchronized void sendMessage(String content) throws IOException
302         {
303             if (_closedOut)
304                 throw new IOException("closing");
305             byte[] data = content.getBytes(StringUtil.__UTF8);
306             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
307             _generator.flush();
308             checkWriteable();
309         }
310 
311         /* ------------------------------------------------------------ */
312         public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
313         {
314             if (_closedOut)
315                 throw new IOException("closing");
316             _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
317             _generator.flush();
318             checkWriteable();
319         }
320 
321         /* ------------------------------------------------------------ */
322         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
323         {
324             if (_closedOut)
325                 throw new IOException("closing");
326             _generator.addFrame(flags,opcode,content,offset,length);
327             _generator.flush();
328             checkWriteable();
329         }
330 
331         /* ------------------------------------------------------------ */
332         public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
333         {
334             if (_closedOut)
335                 throw new IOException("closing");
336             _generator.addFrame((byte)0x8,control,data,offset,length);
337             _generator.flush();
338             checkWriteable();
339         }
340 
341         /* ------------------------------------------------------------ */
342         public boolean isMessageComplete(byte flags)
343         {
344             return isLastFrame(flags);
345         }
346 
347         /* ------------------------------------------------------------ */
348         public boolean isOpen()
349         {
350             return _endp!=null&&_endp.isOpen();
351         }
352 
353         /* ------------------------------------------------------------ */
354         public void close(int code, String message)
355         {
356             if (_disconnecting)
357                 return;
358             _disconnecting=true;
359             WebSocketConnectionD06.this.closeOut(code,message);
360         }
361 
362         /* ------------------------------------------------------------ */
363         public void setMaxIdleTime(int ms)
364         {
365             try
366             {
367                 _endp.setMaxIdleTime(ms);
368             }
369             catch(IOException e)
370             {
371                 LOG.warn(e);
372             }
373         }
374 
375         /* ------------------------------------------------------------ */
376         public void setMaxTextMessageSize(int size)
377         {
378             _maxTextMessage=size;
379         }
380 
381         /* ------------------------------------------------------------ */
382         public void setMaxBinaryMessageSize(int size)
383         {
384             _maxBinaryMessage=size;
385         }
386 
387         /* ------------------------------------------------------------ */
388         public int getMaxTextMessageSize()
389         {
390             return _maxTextMessage;
391         }
392 
393         /* ------------------------------------------------------------ */
394         public int getMaxIdleTime()
395         {
396             return _endp.getMaxIdleTime();
397         }
398 
399         /* ------------------------------------------------------------ */
400         public int getMaxBinaryMessageSize()
401         {
402             return _maxBinaryMessage;
403         }
404 
405         /* ------------------------------------------------------------ */
406         public String getProtocol()
407         {
408             return _protocol;
409         }
410 
411         /* ------------------------------------------------------------ */
412         public byte binaryOpcode()
413         {
414             return OP_BINARY;
415         }
416 
417         /* ------------------------------------------------------------ */
418         public byte textOpcode()
419         {
420             return OP_TEXT;
421         }
422 
423         /* ------------------------------------------------------------ */
424         public byte continuationOpcode()
425         {
426             return OP_CONTINUATION;
427         }
428 
429         /* ------------------------------------------------------------ */
430         public byte finMask()
431         {
432             return 0x8;
433         }
434 
435         /* ------------------------------------------------------------ */
436         public boolean isControl(byte opcode)
437         {
438             return isControlFrame(opcode);
439         }
440 
441         /* ------------------------------------------------------------ */
442         public boolean isText(byte opcode)
443         {
444             return opcode==OP_TEXT;
445         }
446 
447         /* ------------------------------------------------------------ */
448         public boolean isBinary(byte opcode)
449         {
450             return opcode==OP_BINARY;
451         }
452 
453         /* ------------------------------------------------------------ */
454         public boolean isContinuation(byte opcode)
455         {
456             return opcode==OP_CONTINUATION;
457         }
458 
459         /* ------------------------------------------------------------ */
460         public boolean isClose(byte opcode)
461         {
462             return opcode==OP_CLOSE;
463         }
464 
465         /* ------------------------------------------------------------ */
466         public boolean isPing(byte opcode)
467         {
468             return opcode==OP_PING;
469         }
470 
471         /* ------------------------------------------------------------ */
472         public boolean isPong(byte opcode)
473         {
474             return opcode==OP_PONG;
475         }
476 
477         /* ------------------------------------------------------------ */
478         public void disconnect()
479         {
480             close();
481         }
482 
483         /* ------------------------------------------------------------ */
484         public void close()
485         {
486             close(CLOSE_NORMAL,null);
487         }
488 
489         /* ------------------------------------------------------------ */
490         public String toString()
491         {
492             return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
493         }
494 
495         public void setAllowFrameFragmentation(boolean allowFragmentation)
496         {
497         }
498 
499         public boolean isAllowFrameFragmentation()
500         {
501             return false;
502         }
503     }
504 
505     /* ------------------------------------------------------------ */
506     /* ------------------------------------------------------------ */
507     /* ------------------------------------------------------------ */
508     private class FrameHandlerD06 implements WebSocketParser.FrameHandler
509     {
510         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
511         private ByteArrayBuffer _aggregate;
512         private byte _opcode=-1;
513 
514         public void onFrame(byte flags, byte opcode, Buffer buffer)
515         {
516             boolean lastFrame = isLastFrame(flags);
517 
518             synchronized(WebSocketConnectionD06.this)
519             {
520                 // Ignore incoming after a close
521                 if (_closedIn)
522                     return;
523 
524                 try
525                 {
526                     byte[] array=buffer.array();
527 
528                     // Deliver frame if websocket is a FrameWebSocket
529                     if (_onFrame!=null)
530                     {
531                         if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
532                             return;
533                     }
534 
535                     if (_onControl!=null && isControlFrame(opcode))
536                     {
537                         if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
538                             return;
539                     }
540 
541                     switch(opcode)
542                     {
543                         case WebSocketConnectionD06.OP_CONTINUATION:
544                         {
545                             // If text, append to the message buffer
546                             if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
547                             {
548                                 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
549                                 {
550                                     // If this is the last fragment, deliver the text buffer
551                                     if (lastFrame && _onTextMessage!=null)
552                                     {
553                                         _opcode=-1;
554                                         String msg =_utf8.toString();
555                                         _utf8.reset();
556                                         _onTextMessage.onMessage(msg);
557                                     }
558                                 }
559                                 else
560                                 {
561                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
562                                     _utf8.reset();
563                                     _opcode=-1;
564                                 }
565                             }
566                             else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
567                             {
568                                 if (_aggregate.space()<_aggregate.length())
569                                 {
570                                     _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
571                                     _aggregate.clear();
572                                     _opcode=-1;
573                                 }
574                                 else
575                                 {
576                                     _aggregate.put(buffer);
577 
578                                     // If this is the last fragment, deliver
579                                     if (lastFrame && _onBinaryMessage!=null)
580                                     {
581                                         try
582                                         {
583                                             _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
584                                         }
585                                         finally
586                                         {
587                                             _opcode=-1;
588                                             _aggregate.clear();
589                                         }
590                                     }
591                                 }
592                             }
593                             break;
594                         }
595                         case WebSocketConnectionD06.OP_PING:
596                         {
597                             LOG.debug("PING {}",this);
598                             if (!_closedOut)
599                                 _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
600                             break;
601                         }
602 
603                         case WebSocketConnectionD06.OP_PONG:
604                         {
605                             LOG.debug("PONG {}",this);
606                             break;
607                         }
608 
609                         case WebSocketConnectionD06.OP_CLOSE:
610                         {
611                             int code=-1;
612                             String message=null;
613                             if (buffer.length()>=2)
614                             {
615                                 code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
616                                 if (buffer.length()>2)
617                                     message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
618                             }
619                             closeIn(code,message);
620                             break;
621                         }
622 
623 
624                         case WebSocketConnectionD06.OP_TEXT:
625                         {
626                             if(_onTextMessage!=null)
627                             {
628                                 if (lastFrame)
629                                 {
630                                     // Deliver the message
631                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
632                                 }
633                                 else
634                                 {
635                                     if (_connection.getMaxTextMessageSize()>=0)
636                                     {
637                                         // If this is a text fragment, append to buffer
638                                         if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
639                                             _opcode=WebSocketConnectionD06.OP_TEXT;
640                                         else
641                                         {
642                                             _utf8.reset();
643                                             _opcode=-1;
644                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
645                                         }
646                                     }
647                                 }
648                             }
649                             break;
650                         }
651 
652                         default:
653                         {
654                             if (_onBinaryMessage!=null)
655                             {
656                                 if (lastFrame)
657                                 {
658                                     _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
659                                 }
660                                 else
661                                 {
662                                     if (_connection.getMaxBinaryMessageSize()>=0)
663                                     {
664                                         if (buffer.length()>_connection.getMaxBinaryMessageSize())
665                                         {
666                                             _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
667                                             if (_aggregate!=null)
668                                                 _aggregate.clear();
669                                             _opcode=-1;
670                                         }
671                                         else
672                                         {
673                                             _opcode=opcode;
674                                             if (_aggregate==null)
675                                                 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
676                                             _aggregate.put(buffer);
677                                         }
678                                     }
679                                 }
680                             }
681                         }
682                     }
683                 }
684                 catch(Throwable th)
685                 {
686                     LOG.warn(th);
687                 }
688             }
689         }
690 
691         public void close(int code,String message)
692         {
693             _connection.close(code,message);
694         }
695 
696         public String toString()
697         {
698             return WebSocketConnectionD06.this.toString()+"FH";
699         }
700     }
701 
702     /* ------------------------------------------------------------ */
703     public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
704     {
705         String key = request.getHeader("Sec-WebSocket-Key");
706 
707         response.setHeader("Upgrade","WebSocket");
708         response.addHeader("Connection","Upgrade");
709         response.addHeader("Sec-WebSocket-Accept",hashKey(key));
710         if (subprotocol!=null)
711             response.addHeader("Sec-WebSocket-Protocol",subprotocol);
712         response.sendError(101);
713 
714         if (_onFrame!=null)
715             _onFrame.onHandshake(_connection);
716         _webSocket.onOpen(_connection);
717     }
718 
719     /* ------------------------------------------------------------ */
720     public static String hashKey(String key)
721     {
722         try
723         {
724             MessageDigest md = MessageDigest.getInstance("SHA1");
725             md.update(key.getBytes("UTF-8"));
726             md.update(MAGIC);
727             return new String(B64Code.encode(md.digest()));
728         }
729         catch (Exception e)
730         {
731             throw new RuntimeException(e);
732         }
733     }
734 }