View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  
18  import org.eclipse.jetty.io.Buffer;
19  import org.eclipse.jetty.io.EndPoint;
20  import org.eclipse.jetty.io.EofException;
21  
22  
23  /* ------------------------------------------------------------ */
24  /** WebSocketGenerator.
25   * This class generates websocket packets.
26   * It is fully synchronized because it is likely that async
27   * threads will call the addMessage methods while other
28   * threads are flushing the generator.
29   */
30  public class WebSocketGeneratorD12 implements WebSocketGenerator
31  {
32      final private WebSocketBuffers _buffers;
33      final private EndPoint _endp;
34      private Buffer _buffer;
35      private final byte[] _mask=new byte[4];
36      private int _m;
37      private boolean _opsent;
38      private final MaskGen _maskGen;
39  
40      public WebSocketGeneratorD12(WebSocketBuffers buffers, EndPoint endp)
41      {
42          _buffers=buffers;
43          _endp=endp;
44          _maskGen=null;
45      }
46      
47      public WebSocketGeneratorD12(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
48      {
49          _buffers=buffers;
50          _endp=endp;
51          _maskGen=maskGen;
52      }
53  
54      public synchronized Buffer getBuffer()
55      {
56          return _buffer;
57      }
58      
59      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
60      {
61          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
62          
63          boolean mask=_maskGen!=null;
64          
65          if (_buffer==null)
66              _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
67              
68          boolean last=WebSocketConnectionD12.isLastFrame(flags);
69          byte orig=opcode;
70          
71          int space=mask?14:10;
72          
73          do
74          {
75              opcode = _opsent?WebSocketConnectionD12.OP_CONTINUATION:opcode;
76              opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode));
77              _opsent=true;
78              
79              int payload=length;
80              if (payload+space>_buffer.capacity())
81              {
82                  // We must fragement, so clear FIN bit
83                  opcode=(byte)(opcode&0x7F); // Clear the FIN bit
84                  payload=_buffer.capacity()-space;
85              }
86              else if (last)
87                  opcode= (byte)(opcode|0x80); // Set the FIN bit
88        
89              // ensure there is space for header
90              if (_buffer.space() <= space)
91              {
92                  flushBuffer();
93                  if (_buffer.space() <= space)
94                      flush();
95              }
96              
97              // write the opcode and length
98              if (payload>0xffff)
99              {
100                 _buffer.put(new byte[]{
101                         opcode,
102                         mask?(byte)0xff:(byte)0x7f,
103                         (byte)((payload>>56)&0x7f),
104                         (byte)((payload>>48)&0xff), 
105                         (byte)((payload>>40)&0xff), 
106                         (byte)((payload>>32)&0xff),
107                         (byte)((payload>>24)&0xff),
108                         (byte)((payload>>16)&0xff),
109                         (byte)((payload>>8)&0xff),
110                         (byte)(payload&0xff)});
111             }
112             else if (payload >=0x7e)
113             {
114                 _buffer.put(new byte[]{
115                         opcode,
116                         mask?(byte)0xfe:(byte)0x7e,
117                         (byte)(payload>>8),
118                         (byte)(payload&0xff)});
119             }
120             else
121             {
122                 _buffer.put(new byte[]{ 
123                         opcode,
124                         (byte)(mask?(0x80|payload):payload)});
125             }
126 
127             // write mask
128             if (mask)
129             {
130                 _maskGen.genMask(_mask);
131                 _m=0;
132                 _buffer.put(_mask);
133             }
134 
135             
136             // write payload
137             int remaining = payload;
138             while (remaining > 0)
139             {
140                 _buffer.compact();
141                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
142                 
143                 if (mask)
144                 {
145                     for (int i=0;i<chunk;i++)
146                         _buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
147                 }
148                 else
149                     _buffer.put(content, offset + (payload - remaining), chunk);
150                 
151                 remaining -= chunk;
152                 if (_buffer.space() > 0)
153                 {
154                     // Gently flush the data, issuing a non-blocking write
155                     flushBuffer();
156                 }
157                 else
158                 {
159                     // Forcibly flush the data, issuing a blocking write
160                     flush();
161                     if (remaining == 0)
162                     {
163                         // Gently flush the data, issuing a non-blocking write
164                         flushBuffer();
165                     }
166                 }
167             }
168             offset+=payload;
169             length-=payload;
170         }
171         while (length>0);
172         _opsent=!last;
173         
174         if (_buffer!=null && _buffer.length()==0)
175         {
176             _buffers.returnBuffer(_buffer);
177             _buffer=null;
178         }
179     }
180 
181     public synchronized int flushBuffer() throws IOException
182     {
183         if (!_endp.isOpen())
184             throw new EofException();
185 
186         if (_buffer!=null)
187             return _endp.flush(_buffer);
188 
189         return 0;
190     }
191 
192     public synchronized int flush() throws IOException
193     {
194         if (_buffer==null)
195             return 0;
196         int result = flushBuffer();
197         
198         if (!_endp.isBlocking())
199         {
200             long now = System.currentTimeMillis();
201             long end=now+_endp.getMaxIdleTime();
202             while (_buffer.length()>0)
203             {
204                 boolean ready = _endp.blockWritable(end-now);
205                 if (!ready)
206                 {
207                     now = System.currentTimeMillis();
208                     if (now<end)
209                         continue;
210                     throw new IOException("Write timeout");
211                 }
212 
213                 result += flushBuffer();
214             }
215         }
216         _buffer.compact();
217         return result;
218     }
219 
220     public synchronized boolean isBufferEmpty()
221     {
222         return _buffer==null || _buffer.length()==0;
223     }
224 
225     public synchronized void returnBuffer()
226     {
227         if (_buffer!=null && _buffer.length()==0)
228         {
229             _buffers.returnBuffer(_buffer);
230             _buffer=null;
231         }
232     }
233 
234 }