View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.security.SecureRandom;
18  import java.util.Random;
19  
20  import org.eclipse.jetty.io.Buffer;
21  import org.eclipse.jetty.io.EndPoint;
22  import org.eclipse.jetty.io.EofException;
23  import org.eclipse.jetty.util.TypeUtil;
24  
25  
26  /* ------------------------------------------------------------ */
27  /** WebSocketGenerator.
28   * This class generates websocket packets.
29   * It is fully synchronized because it is likely that async
30   * threads will call the addMessage methods while other
31   * threads are flushing the generator.
32   */
33  public class WebSocketGeneratorD06 implements WebSocketGenerator
34  {
35      final private WebSocketBuffers _buffers;
36      final private EndPoint _endp;
37      private Buffer _buffer;
38      private final byte[] _mask=new byte[4];
39      private int _m;
40      private boolean _opsent;
41      private final MaskGen _maskGen;
42  
43      public interface MaskGen
44      {
45          void genMask(byte[] mask);
46      }
47      
48      public static class NullMaskGen implements MaskGen
49      {
50          public void genMask(byte[] mask)
51          {
52              mask[0]=mask[1]=mask[2]=mask[3]=0;
53          }
54      }
55      
56      public static class FixedMaskGen implements MaskGen
57      {
58          final byte[] _mask;
59          public FixedMaskGen()
60          {
61              _mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
62          }
63          
64          public FixedMaskGen(byte[] mask)
65          {
66              _mask=mask;
67          }
68          
69          public void genMask(byte[] mask)
70          {
71              mask[0]=_mask[0];
72              mask[1]=_mask[1];
73              mask[2]=_mask[2];
74              mask[3]=_mask[3];
75          }
76      }
77  
78      public static class RandomMaskGen implements MaskGen
79      {
80          final Random _random;
81          public RandomMaskGen()
82          {
83              _random=new SecureRandom(); 
84          }
85          
86          public RandomMaskGen(Random random)
87          {
88              _random=random;
89          }
90          
91          public void genMask(byte[] mask)
92          {
93              _random.nextBytes(mask);
94          }
95      }
96  
97      
98      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
99      {
100         _buffers=buffers;
101         _endp=endp;
102         _maskGen=null;
103     }
104     
105     public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
106     {
107         _buffers=buffers;
108         _endp=endp;
109         _maskGen=maskGen;
110     }
111 
112     public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
113     {
114         // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
115         
116         if (_buffer==null)
117             _buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer();
118             
119         boolean last=WebSocketConnectionD06.isLastFrame(flags);
120         opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
121         
122         int space=(_maskGen!=null)?14:10;
123         
124         do
125         {
126             opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode;
127             _opsent=true;
128             
129             int payload=length;
130             if (payload+space>_buffer.capacity())
131             {
132                 // We must fragement, so clear FIN bit
133                 opcode&=(byte)0x7F; // Clear the FIN bit
134                 payload=_buffer.capacity()-space;
135             }
136             else if (last)
137                 opcode|=(byte)0x80; // Set the FIN bit
138 
139             // ensure there is space for header
140             if (_buffer.space() <= space)
141                 expelBuffer(blockFor);
142             
143             // write mask
144             if ((_maskGen!=null))
145             {
146                 _maskGen.genMask(_mask);
147                 _m=0;
148                 _buffer.put(_mask);
149             }
150 
151             // write the opcode and length
152             if (payload>0xffff)
153             {
154                 bufferPut(new byte[]{
155                         opcode,
156                         (byte)0x7f,
157                         (byte)((payload>>56)&0x7f),
158                         (byte)((payload>>48)&0xff), 
159                         (byte)((payload>>40)&0xff), 
160                         (byte)((payload>>32)&0xff),
161                         (byte)((payload>>24)&0xff),
162                         (byte)((payload>>16)&0xff),
163                         (byte)((payload>>8)&0xff),
164                         (byte)(payload&0xff)});
165             }
166             else if (payload >=0x7e)
167             {
168                 bufferPut(new byte[]{
169                         opcode,
170                         (byte)0x7e,
171                         (byte)(payload>>8),
172                         (byte)(payload&0xff)});
173             }
174             else
175             {
176                 bufferPut(opcode);
177                 bufferPut((byte)payload);
178             }
179 
180             // write payload
181             int remaining = payload;
182             while (remaining > 0)
183             {
184                 _buffer.compact();
185                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
186                 
187                 if ((_maskGen!=null))
188                 {
189                     for (int i=0;i<chunk;i++)
190                         bufferPut(content[offset+ (payload-remaining)+i]);
191                 }
192                 else
193                     _buffer.put(content, offset + (payload - remaining), chunk);
194                 
195                 remaining -= chunk;
196                 if (_buffer.space() > 0)
197                 {
198                     // Gently flush the data, issuing a non-blocking write
199                     flushBuffer();
200                 }
201                 else
202                 {
203                     // Forcibly flush the data, issuing a blocking write
204                     expelBuffer(blockFor);
205                     if (remaining == 0)
206                     {
207                         // Gently flush the data, issuing a non-blocking write
208                         flushBuffer();
209                     }
210                 }
211             }
212             offset+=payload;
213             length-=payload;
214         }
215         while (length>0);
216         _opsent=!last;
217     }
218 
219     private synchronized void bufferPut(byte[] data) throws IOException
220     {
221         if (_maskGen!=null)
222             for (int i=0;i<data.length;i++)
223                 data[i]^=_mask[+_m++%4];
224         _buffer.put(data);
225     }
226     
227     private synchronized void bufferPut(byte data) throws IOException
228     {
229         _buffer.put((byte)(data^_mask[+_m++%4]));
230     }
231 
232     public synchronized int flush(int blockFor) throws IOException
233     {
234         return expelBuffer(blockFor);
235     }
236 
237     public synchronized int flush() throws IOException
238     {
239         int flushed = flushBuffer();
240         if (_buffer!=null && _buffer.length()==0)
241         {
242             _buffers.returnBuffer(_buffer);
243             _buffer=null;
244         }
245         return flushed;
246     }
247 
248     private synchronized int flushBuffer() throws IOException
249     {
250         if (!_endp.isOpen())
251             throw new EofException();
252 
253         if (_buffer!=null)
254             return _endp.flush(_buffer);
255 
256         return 0;
257     }
258 
259     private synchronized int expelBuffer(long blockFor) throws IOException
260     {
261         if (_buffer==null)
262             return 0;
263         int result = flushBuffer();
264         _buffer.compact();
265         if (!_endp.isBlocking())
266         {
267             while (_buffer.space()==0)
268             {
269                 // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
270                 // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
271                 boolean ready = _endp.blockWritable(blockFor);
272                 if (!ready)
273                     throw new IOException("Write timeout");
274 
275                 result += flushBuffer();
276                 _buffer.compact();
277             }
278         }
279         return result;
280     }
281 
282     public synchronized boolean isBufferEmpty()
283     {
284         return _buffer==null || _buffer.length()==0;
285     }
286 
287 }