View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.security.SecureRandom;
18  import java.util.Random;
19  
20  import org.eclipse.jetty.io.Buffer;
21  import org.eclipse.jetty.io.EndPoint;
22  import org.eclipse.jetty.io.EofException;
23  
24  
25  /* ------------------------------------------------------------ */
26  /** WebSocketGenerator.
27   * This class generates websocket packets.
28   * It is fully synchronized because it is likely that async
29   * threads will call the addMessage methods while other
30   * threads are flushing the generator.
31   */
32  public class WebSocketGeneratorD06 implements WebSocketGenerator
33  {
34      final private WebSocketBuffers _buffers;
35      final private EndPoint _endp;
36      private Buffer _buffer;
37      private final byte[] _mask=new byte[4];
38      private int _m;
39      private boolean _opsent;
40      private final MaskGen _maskGen;
41      
42      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
43      {
44          _buffers=buffers;
45          _endp=endp;
46          _maskGen=null;
47      }
48      
49      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
50      {
51          _buffers=buffers;
52          _endp=endp;
53          _maskGen=maskGen;
54      }
55  
56      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
57      {
58          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
59          
60          long blockFor=_endp.getMaxIdleTime();
61          
62          if (_buffer==null)
63              _buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer();
64              
65          boolean last=WebSocketConnectionD06.isLastFrame(flags);
66          opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
67          
68          int space=(_maskGen!=null)?14:10;
69          
70          do
71          {
72              opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode;
73              _opsent=true;
74              
75              int payload=length;
76              if (payload+space>_buffer.capacity())
77              {
78                  // We must fragement, so clear FIN bit
79                  opcode&=(byte)0x7F; // Clear the FIN bit
80                  payload=_buffer.capacity()-space;
81              }
82              else if (last)
83                  opcode|=(byte)0x80; // Set the FIN bit
84  
85              // ensure there is space for header
86              if (_buffer.space() <= space)
87                  expelBuffer(blockFor);
88              
89              // write mask
90              if ((_maskGen!=null))
91              {
92                  _maskGen.genMask(_mask);
93                  _m=0;
94                  _buffer.put(_mask);
95              }
96  
97              // write the opcode and length
98              if (payload>0xffff)
99              {
100                 bufferPut(new byte[]{
101                         opcode,
102                         (byte)0x7f,
103                         (byte)((payload>>56)&0x7f),
104                         (byte)((payload>>48)&0xff), 
105                         (byte)((payload>>40)&0xff), 
106                         (byte)((payload>>32)&0xff),
107                         (byte)((payload>>24)&0xff),
108                         (byte)((payload>>16)&0xff),
109                         (byte)((payload>>8)&0xff),
110                         (byte)(payload&0xff)});
111             }
112             else if (payload >=0x7e)
113             {
114                 bufferPut(new byte[]{
115                         opcode,
116                         (byte)0x7e,
117                         (byte)(payload>>8),
118                         (byte)(payload&0xff)});
119             }
120             else
121             {
122                 bufferPut(opcode);
123                 bufferPut((byte)payload);
124             }
125 
126             // write payload
127             int remaining = payload;
128             while (remaining > 0)
129             {
130                 _buffer.compact();
131                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
132                 
133                 if ((_maskGen!=null))
134                 {
135                     for (int i=0;i<chunk;i++)
136                         bufferPut(content[offset+ (payload-remaining)+i]);
137                 }
138                 else
139                     _buffer.put(content, offset + (payload - remaining), chunk);
140                 
141                 remaining -= chunk;
142                 if (_buffer.space() > 0)
143                 {
144                     // Gently flush the data, issuing a non-blocking write
145                     flushBuffer();
146                 }
147                 else
148                 {
149                     // Forcibly flush the data, issuing a blocking write
150                     expelBuffer(blockFor);
151                     if (remaining == 0)
152                     {
153                         // Gently flush the data, issuing a non-blocking write
154                         flushBuffer();
155                     }
156                 }
157             }
158             offset+=payload;
159             length-=payload;
160         }
161         while (length>0);
162         _opsent=!last;
163     }
164 
165     private synchronized void bufferPut(byte[] data) throws IOException
166     {
167         if (_maskGen!=null)
168             for (int i=0;i<data.length;i++)
169                 data[i]^=_mask[+_m++%4];
170         _buffer.put(data);
171     }
172     
173     private synchronized void bufferPut(byte data) throws IOException
174     {
175         _buffer.put((byte)(data^_mask[+_m++%4]));
176     }
177 
178     public synchronized int flush(int blockFor) throws IOException
179     {
180         return expelBuffer(blockFor);
181     }
182 
183     public synchronized int flush() throws IOException
184     {
185         int flushed = flushBuffer();
186         if (_buffer!=null && _buffer.length()==0)
187         {
188             _buffers.returnBuffer(_buffer);
189             _buffer=null;
190         }
191         return flushed;
192     }
193 
194     private synchronized int flushBuffer() throws IOException
195     {
196         if (!_endp.isOpen())
197             throw new EofException();
198 
199         if (_buffer!=null)
200             return _endp.flush(_buffer);
201 
202         return 0;
203     }
204 
205     private synchronized int expelBuffer(long blockFor) throws IOException
206     {
207         if (_buffer==null)
208             return 0;
209         int result = flushBuffer();
210         _buffer.compact();
211         if (!_endp.isBlocking())
212         {
213             while (_buffer.space()==0)
214             {
215                 // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
216                 // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
217                 boolean ready = _endp.blockWritable(blockFor);
218                 if (!ready)
219                     throw new IOException("Write timeout");
220 
221                 result += flushBuffer();
222                 _buffer.compact();
223             }
224         }
225         return result;
226     }
227 
228     public synchronized boolean isBufferEmpty()
229     {
230         return _buffer==null || _buffer.length()==0;
231     }
232 
233 }