View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.util.Random;
18  
19  import org.eclipse.jetty.io.Buffer;
20  import org.eclipse.jetty.io.EndPoint;
21  import org.eclipse.jetty.io.EofException;
22  
23  
24  /* ------------------------------------------------------------ */
25  /** WebSocketGenerator.
26   * This class generates websocket packets.
27   * It is fully synchronized because it is likely that async
28   * threads will call the addMessage methods while other
29   * threads are flushing the generator.
30   */
31  public class WebSocketGeneratorD10 implements WebSocketGenerator
32  {
33      final private WebSocketBuffers _buffers;
34      final private EndPoint _endp;
35      private Buffer _buffer;
36      private final byte[] _mask=new byte[4];
37      private int _m;
38      private boolean _opsent;
39      private final MaskGen _maskGen;
40  
41      public interface MaskGen
42      {
43          void genMask(byte[] mask);
44      }
45      
46      public static class NullMaskGen implements MaskGen
47      {
48          public void genMask(byte[] mask)
49          {
50              mask[0]=mask[1]=mask[2]=mask[3]=0;
51          }
52      }
53      
54      public static class FixedMaskGen implements MaskGen
55      {
56          final byte[] _mask;
57          public FixedMaskGen()
58          {
59              _mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
60          }
61          
62          public FixedMaskGen(byte[] mask)
63          {
64              _mask=mask;
65          }
66          
67          public void genMask(byte[] mask)
68          {
69              mask[0]=_mask[0];
70              mask[1]=_mask[1];
71              mask[2]=_mask[2];
72              mask[3]=_mask[3];
73          }
74      }
75  
76      public static class RandomMaskGen implements MaskGen
77      {
78          final Random _random;
79          public RandomMaskGen()
80          {
81              _random=new Random(); 
82          }
83          
84          public RandomMaskGen(Random random)
85          {
86              _random=random;
87          }
88          
89          public void genMask(byte[] mask)
90          {
91              _random.nextBytes(mask);
92          }
93      }
94  
95      
96      public WebSocketGeneratorD10(WebSocketBuffers buffers, EndPoint endp)
97      {
98          _buffers=buffers;
99          _endp=endp;
100         _maskGen=null;
101     }
102     
103     public WebSocketGeneratorD10(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
104     {
105         _buffers=buffers;
106         _endp=endp;
107         _maskGen=maskGen;
108     }
109 
110     public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
111     {
112         // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
113         
114         boolean mask=_maskGen!=null;
115         
116         if (_buffer==null)
117             _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
118             
119         boolean last=WebSocketConnectionD10.isLastFrame(flags);
120         byte orig=opcode;
121         
122         int space=mask?14:10;
123         
124         do
125         {
126             opcode = _opsent?WebSocketConnectionD10.OP_CONTINUATION:opcode;
127             opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
128             _opsent=true;
129             
130             int payload=length;
131             if (payload+space>_buffer.capacity())
132             {
133                 // We must fragement, so clear FIN bit
134                 opcode=(byte)(opcode&0x7F); // Clear the FIN bit
135                 payload=_buffer.capacity()-space;
136             }
137             else if (last)
138                 opcode= (byte)(opcode|0x80); // Set the FIN bit
139       
140             // ensure there is space for header
141             if (_buffer.space() <= space)
142             {
143                 flushBuffer();
144                 if (_buffer.space() <= space)
145                     flush();
146             }
147             
148             // write the opcode and length
149             if (payload>0xffff)
150             {
151                 _buffer.put(new byte[]{
152                         opcode,
153                         mask?(byte)0xff:(byte)0x7f,
154                         (byte)((payload>>56)&0x7f),
155                         (byte)((payload>>48)&0xff), 
156                         (byte)((payload>>40)&0xff), 
157                         (byte)((payload>>32)&0xff),
158                         (byte)((payload>>24)&0xff),
159                         (byte)((payload>>16)&0xff),
160                         (byte)((payload>>8)&0xff),
161                         (byte)(payload&0xff)});
162             }
163             else if (payload >=0x7e)
164             {
165                 _buffer.put(new byte[]{
166                         opcode,
167                         mask?(byte)0xfe:(byte)0x7e,
168                         (byte)(payload>>8),
169                         (byte)(payload&0xff)});
170             }
171             else
172             {
173                 _buffer.put(new byte[]{ 
174                         opcode,
175                         (byte)(mask?(0x80|payload):payload)});
176             }
177 
178             // write mask
179             if (mask)
180             {
181                 _maskGen.genMask(_mask);
182                 _m=0;
183                 _buffer.put(_mask);
184             }
185 
186             
187             // write payload
188             int remaining = payload;
189             while (remaining > 0)
190             {
191                 _buffer.compact();
192                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
193                 
194                 if (mask)
195                 {
196                     for (int i=0;i<chunk;i++)
197                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
198                 }
199                 else
200                     _buffer.put(content, offset + (payload - remaining), chunk);
201                 
202                 remaining -= chunk;
203                 if (_buffer.space() > 0)
204                 {
205                     // Gently flush the data, issuing a non-blocking write
206                     flushBuffer();
207                 }
208                 else
209                 {
210                     // Forcibly flush the data, issuing a blocking write
211                     flush();
212                     if (remaining == 0)
213                     {
214                         // Gently flush the data, issuing a non-blocking write
215                         flushBuffer();
216                     }
217                 }
218             }
219             offset+=payload;
220             length-=payload;
221         }
222         while (length>0);
223         _opsent=!last;
224         
225         if (_buffer!=null && _buffer.length()==0)
226         {
227             _buffers.returnBuffer(_buffer);
228             _buffer=null;
229         }
230     }
231 
232     public synchronized int flushBuffer() throws IOException
233     {
234         if (!_endp.isOpen())
235             throw new EofException();
236 
237         if (_buffer!=null)
238             return _endp.flush(_buffer);
239 
240         return 0;
241     }
242 
243     public synchronized int flush() throws IOException
244     {
245         if (_buffer==null)
246             return 0;
247         int result = flushBuffer();
248         
249         if (!_endp.isBlocking())
250         {
251             long now = System.currentTimeMillis();
252             long end=now+_endp.getMaxIdleTime();
253             while (_buffer.length()>0)
254             {
255                 boolean ready = _endp.blockWritable(end-now);
256                 if (!ready)
257                 {
258                     now = System.currentTimeMillis();
259                     if (now<end)
260                         continue;
261                     throw new IOException("Write timeout");
262                 }
263 
264                 result += flushBuffer();
265             }
266         }
267         _buffer.compact();
268         return result;
269     }
270 
271     public synchronized boolean isBufferEmpty()
272     {
273         return _buffer==null || _buffer.length()==0;
274     }
275 
276     public synchronized void idle()
277     {
278         if (_buffer!=null && _buffer.length()==0)
279         {
280             _buffers.returnBuffer(_buffer);
281             _buffer=null;
282         }
283     }
284 
285 }