View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  // ========================================================================
17  // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18  // ------------------------------------------------------------------------
19  // All rights reserved. This program and the accompanying materials
20  // are made available under the terms of the Eclipse Public License v1.0
21  // and Apache License v2.0 which accompanies this distribution.
22  // The Eclipse Public License is available at 
23  // http://www.eclipse.org/legal/epl-v10.html
24  // The Apache License v2.0 is available at
25  // http://www.opensource.org/licenses/apache2.0.php
26  // You may elect to redistribute this code under either of these licenses. 
27  // ========================================================================
28  
29  package org.eclipse.jetty.websocket;
30  
31  import java.io.IOException;
32  import java.math.BigInteger;
33  
34  import org.eclipse.jetty.io.Buffer;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.io.EofException;
37  
38  
39  /* ------------------------------------------------------------ */
40  /** WebSocketGenerator.
41   * This class generates websocket packets.
42   * It is fully synchronized because it is likely that async
43   * threads will call the addMessage methods while other
44   * threads are flushing the generator.
45   */
46  public class WebSocketGeneratorD00 implements WebSocketGenerator
47  {
48      final private WebSocketBuffers _buffers;
49      final private EndPoint _endp;
50      private Buffer _buffer;
51  
52      public WebSocketGeneratorD00(WebSocketBuffers buffers, EndPoint endp)
53      {
54          _buffers=buffers;
55          _endp=endp;
56      }
57      
58      public synchronized void addFrame(byte flags, byte opcode,byte[] content, int offset, int length) throws IOException
59      {
60          long blockFor=_endp.getMaxIdleTime();
61          
62          if (_buffer==null)
63              _buffer=_buffers.getDirectBuffer();
64  
65          if (_buffer.space() == 0)
66              expelBuffer(blockFor);
67  
68          bufferPut(opcode, blockFor);
69  
70          if (isLengthFrame(opcode))
71          {
72              // Send a length delimited frame
73  
74              // How many bytes we need for the length ?
75              // We have 7 bits available, so log2(length) / 7 + 1
76              // For example, 50000 bytes is 2 8-bytes: 11000011 01010000
77              // but we need to write it in 3 7-bytes 0000011 0000110 1010000
78              // 65536 == 1 00000000 00000000 => 100 0000000 0000000
79              int lengthBytes = new BigInteger(String.valueOf(length)).bitLength() / 7 + 1;
80              for (int i = lengthBytes - 1; i > 0; --i)
81              {
82                  byte lengthByte = (byte)(0x80 | (0x7F & (length >> 7 * i)));
83                  bufferPut(lengthByte, blockFor);
84              }
85              bufferPut((byte)(0x7F & length), blockFor);
86          }
87  
88          int remaining = length;
89          while (remaining > 0)
90          {
91              int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
92              _buffer.put(content, offset + (length - remaining), chunk);
93              remaining -= chunk;
94              if (_buffer.space() > 0)
95              {
96                  if (!isLengthFrame(opcode))
97                      _buffer.put((byte)0xFF);
98                  // Gently flush the data, issuing a non-blocking write
99                  flushBuffer();
100             }
101             else
102             {
103                 // Forcibly flush the data, issuing a blocking write
104                 expelBuffer(blockFor);
105                 if (remaining == 0)
106                 {
107                     if (!isLengthFrame(opcode))
108                         _buffer.put((byte)0xFF);
109                     // Gently flush the data, issuing a non-blocking write
110                     flushBuffer();
111                 }
112             }
113         }
114     }
115 
116     private synchronized boolean isLengthFrame(byte frame)
117     {
118         return (frame & WebSocketConnectionD00.LENGTH_FRAME) == WebSocketConnectionD00.LENGTH_FRAME;
119     }
120 
121     private synchronized void bufferPut(byte datum, long blockFor) throws IOException
122     {
123         if (_buffer==null)
124             _buffer=_buffers.getDirectBuffer();
125         _buffer.put(datum);
126         if (_buffer.space() == 0)
127             expelBuffer(blockFor);
128     }
129 
130     public synchronized int flush(int blockFor) throws IOException
131     {
132         return expelBuffer(blockFor);
133     }
134 
135     public synchronized int flush() throws IOException
136     {
137         int flushed = flushBuffer();
138         if (_buffer!=null && _buffer.length()==0)
139         {
140             _buffers.returnBuffer(_buffer);
141             _buffer=null;
142         }
143         return flushed;
144     }
145 
146     private synchronized int flushBuffer() throws IOException
147     {
148         if (!_endp.isOpen())
149             throw new EofException();
150 
151         if (_buffer!=null && _buffer.hasContent())
152             return _endp.flush(_buffer);
153 
154         return 0;
155     }
156 
157     private synchronized int expelBuffer(long blockFor) throws IOException
158     {
159         if (_buffer==null)
160             return 0;
161         int result = flushBuffer();
162         _buffer.compact();
163         if (!_endp.isBlocking())
164         {
165             while (_buffer.space()==0)
166             {
167                 boolean ready = _endp.blockWritable(blockFor);
168                 if (!ready)
169                     throw new IOException("Write timeout");
170 
171                 result += flushBuffer();
172                 _buffer.compact();
173             }
174         }
175         return result;
176     }
177 
178     public synchronized boolean isBufferEmpty()
179     {
180         return _buffer==null || _buffer.length()==0;
181     }
182     
183 }