View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  
18  import org.eclipse.jetty.io.Buffer;
19  import org.eclipse.jetty.io.EndPoint;
20  import org.eclipse.jetty.io.EofException;
21  
22  
23  /* ------------------------------------------------------------ */
24  /** WebSocketGenerator.
25   * This class generates websocket packets.
26   * It is fully synchronized because it is likely that async
27   * threads will call the addMessage methods while other
28   * threads are flushing the generator.
29   */
30  public class WebSocketGeneratorD08 implements WebSocketGenerator
31  {
32      final private WebSocketBuffers _buffers;
33      final private EndPoint _endp;
34      private Buffer _buffer;
35      private final byte[] _mask=new byte[4];
36      private int _m;
37      private boolean _opsent;
38      private final MaskGen _maskGen;
39  
40      public WebSocketGeneratorD08(WebSocketBuffers buffers, EndPoint endp)
41      {
42          _buffers=buffers;
43          _endp=endp;
44          _maskGen=null;
45      }
46  
47      public WebSocketGeneratorD08(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
48      {
49          _buffers=buffers;
50          _endp=endp;
51          _maskGen=maskGen;
52      }
53  
54      public synchronized Buffer getBuffer()
55      {
56          return _buffer;
57      }
58  
59      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
60      {
61          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
62  
63          boolean mask=_maskGen!=null;
64  
65          if (_buffer==null)
66              _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
67  
68          boolean last=WebSocketConnectionD08.isLastFrame(flags);
69  
70          int space=mask?14:10;
71  
72          do
73          {
74              opcode = _opsent?WebSocketConnectionD08.OP_CONTINUATION:opcode;
75              opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
76              _opsent=true;
77  
78              int payload=length;
79              if (payload+space>_buffer.capacity())
80              {
81                  // We must fragement, so clear FIN bit
82                  opcode=(byte)(opcode&0x7F); // Clear the FIN bit
83                  payload=_buffer.capacity()-space;
84              }
85              else if (last)
86                  opcode= (byte)(opcode|0x80); // Set the FIN bit
87  
88              // ensure there is space for header
89              if (_buffer.space() <= space)
90              {
91                  flushBuffer();
92                  if (_buffer.space() <= space)
93                      flush();
94              }
95  
96              // write the opcode and length
97              if (payload>0xffff)
98              {
99                  _buffer.put(new byte[]{
100                         opcode,
101                         mask?(byte)0xff:(byte)0x7f,
102                         (byte)0,
103                         (byte)0,
104                         (byte)0,
105                         (byte)0,
106                         (byte)((payload>>24)&0xff),
107                         (byte)((payload>>16)&0xff),
108                         (byte)((payload>>8)&0xff),
109                         (byte)(payload&0xff)});
110             }
111             else if (payload >=0x7e)
112             {
113                 _buffer.put(new byte[]{
114                         opcode,
115                         mask?(byte)0xfe:(byte)0x7e,
116                         (byte)(payload>>8),
117                         (byte)(payload&0xff)});
118             }
119             else
120             {
121                 _buffer.put(new byte[]{
122                         opcode,
123                         (byte)(mask?(0x80|payload):payload)});
124             }
125 
126             // write mask
127             if (mask)
128             {
129                 _maskGen.genMask(_mask);
130                 _m=0;
131                 _buffer.put(_mask);
132             }
133 
134 
135             // write payload
136             int remaining = payload;
137             while (remaining > 0)
138             {
139                 _buffer.compact();
140                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
141 
142                 if (mask)
143                 {
144                     for (int i=0;i<chunk;i++)
145                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
146                 }
147                 else
148                     _buffer.put(content, offset + (payload - remaining), chunk);
149 
150                 remaining -= chunk;
151                 if (_buffer.space() > 0)
152                 {
153                     // Gently flush the data, issuing a non-blocking write
154                     flushBuffer();
155                 }
156                 else
157                 {
158                     // Forcibly flush the data, issuing a blocking write
159                     flush();
160                     if (remaining == 0)
161                     {
162                         // Gently flush the data, issuing a non-blocking write
163                         flushBuffer();
164                     }
165                 }
166             }
167             offset+=payload;
168             length-=payload;
169         }
170         while (length>0);
171         _opsent=!last;
172 
173         if (_buffer!=null && _buffer.length()==0)
174         {
175             _buffers.returnBuffer(_buffer);
176             _buffer=null;
177         }
178     }
179 
180     public synchronized int flushBuffer() throws IOException
181     {
182         if (!_endp.isOpen())
183             throw new EofException();
184 
185         if (_buffer!=null)
186             return _endp.flush(_buffer);
187 
188         return 0;
189     }
190 
191     public synchronized int flush() throws IOException
192     {
193         if (_buffer==null)
194             return 0;
195         int result = flushBuffer();
196 
197         if (!_endp.isBlocking())
198         {
199             long now = System.currentTimeMillis();
200             long end=now+_endp.getMaxIdleTime();
201             while (_buffer.length()>0)
202             {
203                 boolean ready = _endp.blockWritable(end-now);
204                 if (!ready)
205                 {
206                     now = System.currentTimeMillis();
207                     if (now<end)
208                         continue;
209                     throw new IOException("Write timeout");
210                 }
211 
212                 result += flushBuffer();
213             }
214         }
215         _buffer.compact();
216         return result;
217     }
218 
219     public synchronized boolean isBufferEmpty()
220     {
221         return _buffer==null || _buffer.length()==0;
222     }
223 
224     public synchronized void returnBuffer()
225     {
226         if (_buffer!=null && _buffer.length()==0)
227         {
228             _buffers.returnBuffer(_buffer);
229             _buffer=null;
230         }
231     }
232 
233 }