View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  // ========================================================================
17  // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18  // ------------------------------------------------------------------------
19  // All rights reserved. This program and the accompanying materials
20  // are made available under the terms of the Eclipse Public License v1.0
21  // and Apache License v2.0 which accompanies this distribution.
22  // The Eclipse Public License is available at
23  // http://www.eclipse.org/legal/epl-v10.html
24  // The Apache License v2.0 is available at
25  // http://www.opensource.org/licenses/apache2.0.php
26  // You may elect to redistribute this code under either of these licenses.
27  // ========================================================================
28  
29  package org.eclipse.jetty.websocket;
30  
31  import java.io.IOException;
32  
33  import org.eclipse.jetty.io.Buffer;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.io.EofException;
36  
37  
38  /* ------------------------------------------------------------ */
39  /** WebSocketGenerator.
40   * This class generates websocket packets.
41   * It is fully synchronized because it is likely that async
42   * threads will call the addMessage methods while other
43   * threads are flushing the generator.
44   */
45  public class WebSocketGeneratorD08 implements WebSocketGenerator
46  {
47      final private WebSocketBuffers _buffers;
48      final private EndPoint _endp;
49      private Buffer _buffer;
50      private final byte[] _mask=new byte[4];
51      private int _m;
52      private boolean _opsent;
53      private final MaskGen _maskGen;
54  
55      public WebSocketGeneratorD08(WebSocketBuffers buffers, EndPoint endp)
56      {
57          _buffers=buffers;
58          _endp=endp;
59          _maskGen=null;
60      }
61  
62      public WebSocketGeneratorD08(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
63      {
64          _buffers=buffers;
65          _endp=endp;
66          _maskGen=maskGen;
67      }
68  
69      public synchronized Buffer getBuffer()
70      {
71          return _buffer;
72      }
73  
74      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
75      {
76          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
77  
78          boolean mask=_maskGen!=null;
79  
80          if (_buffer==null)
81              _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
82  
83          boolean last=WebSocketConnectionD08.isLastFrame(flags);
84  
85          int space=mask?14:10;
86  
87          do
88          {
89              opcode = _opsent?WebSocketConnectionD08.OP_CONTINUATION:opcode;
90              opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
91              _opsent=true;
92  
93              int payload=length;
94              if (payload+space>_buffer.capacity())
95              {
96                  // We must fragement, so clear FIN bit
97                  opcode=(byte)(opcode&0x7F); // Clear the FIN bit
98                  payload=_buffer.capacity()-space;
99              }
100             else if (last)
101                 opcode= (byte)(opcode|0x80); // Set the FIN bit
102 
103             // ensure there is space for header
104             if (_buffer.space() <= space)
105             {
106                 flushBuffer();
107                 if (_buffer.space() <= space)
108                     flush();
109             }
110 
111             // write the opcode and length
112             if (payload>0xffff)
113             {
114                 _buffer.put(new byte[]{
115                         opcode,
116                         mask?(byte)0xff:(byte)0x7f,
117                         (byte)0,
118                         (byte)0,
119                         (byte)0,
120                         (byte)0,
121                         (byte)((payload>>24)&0xff),
122                         (byte)((payload>>16)&0xff),
123                         (byte)((payload>>8)&0xff),
124                         (byte)(payload&0xff)});
125             }
126             else if (payload >=0x7e)
127             {
128                 _buffer.put(new byte[]{
129                         opcode,
130                         mask?(byte)0xfe:(byte)0x7e,
131                         (byte)(payload>>8),
132                         (byte)(payload&0xff)});
133             }
134             else
135             {
136                 _buffer.put(new byte[]{
137                         opcode,
138                         (byte)(mask?(0x80|payload):payload)});
139             }
140 
141             // write mask
142             if (mask)
143             {
144                 _maskGen.genMask(_mask);
145                 _m=0;
146                 _buffer.put(_mask);
147             }
148 
149 
150             // write payload
151             int remaining = payload;
152             while (remaining > 0)
153             {
154                 _buffer.compact();
155                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
156 
157                 if (mask)
158                 {
159                     for (int i=0;i<chunk;i++)
160                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
161                 }
162                 else
163                     _buffer.put(content, offset + (payload - remaining), chunk);
164 
165                 remaining -= chunk;
166                 if (_buffer.space() > 0)
167                 {
168                     // Gently flush the data, issuing a non-blocking write
169                     flushBuffer();
170                 }
171                 else
172                 {
173                     // Forcibly flush the data, issuing a blocking write
174                     flush();
175                     if (remaining == 0)
176                     {
177                         // Gently flush the data, issuing a non-blocking write
178                         flushBuffer();
179                     }
180                 }
181             }
182             offset+=payload;
183             length-=payload;
184         }
185         while (length>0);
186         _opsent=!last;
187 
188         if (_buffer!=null && _buffer.length()==0)
189         {
190             _buffers.returnBuffer(_buffer);
191             _buffer=null;
192         }
193     }
194 
195     public synchronized int flushBuffer() throws IOException
196     {
197         if (!_endp.isOpen())
198             throw new EofException();
199 
200         if (_buffer!=null)
201             return _endp.flush(_buffer);
202 
203         return 0;
204     }
205 
206     public synchronized int flush() throws IOException
207     {
208         if (_buffer==null)
209             return 0;
210         int result = flushBuffer();
211 
212         if (!_endp.isBlocking())
213         {
214             long now = System.currentTimeMillis();
215             long end=now+_endp.getMaxIdleTime();
216             while (_buffer.length()>0)
217             {
218                 boolean ready = _endp.blockWritable(end-now);
219                 if (!ready)
220                 {
221                     now = System.currentTimeMillis();
222                     if (now<end)
223                         continue;
224                     throw new IOException("Write timeout");
225                 }
226 
227                 result += flushBuffer();
228             }
229         }
230         _buffer.compact();
231         return result;
232     }
233 
234     public synchronized boolean isBufferEmpty()
235     {
236         return _buffer==null || _buffer.length()==0;
237     }
238 
239     public synchronized void returnBuffer()
240     {
241         if (_buffer!=null && _buffer.length()==0)
242         {
243             _buffers.returnBuffer(_buffer);
244             _buffer=null;
245         }
246     }
247 
248 }