View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.math.BigInteger;
18  
19  import org.eclipse.jetty.io.Buffer;
20  import org.eclipse.jetty.io.EndPoint;
21  import org.eclipse.jetty.io.EofException;
22  
23  
24  /* ------------------------------------------------------------ */
25  /** WebSocketGenerator.
26   * This class generates websocket packets.
27   * It is fully synchronized because it is likely that async
28   * threads will call the addMessage methods while other
29   * threads are flushing the generator.
30   */
31  public class WebSocketGeneratorD00 implements WebSocketGenerator
32  {
33      final private WebSocketBuffers _buffers;
34      final private EndPoint _endp;
35      private Buffer _buffer;
36  
37      public WebSocketGeneratorD00(WebSocketBuffers buffers, EndPoint endp)
38      {
39          _buffers=buffers;
40          _endp=endp;
41      }
42      
43      public synchronized void addFrame(byte flags, byte opcode,byte[] content, int offset, int length) throws IOException
44      {
45          long blockFor=_endp.getMaxIdleTime();
46          
47          if (_buffer==null)
48              _buffer=_buffers.getDirectBuffer();
49  
50          if (_buffer.space() == 0)
51              expelBuffer(blockFor);
52  
53          bufferPut(opcode, blockFor);
54  
55          if (isLengthFrame(opcode))
56          {
57              // Send a length delimited frame
58  
59              // How many bytes we need for the length ?
60              // We have 7 bits available, so log2(length) / 7 + 1
61              // For example, 50000 bytes is 2 8-bytes: 11000011 01010000
62              // but we need to write it in 3 7-bytes 0000011 0000110 1010000
63              // 65536 == 1 00000000 00000000 => 100 0000000 0000000
64              int lengthBytes = new BigInteger(String.valueOf(length)).bitLength() / 7 + 1;
65              for (int i = lengthBytes - 1; i > 0; --i)
66              {
67                  byte lengthByte = (byte)(0x80 | (0x7F & (length >> 7 * i)));
68                  bufferPut(lengthByte, blockFor);
69              }
70              bufferPut((byte)(0x7F & length), blockFor);
71          }
72  
73          int remaining = length;
74          while (remaining > 0)
75          {
76              int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
77              _buffer.put(content, offset + (length - remaining), chunk);
78              remaining -= chunk;
79              if (_buffer.space() > 0)
80              {
81                  if (!isLengthFrame(opcode))
82                      _buffer.put((byte)0xFF);
83                  // Gently flush the data, issuing a non-blocking write
84                  flushBuffer();
85              }
86              else
87              {
88                  // Forcibly flush the data, issuing a blocking write
89                  expelBuffer(blockFor);
90                  if (remaining == 0)
91                  {
92                      if (!isLengthFrame(opcode))
93                          _buffer.put((byte)0xFF);
94                      // Gently flush the data, issuing a non-blocking write
95                      flushBuffer();
96                  }
97              }
98          }
99      }
100 
101     private synchronized boolean isLengthFrame(byte frame)
102     {
103         return (frame & WebSocketConnectionD00.LENGTH_FRAME) == WebSocketConnectionD00.LENGTH_FRAME;
104     }
105 
106     private synchronized void bufferPut(byte datum, long blockFor) throws IOException
107     {
108         if (_buffer==null)
109             _buffer=_buffers.getDirectBuffer();
110         _buffer.put(datum);
111         if (_buffer.space() == 0)
112             expelBuffer(blockFor);
113     }
114 
115     public synchronized int flush(int blockFor) throws IOException
116     {
117         return expelBuffer(blockFor);
118     }
119 
120     public synchronized int flush() throws IOException
121     {
122         int flushed = flushBuffer();
123         if (_buffer!=null && _buffer.length()==0)
124         {
125             _buffers.returnBuffer(_buffer);
126             _buffer=null;
127         }
128         return flushed;
129     }
130 
131     private synchronized int flushBuffer() throws IOException
132     {
133         if (!_endp.isOpen())
134             throw new EofException();
135 
136         if (_buffer!=null && _buffer.hasContent())
137             return _endp.flush(_buffer);
138 
139         return 0;
140     }
141 
142     private synchronized int expelBuffer(long blockFor) throws IOException
143     {
144         if (_buffer==null)
145             return 0;
146         int result = flushBuffer();
147         _buffer.compact();
148         if (!_endp.isBlocking())
149         {
150             while (_buffer.space()==0)
151             {
152                 boolean ready = _endp.blockWritable(blockFor);
153                 if (!ready)
154                     throw new IOException("Write timeout");
155 
156                 result += flushBuffer();
157                 _buffer.compact();
158             }
159         }
160         return result;
161     }
162 
163     public synchronized boolean isBufferEmpty()
164     {
165         return _buffer==null || _buffer.length()==0;
166     }
167     
168 }