View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.security.SecureRandom;
18  import java.util.Random;
19  
20  import org.eclipse.jetty.io.Buffer;
21  import org.eclipse.jetty.io.EndPoint;
22  import org.eclipse.jetty.io.EofException;
23  
24  
25  /* ------------------------------------------------------------ */
26  /** WebSocketGenerator.
27   * This class generates websocket packets.
28   * It is fully synchronized because it is likely that async
29   * threads will call the addMessage methods while other
30   * threads are flushing the generator.
31   */
32  public class WebSocketGeneratorD06 implements WebSocketGenerator
33  {
34      final private WebSocketBuffers _buffers;
35      final private EndPoint _endp;
36      private Buffer _buffer;
37      private final byte[] _mask=new byte[4];
38      private int _m;
39      private boolean _opsent;
40      private final MaskGen _maskGen;
41  
42      public interface MaskGen
43      {
44          void genMask(byte[] mask);
45      }
46      
47      public static class NullMaskGen implements MaskGen
48      {
49          public void genMask(byte[] mask)
50          {
51              mask[0]=mask[1]=mask[2]=mask[3]=0;
52          }
53      }
54      
55      public static class FixedMaskGen implements MaskGen
56      {
57          final byte[] _mask;
58          public FixedMaskGen()
59          {
60              _mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
61          }
62          
63          public FixedMaskGen(byte[] mask)
64          {
65              _mask=mask;
66          }
67          
68          public void genMask(byte[] mask)
69          {
70              mask[0]=_mask[0];
71              mask[1]=_mask[1];
72              mask[2]=_mask[2];
73              mask[3]=_mask[3];
74          }
75      }
76  
77      public static class RandomMaskGen implements MaskGen
78      {
79          final Random _random;
80          public RandomMaskGen()
81          {
82              _random=new SecureRandom(); 
83          }
84          
85          public RandomMaskGen(Random random)
86          {
87              _random=random;
88          }
89          
90          public void genMask(byte[] mask)
91          {
92              _random.nextBytes(mask);
93          }
94      }
95  
96      
97      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
98      {
99          _buffers=buffers;
100         _endp=endp;
101         _maskGen=null;
102     }
103     
104     public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
105     {
106         _buffers=buffers;
107         _endp=endp;
108         _maskGen=maskGen;
109     }
110 
111     public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
112     {
113         // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
114         
115         long blockFor=_endp.getMaxIdleTime();
116         
117         if (_buffer==null)
118             _buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer();
119             
120         boolean last=WebSocketConnectionD06.isLastFrame(flags);
121         opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
122         
123         int space=(_maskGen!=null)?14:10;
124         
125         do
126         {
127             opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode;
128             _opsent=true;
129             
130             int payload=length;
131             if (payload+space>_buffer.capacity())
132             {
133                 // We must fragement, so clear FIN bit
134                 opcode&=(byte)0x7F; // Clear the FIN bit
135                 payload=_buffer.capacity()-space;
136             }
137             else if (last)
138                 opcode|=(byte)0x80; // Set the FIN bit
139 
140             // ensure there is space for header
141             if (_buffer.space() <= space)
142                 expelBuffer(blockFor);
143             
144             // write mask
145             if ((_maskGen!=null))
146             {
147                 _maskGen.genMask(_mask);
148                 _m=0;
149                 _buffer.put(_mask);
150             }
151 
152             // write the opcode and length
153             if (payload>0xffff)
154             {
155                 bufferPut(new byte[]{
156                         opcode,
157                         (byte)0x7f,
158                         (byte)((payload>>56)&0x7f),
159                         (byte)((payload>>48)&0xff), 
160                         (byte)((payload>>40)&0xff), 
161                         (byte)((payload>>32)&0xff),
162                         (byte)((payload>>24)&0xff),
163                         (byte)((payload>>16)&0xff),
164                         (byte)((payload>>8)&0xff),
165                         (byte)(payload&0xff)});
166             }
167             else if (payload >=0x7e)
168             {
169                 bufferPut(new byte[]{
170                         opcode,
171                         (byte)0x7e,
172                         (byte)(payload>>8),
173                         (byte)(payload&0xff)});
174             }
175             else
176             {
177                 bufferPut(opcode);
178                 bufferPut((byte)payload);
179             }
180 
181             // write payload
182             int remaining = payload;
183             while (remaining > 0)
184             {
185                 _buffer.compact();
186                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
187                 
188                 if ((_maskGen!=null))
189                 {
190                     for (int i=0;i<chunk;i++)
191                         bufferPut(content[offset+ (payload-remaining)+i]);
192                 }
193                 else
194                     _buffer.put(content, offset + (payload - remaining), chunk);
195                 
196                 remaining -= chunk;
197                 if (_buffer.space() > 0)
198                 {
199                     // Gently flush the data, issuing a non-blocking write
200                     flushBuffer();
201                 }
202                 else
203                 {
204                     // Forcibly flush the data, issuing a blocking write
205                     expelBuffer(blockFor);
206                     if (remaining == 0)
207                     {
208                         // Gently flush the data, issuing a non-blocking write
209                         flushBuffer();
210                     }
211                 }
212             }
213             offset+=payload;
214             length-=payload;
215         }
216         while (length>0);
217         _opsent=!last;
218     }
219 
220     private synchronized void bufferPut(byte[] data) throws IOException
221     {
222         if (_maskGen!=null)
223             for (int i=0;i<data.length;i++)
224                 data[i]^=_mask[+_m++%4];
225         _buffer.put(data);
226     }
227     
228     private synchronized void bufferPut(byte data) throws IOException
229     {
230         _buffer.put((byte)(data^_mask[+_m++%4]));
231     }
232 
233     public synchronized int flush(int blockFor) throws IOException
234     {
235         return expelBuffer(blockFor);
236     }
237 
238     public synchronized int flush() throws IOException
239     {
240         int flushed = flushBuffer();
241         if (_buffer!=null && _buffer.length()==0)
242         {
243             _buffers.returnBuffer(_buffer);
244             _buffer=null;
245         }
246         return flushed;
247     }
248 
249     private synchronized int flushBuffer() throws IOException
250     {
251         if (!_endp.isOpen())
252             throw new EofException();
253 
254         if (_buffer!=null)
255             return _endp.flush(_buffer);
256 
257         return 0;
258     }
259 
260     private synchronized int expelBuffer(long blockFor) throws IOException
261     {
262         if (_buffer==null)
263             return 0;
264         int result = flushBuffer();
265         _buffer.compact();
266         if (!_endp.isBlocking())
267         {
268             while (_buffer.space()==0)
269             {
270                 // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
271                 // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
272                 boolean ready = _endp.blockWritable(blockFor);
273                 if (!ready)
274                     throw new IOException("Write timeout");
275 
276                 result += flushBuffer();
277                 _buffer.compact();
278             }
279         }
280         return result;
281     }
282 
283     public synchronized boolean isBufferEmpty()
284     {
285         return _buffer==null || _buffer.length()==0;
286     }
287 
288 }