View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  // ========================================================================
17  // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18  // ------------------------------------------------------------------------
19  // All rights reserved. This program and the accompanying materials
20  // are made available under the terms of the Eclipse Public License v1.0
21  // and Apache License v2.0 which accompanies this distribution.
22  // The Eclipse Public License is available at
23  // http://www.eclipse.org/legal/epl-v10.html
24  // The Apache License v2.0 is available at
25  // http://www.opensource.org/licenses/apache2.0.php
26  // You may elect to redistribute this code under either of these licenses.
27  // ========================================================================
28  
29  package org.eclipse.jetty.websocket;
30  
31  import java.io.IOException;
32  
33  import org.eclipse.jetty.io.Buffer;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.io.EofException;
36  
37  
38  /* ------------------------------------------------------------ */
39  /** WebSocketGenerator.
40   * This class generates websocket packets.
41   * It is fully synchronized because it is likely that async
42   * threads will call the addMessage methods while other
43   * threads are flushing the generator.
44   */
45  public class WebSocketGeneratorD06 implements WebSocketGenerator
46  {
47      final private WebSocketBuffers _buffers;
48      final private EndPoint _endp;
49      private Buffer _buffer;
50      private final byte[] _mask=new byte[4];
51      private int _m;
52      private boolean _opsent;
53      private final MaskGen _maskGen;
54  
55      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
56      {
57          _buffers=buffers;
58          _endp=endp;
59          _maskGen=null;
60      }
61  
62      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
63      {
64          _buffers=buffers;
65          _endp=endp;
66          _maskGen=maskGen;
67      }
68  
69      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
70      {
71          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
72  
73          long blockFor=_endp.getMaxIdleTime();
74  
75          if (_buffer==null)
76              _buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer();
77  
78          boolean last=WebSocketConnectionD06.isLastFrame(flags);
79          opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
80  
81          int space=(_maskGen!=null)?14:10;
82  
83          do
84          {
85              opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode;
86              _opsent=true;
87  
88              int payload=length;
89              if (payload+space>_buffer.capacity())
90              {
91                  // We must fragement, so clear FIN bit
92                  opcode&=(byte)0x7F; // Clear the FIN bit
93                  payload=_buffer.capacity()-space;
94              }
95              else if (last)
96                  opcode|=(byte)0x80; // Set the FIN bit
97  
98              // ensure there is space for header
99              if (_buffer.space() <= space)
100                 expelBuffer(blockFor);
101 
102             // write mask
103             if ((_maskGen!=null))
104             {
105                 _maskGen.genMask(_mask);
106                 _m=0;
107                 _buffer.put(_mask);
108             }
109 
110             // write the opcode and length
111             if (payload>0xffff)
112             {
113                 bufferPut(new byte[]{
114                         opcode,
115                         (byte)0x7f,
116                         (byte)0,
117                         (byte)0,
118                         (byte)0,
119                         (byte)0,
120                         (byte)((payload>>24)&0xff),
121                         (byte)((payload>>16)&0xff),
122                         (byte)((payload>>8)&0xff),
123                         (byte)(payload&0xff)});
124             }
125             else if (payload >=0x7e)
126             {
127                 bufferPut(new byte[]{
128                         opcode,
129                         (byte)0x7e,
130                         (byte)(payload>>8),
131                         (byte)(payload&0xff)});
132             }
133             else
134             {
135                 bufferPut(opcode);
136                 bufferPut((byte)payload);
137             }
138 
139             // write payload
140             int remaining = payload;
141             while (remaining > 0)
142             {
143                 _buffer.compact();
144                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
145 
146                 if ((_maskGen!=null))
147                 {
148                     for (int i=0;i<chunk;i++)
149                         bufferPut(content[offset+ (payload-remaining)+i]);
150                 }
151                 else
152                     _buffer.put(content, offset + (payload - remaining), chunk);
153 
154                 remaining -= chunk;
155                 if (_buffer.space() > 0)
156                 {
157                     // Gently flush the data, issuing a non-blocking write
158                     flushBuffer();
159                 }
160                 else
161                 {
162                     // Forcibly flush the data, issuing a blocking write
163                     expelBuffer(blockFor);
164                     if (remaining == 0)
165                     {
166                         // Gently flush the data, issuing a non-blocking write
167                         flushBuffer();
168                     }
169                 }
170             }
171             offset+=payload;
172             length-=payload;
173         }
174         while (length>0);
175         _opsent=!last;
176     }
177 
178     private synchronized void bufferPut(byte[] data) throws IOException
179     {
180         if (_maskGen!=null)
181             for (int i=0;i<data.length;i++)
182                 data[i]^=_mask[+_m++%4];
183         _buffer.put(data);
184     }
185 
186     private synchronized void bufferPut(byte data) throws IOException
187     {
188         _buffer.put((byte)(data^_mask[+_m++%4]));
189     }
190 
191     public synchronized int flush(int blockFor) throws IOException
192     {
193         return expelBuffer(blockFor);
194     }
195 
196     public synchronized int flush() throws IOException
197     {
198         int flushed = flushBuffer();
199         if (_buffer!=null && _buffer.length()==0)
200         {
201             _buffers.returnBuffer(_buffer);
202             _buffer=null;
203         }
204         return flushed;
205     }
206 
207     private synchronized int flushBuffer() throws IOException
208     {
209         if (!_endp.isOpen())
210             throw new EofException();
211 
212         if (_buffer!=null)
213             return _endp.flush(_buffer);
214 
215         return 0;
216     }
217 
218     private synchronized int expelBuffer(long blockFor) throws IOException
219     {
220         if (_buffer==null)
221             return 0;
222         int result = flushBuffer();
223         _buffer.compact();
224         if (!_endp.isBlocking())
225         {
226             while (_buffer.space()==0)
227             {
228                 boolean ready = _endp.blockWritable(blockFor);
229                 if (!ready)
230                     throw new IOException("Write timeout");
231 
232                 result += flushBuffer();
233                 _buffer.compact();
234             }
235         }
236         return result;
237     }
238 
239     public synchronized boolean isBufferEmpty()
240     {
241         return _buffer==null || _buffer.length()==0;
242     }
243 
244 }