View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket;
20  
21  import java.io.IOException;
22  
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.EndPoint;
25  import org.eclipse.jetty.io.EofException;
26  
27  
28  /* ------------------------------------------------------------ */
29  /** WebSocketGenerator.
30   * This class generates websocket packets.
31   * It is fully synchronized because it is likely that async
32   * threads will call the addMessage methods while other
33   * threads are flushing the generator.
34   */
35  public class WebSocketGeneratorD06 implements WebSocketGenerator
36  {
37      final private WebSocketBuffers _buffers;
38      final private EndPoint _endp;
39      private Buffer _buffer;
40      private final byte[] _mask=new byte[4];
41      private int _m;
42      private boolean _opsent;
43      private final MaskGen _maskGen;
44  
45      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
46      {
47          _buffers=buffers;
48          _endp=endp;
49          _maskGen=null;
50      }
51  
52      public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
53      {
54          _buffers=buffers;
55          _endp=endp;
56          _maskGen=maskGen;
57      }
58  
59      public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
60      {
61          // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
62  
63          long blockFor=_endp.getMaxIdleTime();
64  
65          if (_buffer==null)
66              _buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer();
67  
68          boolean last=WebSocketConnectionD06.isLastFrame(flags);
69          opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
70  
71          int space=(_maskGen!=null)?14:10;
72  
73          do
74          {
75              opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode;
76              _opsent=true;
77  
78              int payload=length;
79              if (payload+space>_buffer.capacity())
80              {
81                  // We must fragement, so clear FIN bit
82                  opcode&=(byte)0x7F; // Clear the FIN bit
83                  payload=_buffer.capacity()-space;
84              }
85              else if (last)
86                  opcode|=(byte)0x80; // Set the FIN bit
87  
88              // ensure there is space for header
89              if (_buffer.space() <= space)
90                  expelBuffer(blockFor);
91  
92              // write mask
93              if ((_maskGen!=null))
94              {
95                  _maskGen.genMask(_mask);
96                  _m=0;
97                  _buffer.put(_mask);
98              }
99  
100             // write the opcode and length
101             if (payload>0xffff)
102             {
103                 bufferPut(new byte[]{
104                         opcode,
105                         (byte)0x7f,
106                         (byte)0,
107                         (byte)0,
108                         (byte)0,
109                         (byte)0,
110                         (byte)((payload>>24)&0xff),
111                         (byte)((payload>>16)&0xff),
112                         (byte)((payload>>8)&0xff),
113                         (byte)(payload&0xff)});
114             }
115             else if (payload >=0x7e)
116             {
117                 bufferPut(new byte[]{
118                         opcode,
119                         (byte)0x7e,
120                         (byte)(payload>>8),
121                         (byte)(payload&0xff)});
122             }
123             else
124             {
125                 bufferPut(opcode);
126                 bufferPut((byte)payload);
127             }
128 
129             // write payload
130             int remaining = payload;
131             while (remaining > 0)
132             {
133                 _buffer.compact();
134                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
135 
136                 if ((_maskGen!=null))
137                 {
138                     for (int i=0;i<chunk;i++)
139                         bufferPut(content[offset+ (payload-remaining)+i]);
140                 }
141                 else
142                     _buffer.put(content, offset + (payload - remaining), chunk);
143 
144                 remaining -= chunk;
145                 if (_buffer.space() > 0)
146                 {
147                     // Gently flush the data, issuing a non-blocking write
148                     flushBuffer();
149                 }
150                 else
151                 {
152                     // Forcibly flush the data, issuing a blocking write
153                     expelBuffer(blockFor);
154                     if (remaining == 0)
155                     {
156                         // Gently flush the data, issuing a non-blocking write
157                         flushBuffer();
158                     }
159                 }
160             }
161             offset+=payload;
162             length-=payload;
163         }
164         while (length>0);
165         _opsent=!last;
166     }
167 
168     private synchronized void bufferPut(byte[] data) throws IOException
169     {
170         if (_maskGen!=null)
171             for (int i=0;i<data.length;i++)
172                 data[i]^=_mask[+_m++%4];
173         _buffer.put(data);
174     }
175 
176     private synchronized void bufferPut(byte data) throws IOException
177     {
178         _buffer.put((byte)(data^_mask[+_m++%4]));
179     }
180 
181     public synchronized int flush(int blockFor) throws IOException
182     {
183         return expelBuffer(blockFor);
184     }
185 
186     public synchronized int flush() throws IOException
187     {
188         int flushed = flushBuffer();
189         if (_buffer!=null && _buffer.length()==0)
190         {
191             _buffers.returnBuffer(_buffer);
192             _buffer=null;
193         }
194         return flushed;
195     }
196 
197     private synchronized int flushBuffer() throws IOException
198     {
199         if (!_endp.isOpen())
200             throw new EofException();
201 
202         if (_buffer!=null)
203             return _endp.flush(_buffer);
204 
205         return 0;
206     }
207 
208     private synchronized int expelBuffer(long blockFor) throws IOException
209     {
210         if (_buffer==null)
211             return 0;
212         int result = flushBuffer();
213         _buffer.compact();
214         if (!_endp.isBlocking())
215         {
216             while (_buffer.space()==0)
217             {
218                 boolean ready = _endp.blockWritable(blockFor);
219                 if (!ready)
220                     throw new IOException("Write timeout");
221 
222                 result += flushBuffer();
223                 _buffer.compact();
224             }
225         }
226         return result;
227     }
228 
229     public synchronized boolean isBufferEmpty()
230     {
231         return _buffer==null || _buffer.length()==0;
232     }
233 
234 }