View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket;
20  
21  import java.io.IOException;
22  
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.EndPoint;
25  import org.eclipse.jetty.io.EofException;
26  
27  
28  /* ------------------------------------------------------------ */
29  /** WebSocketGenerator.
30   * This class generates websocket packets.
31   * It is fully synchronized because it is likely that async
32   * threads will call the addMessage methods while other
33   * threads are flushing the generator.
34   */
35  public class WebSocketGeneratorD08 implements WebSocketGenerator
36  {
37      final private WebSocketBuffers _buffers;
38      final private EndPoint _endp;
39      private Buffer _buffer;
40      private final byte[] _mask=new byte[4];
41      private int _m;
42      private boolean _opsent;
43      private final MaskGen _maskGen;
44  
45      public WebSocketGeneratorD08(WebSocketBuffers buffers, EndPoint endp)
46      {
47          _buffers=buffers;
48          _endp=endp;
49          _maskGen=null;
50      }
51  
52      public WebSocketGeneratorD08(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
53      {
54          _buffers=buffers;
55          _endp=endp;
56          _maskGen=maskGen;
57      }
58  
59      public synchronized Buffer getBuffer()
60      {
61          return _buffer;
62      }
63  
64      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
65      {
66          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
67  
68          boolean mask=_maskGen!=null;
69  
70          if (_buffer==null)
71              _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
72  
73          boolean last=WebSocketConnectionD08.isLastFrame(flags);
74  
75          int space=mask?14:10;
76  
77          do
78          {
79              opcode = _opsent?WebSocketConnectionD08.OP_CONTINUATION:opcode;
80              opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
81              _opsent=true;
82  
83              int payload=length;
84              if (payload+space>_buffer.capacity())
85              {
86                  // We must fragement, so clear FIN bit
87                  opcode=(byte)(opcode&0x7F); // Clear the FIN bit
88                  payload=_buffer.capacity()-space;
89              }
90              else if (last)
91                  opcode= (byte)(opcode|0x80); // Set the FIN bit
92  
93              // ensure there is space for header
94              if (_buffer.space() <= space)
95              {
96                  flushBuffer();
97                  if (_buffer.space() <= space)
98                      flush();
99              }
100 
101             // write the opcode and length
102             if (payload>0xffff)
103             {
104                 _buffer.put(new byte[]{
105                         opcode,
106                         mask?(byte)0xff:(byte)0x7f,
107                         (byte)0,
108                         (byte)0,
109                         (byte)0,
110                         (byte)0,
111                         (byte)((payload>>24)&0xff),
112                         (byte)((payload>>16)&0xff),
113                         (byte)((payload>>8)&0xff),
114                         (byte)(payload&0xff)});
115             }
116             else if (payload >=0x7e)
117             {
118                 _buffer.put(new byte[]{
119                         opcode,
120                         mask?(byte)0xfe:(byte)0x7e,
121                         (byte)(payload>>8),
122                         (byte)(payload&0xff)});
123             }
124             else
125             {
126                 _buffer.put(new byte[]{
127                         opcode,
128                         (byte)(mask?(0x80|payload):payload)});
129             }
130 
131             // write mask
132             if (mask)
133             {
134                 _maskGen.genMask(_mask);
135                 _m=0;
136                 _buffer.put(_mask);
137             }
138 
139 
140             // write payload
141             int remaining = payload;
142             while (remaining > 0)
143             {
144                 _buffer.compact();
145                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
146 
147                 if (mask)
148                 {
149                     for (int i=0;i<chunk;i++)
150                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
151                 }
152                 else
153                     _buffer.put(content, offset + (payload - remaining), chunk);
154 
155                 remaining -= chunk;
156                 if (_buffer.space() > 0)
157                 {
158                     // Gently flush the data, issuing a non-blocking write
159                     flushBuffer();
160                 }
161                 else
162                 {
163                     // Forcibly flush the data, issuing a blocking write
164                     flush();
165                     if (remaining == 0)
166                     {
167                         // Gently flush the data, issuing a non-blocking write
168                         flushBuffer();
169                     }
170                 }
171             }
172             offset+=payload;
173             length-=payload;
174         }
175         while (length>0);
176         _opsent=!last;
177 
178         if (_buffer!=null && _buffer.length()==0)
179         {
180             _buffers.returnBuffer(_buffer);
181             _buffer=null;
182         }
183     }
184 
185     public synchronized int flushBuffer() throws IOException
186     {
187         if (!_endp.isOpen())
188             throw new EofException();
189 
190         if (_buffer!=null)
191             return _endp.flush(_buffer);
192 
193         return 0;
194     }
195 
196     public synchronized int flush() throws IOException
197     {
198         if (_buffer==null)
199             return 0;
200         int result = flushBuffer();
201 
202         if (!_endp.isBlocking())
203         {
204             long now = System.currentTimeMillis();
205             long end=now+_endp.getMaxIdleTime();
206             while (_buffer.length()>0)
207             {
208                 boolean ready = _endp.blockWritable(end-now);
209                 if (!ready)
210                 {
211                     now = System.currentTimeMillis();
212                     if (now<end)
213                         continue;
214                     throw new IOException("Write timeout");
215                 }
216 
217                 result += flushBuffer();
218             }
219         }
220         _buffer.compact();
221         return result;
222     }
223 
224     public synchronized boolean isBufferEmpty()
225     {
226         return _buffer==null || _buffer.length()==0;
227     }
228 
229     public synchronized void returnBuffer()
230     {
231         if (_buffer!=null && _buffer.length()==0)
232         {
233             _buffers.returnBuffer(_buffer);
234             _buffer=null;
235         }
236     }
237 
238 }