View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  // ========================================================================
17  // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18  // ------------------------------------------------------------------------
19  // All rights reserved. This program and the accompanying materials
20  // are made available under the terms of the Eclipse Public License v1.0
21  // and Apache License v2.0 which accompanies this distribution.
22  // The Eclipse Public License is available at
23  // http://www.eclipse.org/legal/epl-v10.html
24  // The Apache License v2.0 is available at
25  // http://www.opensource.org/licenses/apache2.0.php
26  // You may elect to redistribute this code under either of these licenses.
27  // ========================================================================
28  
29  package org.eclipse.jetty.websocket;
30  
31  import java.io.IOException;
32  
33  import org.eclipse.jetty.io.Buffer;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.io.EofException;
36  
37  
38  /* ------------------------------------------------------------ */
39  /** WebSocketGenerator.
40   * This class generates websocket packets.
41   * It is fully synchronized because it is likely that async
42   * threads will call the addMessage methods while other
43   * threads are flushing the generator.
44   */
45  public class WebSocketGeneratorRFC6455 implements WebSocketGenerator
46  {
47      final private WebSocketBuffers _buffers;
48      final private EndPoint _endp;
49      private Buffer _buffer;
50      private final byte[] _mask=new byte[4];
51      private int _m;
52      private boolean _opsent;
53      private final MaskGen _maskGen;
54      private boolean _closed;
55  
56      public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp)
57      {
58          _buffers=buffers;
59          _endp=endp;
60          _maskGen=null;
61      }
62  
63      public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
64      {
65          _buffers=buffers;
66          _endp=endp;
67          _maskGen=maskGen;
68      }
69  
70      public synchronized Buffer getBuffer()
71      {
72          return _buffer;
73      }
74  
75      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
76      {
77          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
78  
79          if (_closed)
80              throw new EofException("Closed");
81          if (opcode==WebSocketConnectionRFC6455.OP_CLOSE)
82              _closed=true;
83  
84          boolean mask=_maskGen!=null;
85  
86          if (_buffer==null)
87              _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
88  
89          boolean last=WebSocketConnectionRFC6455.isLastFrame(flags);
90  
91          int space=mask?14:10;
92  
93          do
94          {
95              opcode = _opsent?WebSocketConnectionRFC6455.OP_CONTINUATION:opcode;
96              opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
97              _opsent=true;
98  
99              int payload=length;
100             if (payload+space>_buffer.capacity())
101             {
102                 // We must fragement, so clear FIN bit
103                 opcode=(byte)(opcode&0x7F); // Clear the FIN bit
104                 payload=_buffer.capacity()-space;
105             }
106             else if (last)
107                 opcode= (byte)(opcode|0x80); // Set the FIN bit
108 
109             // ensure there is space for header
110             if (_buffer.space() <= space)
111             {
112                 flushBuffer();
113                 if (_buffer.space() <= space)
114                     flush();
115             }
116 
117             // write the opcode and length
118             if (payload>0xffff)
119             {
120                 _buffer.put(new byte[]{
121                         opcode,
122                         mask?(byte)0xff:(byte)0x7f,
123                         (byte)0,
124                         (byte)0,
125                         (byte)0,
126                         (byte)0,
127                         (byte)((payload>>24)&0xff),
128                         (byte)((payload>>16)&0xff),
129                         (byte)((payload>>8)&0xff),
130                         (byte)(payload&0xff)});
131             }
132             else if (payload >=0x7e)
133             {
134                 _buffer.put(new byte[]{
135                         opcode,
136                         mask?(byte)0xfe:(byte)0x7e,
137                         (byte)(payload>>8),
138                         (byte)(payload&0xff)});
139             }
140             else
141             {
142                 _buffer.put(new byte[]{
143                         opcode,
144                         (byte)(mask?(0x80|payload):payload)});
145             }
146 
147             // write mask
148             if (mask)
149             {
150                 _maskGen.genMask(_mask);
151                 _m=0;
152                 _buffer.put(_mask);
153             }
154 
155             // write payload
156             int remaining = payload;
157             while (remaining > 0)
158             {
159                 _buffer.compact();
160                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
161 
162                 if (mask)
163                 {
164                     for (int i=0;i<chunk;i++)
165                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
166                 }
167                 else
168                     _buffer.put(content, offset + (payload - remaining), chunk);
169 
170                 remaining -= chunk;
171                 if (_buffer.space() > 0)
172                 {
173                     // Gently flush the data, issuing a non-blocking write
174                     flushBuffer();
175                 }
176                 else
177                 {
178                     // Forcibly flush the data, issuing a blocking write
179                     flush();
180                     if (remaining == 0)
181                     {
182                         // Gently flush the data, issuing a non-blocking write
183                         flushBuffer();
184                     }
185                 }
186             }
187             offset+=payload;
188             length-=payload;
189         }
190         while (length>0);
191         _opsent=!last;
192 
193         if (_buffer!=null && _buffer.length()==0)
194         {
195             _buffers.returnBuffer(_buffer);
196             _buffer=null;
197         }
198     }
199 
200     public synchronized int flushBuffer() throws IOException
201     {
202         if (!_endp.isOpen())
203             throw new EofException();
204 
205         if (_buffer!=null)
206         {
207             int flushed=_buffer.hasContent()?_endp.flush(_buffer):0;
208             if (_closed&&_buffer.length()==0)
209                 _endp.shutdownOutput();
210             return flushed;
211         }
212 
213         return 0;
214     }
215 
216     public synchronized int flush() throws IOException
217     {
218         if (_buffer==null)
219             return 0;
220         int result = flushBuffer();
221 
222         if (!_endp.isBlocking())
223         {
224             long now = System.currentTimeMillis();
225             long end=now+_endp.getMaxIdleTime();
226             while (_buffer.length()>0)
227             {
228                 boolean ready = _endp.blockWritable(end-now);
229                 if (!ready)
230                 {
231                     now = System.currentTimeMillis();
232                     if (now<end)
233                         continue;
234                     throw new IOException("Write timeout");
235                 }
236 
237                 result += flushBuffer();
238             }
239         }
240         _buffer.compact();
241         return result;
242     }
243 
244     public synchronized boolean isBufferEmpty()
245     {
246         return _buffer==null || _buffer.length()==0;
247     }
248 
249     public synchronized void returnBuffer()
250     {
251         if (_buffer!=null && _buffer.length()==0)
252         {
253             _buffers.returnBuffer(_buffer);
254             _buffer=null;
255         }
256     }
257 
258     @Override
259     public String toString()
260     {
261         // Do NOT use synchronized (this)
262         // because it's very easy to deadlock when debugging is enabled.
263         // We do a best effort to print the right toString() and that's it.
264         Buffer buffer = _buffer;
265         return String.format("%s@%x closed=%b buffer=%d",
266                 getClass().getSimpleName(),
267                 hashCode(),
268                 _closed,
269                 buffer == null ? -1 : buffer.length());
270     }
271 }