View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  
18  import org.eclipse.jetty.io.Buffer;
19  import org.eclipse.jetty.io.EndPoint;
20  import org.eclipse.jetty.io.EofException;
21  
22  
23  /* ------------------------------------------------------------ */
24  /** WebSocketGenerator.
25   * This class generates websocket packets.
26   * It is fully synchronized because it is likely that async
27   * threads will call the addMessage methods while other
28   * threads are flushing the generator.
29   */
30  public class WebSocketGeneratorD06 implements WebSocketGenerator
31  {
32      final private WebSocketBuffers _buffers;
33      final private EndPoint _endp;
34      private Buffer _buffer;
35      private final byte[] _mask=new byte[4];
36      private int _m;
37      private boolean _opsent;
38      private final MaskGen _maskGen;
39  
40      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
41      {
42          _buffers=buffers;
43          _endp=endp;
44          _maskGen=null;
45      }
46  
47      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
48      {
49          _buffers=buffers;
50          _endp=endp;
51          _maskGen=maskGen;
52      }
53  
54      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
55      {
56          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
57  
58          long blockFor=_endp.getMaxIdleTime();
59  
60          if (_buffer==null)
61              _buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer();
62  
63          boolean last=WebSocketConnectionD06.isLastFrame(flags);
64          opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
65  
66          int space=(_maskGen!=null)?14:10;
67  
68          do
69          {
70              opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode;
71              _opsent=true;
72  
73              int payload=length;
74              if (payload+space>_buffer.capacity())
75              {
76                  // We must fragement, so clear FIN bit
77                  opcode&=(byte)0x7F; // Clear the FIN bit
78                  payload=_buffer.capacity()-space;
79              }
80              else if (last)
81                  opcode|=(byte)0x80; // Set the FIN bit
82  
83              // ensure there is space for header
84              if (_buffer.space() <= space)
85                  expelBuffer(blockFor);
86  
87              // write mask
88              if ((_maskGen!=null))
89              {
90                  _maskGen.genMask(_mask);
91                  _m=0;
92                  _buffer.put(_mask);
93              }
94  
95              // write the opcode and length
96              if (payload>0xffff)
97              {
98                  bufferPut(new byte[]{
99                          opcode,
100                         (byte)0x7f,
101                         (byte)0,
102                         (byte)0,
103                         (byte)0,
104                         (byte)0,
105                         (byte)((payload>>24)&0xff),
106                         (byte)((payload>>16)&0xff),
107                         (byte)((payload>>8)&0xff),
108                         (byte)(payload&0xff)});
109             }
110             else if (payload >=0x7e)
111             {
112                 bufferPut(new byte[]{
113                         opcode,
114                         (byte)0x7e,
115                         (byte)(payload>>8),
116                         (byte)(payload&0xff)});
117             }
118             else
119             {
120                 bufferPut(opcode);
121                 bufferPut((byte)payload);
122             }
123 
124             // write payload
125             int remaining = payload;
126             while (remaining > 0)
127             {
128                 _buffer.compact();
129                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
130 
131                 if ((_maskGen!=null))
132                 {
133                     for (int i=0;i<chunk;i++)
134                         bufferPut(content[offset+ (payload-remaining)+i]);
135                 }
136                 else
137                     _buffer.put(content, offset + (payload - remaining), chunk);
138 
139                 remaining -= chunk;
140                 if (_buffer.space() > 0)
141                 {
142                     // Gently flush the data, issuing a non-blocking write
143                     flushBuffer();
144                 }
145                 else
146                 {
147                     // Forcibly flush the data, issuing a blocking write
148                     expelBuffer(blockFor);
149                     if (remaining == 0)
150                     {
151                         // Gently flush the data, issuing a non-blocking write
152                         flushBuffer();
153                     }
154                 }
155             }
156             offset+=payload;
157             length-=payload;
158         }
159         while (length>0);
160         _opsent=!last;
161     }
162 
163     private synchronized void bufferPut(byte[] data) throws IOException
164     {
165         if (_maskGen!=null)
166             for (int i=0;i<data.length;i++)
167                 data[i]^=_mask[+_m++%4];
168         _buffer.put(data);
169     }
170 
171     private synchronized void bufferPut(byte data) throws IOException
172     {
173         _buffer.put((byte)(data^_mask[+_m++%4]));
174     }
175 
176     public synchronized int flush(int blockFor) throws IOException
177     {
178         return expelBuffer(blockFor);
179     }
180 
181     public synchronized int flush() throws IOException
182     {
183         int flushed = flushBuffer();
184         if (_buffer!=null && _buffer.length()==0)
185         {
186             _buffers.returnBuffer(_buffer);
187             _buffer=null;
188         }
189         return flushed;
190     }
191 
192     private synchronized int flushBuffer() throws IOException
193     {
194         if (!_endp.isOpen())
195             throw new EofException();
196 
197         if (_buffer!=null)
198             return _endp.flush(_buffer);
199 
200         return 0;
201     }
202 
203     private synchronized int expelBuffer(long blockFor) throws IOException
204     {
205         if (_buffer==null)
206             return 0;
207         int result = flushBuffer();
208         _buffer.compact();
209         if (!_endp.isBlocking())
210         {
211             while (_buffer.space()==0)
212             {
213                 boolean ready = _endp.blockWritable(blockFor);
214                 if (!ready)
215                     throw new IOException("Write timeout");
216 
217                 result += flushBuffer();
218                 _buffer.compact();
219             }
220         }
221         return result;
222     }
223 
224     public synchronized boolean isBufferEmpty()
225     {
226         return _buffer==null || _buffer.length()==0;
227     }
228 
229 }