View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket;
20  
21  import java.io.IOException;
22  
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.EndPoint;
25  import org.eclipse.jetty.io.EofException;
26  
27  
28  /* ------------------------------------------------------------ */
29  /** WebSocketGenerator.
30   * This class generates websocket packets.
31   * It is fully synchronized because it is likely that async
32   * threads will call the addMessage methods while other
33   * threads are flushing the generator.
34   */
35  public class WebSocketGeneratorRFC6455 implements WebSocketGenerator
36  {
37      final private WebSocketBuffers _buffers;
38      final private EndPoint _endp;
39      private Buffer _buffer;
40      private final byte[] _mask=new byte[4];
41      private int _m;
42      private boolean _opsent;
43      private final MaskGen _maskGen;
44      private boolean _closed;
45  
46      public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp)
47      {
48          _buffers=buffers;
49          _endp=endp;
50          _maskGen=null;
51      }
52  
53      public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
54      {
55          _buffers=buffers;
56          _endp=endp;
57          _maskGen=maskGen;
58      }
59  
60      public synchronized Buffer getBuffer()
61      {
62          return _buffer;
63      }
64  
65      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
66      {
67          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
68  
69          if (_closed)
70              throw new EofException("Closed");
71          if (opcode==WebSocketConnectionRFC6455.OP_CLOSE)
72              _closed=true;
73  
74          boolean mask=_maskGen!=null;
75  
76          if (_buffer==null)
77              _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
78  
79          boolean last=WebSocketConnectionRFC6455.isLastFrame(flags);
80  
81          int space=mask?14:10;
82  
83          do
84          {
85              opcode = _opsent?WebSocketConnectionRFC6455.OP_CONTINUATION:opcode;
86              opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
87              _opsent=true;
88  
89              int payload=length;
90              if (payload+space>_buffer.capacity())
91              {
92                  // We must fragement, so clear FIN bit
93                  opcode=(byte)(opcode&0x7F); // Clear the FIN bit
94                  payload=_buffer.capacity()-space;
95              }
96              else if (last)
97                  opcode= (byte)(opcode|0x80); // Set the FIN bit
98  
99              // ensure there is space for header
100             if (_buffer.space() <= space)
101             {
102                 flushBuffer();
103                 if (_buffer.space() <= space)
104                     flush();
105             }
106 
107             // write the opcode and length
108             if (payload>0xffff)
109             {
110                 _buffer.put(new byte[]{
111                         opcode,
112                         mask?(byte)0xff:(byte)0x7f,
113                         (byte)0,
114                         (byte)0,
115                         (byte)0,
116                         (byte)0,
117                         (byte)((payload>>24)&0xff),
118                         (byte)((payload>>16)&0xff),
119                         (byte)((payload>>8)&0xff),
120                         (byte)(payload&0xff)});
121             }
122             else if (payload >=0x7e)
123             {
124                 _buffer.put(new byte[]{
125                         opcode,
126                         mask?(byte)0xfe:(byte)0x7e,
127                         (byte)(payload>>8),
128                         (byte)(payload&0xff)});
129             }
130             else
131             {
132                 _buffer.put(new byte[]{
133                         opcode,
134                         (byte)(mask?(0x80|payload):payload)});
135             }
136 
137             // write mask
138             if (mask)
139             {
140                 _maskGen.genMask(_mask);
141                 _m=0;
142                 _buffer.put(_mask);
143             }
144 
145             // write payload
146             int remaining = payload;
147             while (remaining > 0)
148             {
149                 _buffer.compact();
150                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
151 
152                 if (mask)
153                 {
154                     for (int i=0;i<chunk;i++)
155                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
156                 }
157                 else
158                     _buffer.put(content, offset + (payload - remaining), chunk);
159 
160                 remaining -= chunk;
161                 if (_buffer.space() > 0)
162                 {
163                     // Gently flush the data, issuing a non-blocking write
164                     flushBuffer();
165                 }
166                 else
167                 {
168                     // Forcibly flush the data, issuing a blocking write
169                     flush();
170                     if (remaining == 0)
171                     {
172                         // Gently flush the data, issuing a non-blocking write
173                         flushBuffer();
174                     }
175                 }
176             }
177             offset+=payload;
178             length-=payload;
179         }
180         while (length>0);
181         _opsent=!last;
182 
183         if (_buffer!=null && _buffer.length()==0)
184         {
185             _buffers.returnBuffer(_buffer);
186             _buffer=null;
187         }
188     }
189 
190     public synchronized int flushBuffer() throws IOException
191     {
192         if (!_endp.isOpen())
193             throw new EofException();
194 
195         if (_buffer!=null)
196         {
197             int flushed=_buffer.hasContent()?_endp.flush(_buffer):0;
198             if (_closed&&_buffer.length()==0)
199                 _endp.shutdownOutput();
200             return flushed;
201         }
202 
203         return 0;
204     }
205 
206     public synchronized int flush() throws IOException
207     {
208         if (_buffer==null)
209             return 0;
210         int result = flushBuffer();
211 
212         if (!_endp.isBlocking())
213         {
214             long now = System.currentTimeMillis();
215             long end=now+_endp.getMaxIdleTime();
216             while (_buffer.length()>0)
217             {
218                 boolean ready = _endp.blockWritable(end-now);
219                 if (!ready)
220                 {
221                     now = System.currentTimeMillis();
222                     if (now<end)
223                         continue;
224                     throw new IOException("Write timeout");
225                 }
226 
227                 result += flushBuffer();
228             }
229         }
230         _buffer.compact();
231         return result;
232     }
233 
234     public synchronized boolean isBufferEmpty()
235     {
236         return _buffer==null || _buffer.length()==0;
237     }
238 
239     public synchronized void returnBuffer()
240     {
241         if (_buffer!=null && _buffer.length()==0)
242         {
243             _buffers.returnBuffer(_buffer);
244             _buffer=null;
245         }
246     }
247 
248     @Override
249     public String toString()
250     {
251         // Do NOT use synchronized (this)
252         // because it's very easy to deadlock when debugging is enabled.
253         // We do a best effort to print the right toString() and that's it.
254         Buffer buffer = _buffer;
255         return String.format("%s@%x closed=%b buffer=%d",
256                 getClass().getSimpleName(),
257                 hashCode(),
258                 _closed,
259                 buffer == null ? -1 : buffer.length());
260     }
261 }