View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  
18  import org.eclipse.jetty.io.Buffer;
19  import org.eclipse.jetty.io.EndPoint;
20  import org.eclipse.jetty.io.EofException;
21  
22  
23  /* ------------------------------------------------------------ */
24  /** WebSocketGenerator.
25   * This class generates websocket packets.
26   * It is fully synchronized because it is likely that async
27   * threads will call the addMessage methods while other
28   * threads are flushing the generator.
29   */
30  public class WebSocketGeneratorD13 implements WebSocketGenerator
31  {
32      final private WebSocketBuffers _buffers;
33      final private EndPoint _endp;
34      private Buffer _buffer;
35      private final byte[] _mask=new byte[4];
36      private int _m;
37      private boolean _opsent;
38      private final MaskGen _maskGen;
39      private boolean _closed;
40  
41      public WebSocketGeneratorD13(WebSocketBuffers buffers, EndPoint endp)
42      {
43          _buffers=buffers;
44          _endp=endp;
45          _maskGen=null;
46      }
47  
48      public WebSocketGeneratorD13(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
49      {
50          _buffers=buffers;
51          _endp=endp;
52          _maskGen=maskGen;
53      }
54  
55      public synchronized Buffer getBuffer()
56      {
57          return _buffer;
58      }
59  
60      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
61      {
62          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
63  
64          if (_closed)
65              throw new EofException("Closed");
66          if (opcode==WebSocketConnectionD13.OP_CLOSE)
67              _closed=true;
68          
69          boolean mask=_maskGen!=null;
70  
71          if (_buffer==null)
72              _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
73  
74          boolean last=WebSocketConnectionD13.isLastFrame(flags);
75  
76          int space=mask?14:10;
77  
78          do
79          {
80              opcode = _opsent?WebSocketConnectionD13.OP_CONTINUATION:opcode;
81              opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
82              _opsent=true;
83  
84              int payload=length;
85              if (payload+space>_buffer.capacity())
86              {
87                  // We must fragement, so clear FIN bit
88                  opcode=(byte)(opcode&0x7F); // Clear the FIN bit
89                  payload=_buffer.capacity()-space;
90              }
91              else if (last)
92                  opcode= (byte)(opcode|0x80); // Set the FIN bit
93  
94              // ensure there is space for header
95              if (_buffer.space() <= space)
96              {
97                  flushBuffer();
98                  if (_buffer.space() <= space)
99                      flush();
100             }
101 
102             // write the opcode and length
103             if (payload>0xffff)
104             {
105                 _buffer.put(new byte[]{
106                         opcode,
107                         mask?(byte)0xff:(byte)0x7f,
108                         (byte)0,
109                         (byte)0,
110                         (byte)0,
111                         (byte)0,
112                         (byte)((payload>>24)&0xff),
113                         (byte)((payload>>16)&0xff),
114                         (byte)((payload>>8)&0xff),
115                         (byte)(payload&0xff)});
116             }
117             else if (payload >=0x7e)
118             {
119                 _buffer.put(new byte[]{
120                         opcode,
121                         mask?(byte)0xfe:(byte)0x7e,
122                         (byte)(payload>>8),
123                         (byte)(payload&0xff)});
124             }
125             else
126             {
127                 _buffer.put(new byte[]{
128                         opcode,
129                         (byte)(mask?(0x80|payload):payload)});
130             }
131 
132             // write mask
133             if (mask)
134             {
135                 _maskGen.genMask(_mask);
136                 _m=0;
137                 _buffer.put(_mask);
138             }
139 
140             // write payload
141             int remaining = payload;
142             while (remaining > 0)
143             {
144                 _buffer.compact();
145                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
146 
147                 if (mask)
148                 {
149                     for (int i=0;i<chunk;i++)
150                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
151                 }
152                 else
153                     _buffer.put(content, offset + (payload - remaining), chunk);
154 
155                 remaining -= chunk;
156                 if (_buffer.space() > 0)
157                 {
158                     // Gently flush the data, issuing a non-blocking write
159                     flushBuffer();
160                 }
161                 else
162                 {
163                     // Forcibly flush the data, issuing a blocking write
164                     flush();
165                     if (remaining == 0)
166                     {
167                         // Gently flush the data, issuing a non-blocking write
168                         flushBuffer();
169                     }
170                 }
171             }
172             offset+=payload;
173             length-=payload;
174         }
175         while (length>0);
176         _opsent=!last;
177 
178         if (_buffer!=null && _buffer.length()==0)
179         {
180             _buffers.returnBuffer(_buffer);
181             _buffer=null;
182         }
183     }
184 
185     public synchronized int flushBuffer() throws IOException
186     {
187         if (!_endp.isOpen())
188             throw new EofException();
189 
190         if (_buffer!=null)
191         {
192             int flushed=_buffer.hasContent()?_endp.flush(_buffer):0;
193             if (_closed&&_buffer.length()==0)
194                 _endp.shutdownOutput();
195             return flushed;
196         }
197 
198         return 0;
199     }
200 
201     public synchronized int flush() throws IOException
202     {
203         if (_buffer==null)
204             return 0;
205         int result = flushBuffer();
206 
207         if (!_endp.isBlocking())
208         {
209             long now = System.currentTimeMillis();
210             long end=now+_endp.getMaxIdleTime();
211             while (_buffer.length()>0)
212             {
213                 boolean ready = _endp.blockWritable(end-now);
214                 if (!ready)
215                 {
216                     now = System.currentTimeMillis();
217                     if (now<end)
218                         continue;
219                     throw new IOException("Write timeout");
220                 }
221 
222                 result += flushBuffer();
223             }
224         }
225         _buffer.compact();
226         return result;
227     }
228 
229     public synchronized boolean isBufferEmpty()
230     {
231         return _buffer==null || _buffer.length()==0;
232     }
233 
234     public synchronized void returnBuffer()
235     {
236         if (_buffer!=null && _buffer.length()==0)
237         {
238             _buffers.returnBuffer(_buffer);
239             _buffer=null;
240         }
241     }
242 
243 }