View Javadoc

1   package org.eclipse.jetty.websocket;
2   
3   import java.io.IOException;
4   import java.math.BigInteger;
5   
6   import org.eclipse.jetty.io.Buffer;
7   import org.eclipse.jetty.io.EndPoint;
8   
9   
10  /* ------------------------------------------------------------ */
11  /** WebSocketGenerator.
12   * This class generates websocket packets.
13   * It is fully synchronized because it is likely that async
14   * threads will call the addMessage methods while other
15   * threads are flushing the generator.
16   */
17  public class WebSocketGenerator
18  {
19      final private WebSocketBuffers _buffers;
20      final private EndPoint _endp;
21      private Buffer _buffer;
22  
23      public WebSocketGenerator(WebSocketBuffers buffers, EndPoint endp)
24      {
25          _buffers=buffers;
26          _endp=endp;
27      }
28  
29      public synchronized void addFrame(byte frame,byte[] content, int blockFor) throws IOException
30      {
31          addFrame(frame,content,0,content.length,blockFor);
32      }
33  
34      public synchronized void addFrame(byte frame,byte[] content, int offset, int length, int blockFor) throws IOException
35      {
36          if (_buffer==null)
37              _buffer=_buffers.getDirectBuffer();
38  
39          if (_buffer.space() == 0)
40              expelBuffer(blockFor);
41  
42          bufferPut(frame, blockFor);
43  
44          if (isLengthFrame(frame))
45          {
46              // Send a length delimited frame
47  
48              // How many bytes we need for the length ?
49              // We have 7 bits available, so log2(length) / 7 + 1
50              // For example, 50000 bytes is 2 8-bytes: 11000011 01010000
51              // but we need to write it in 3 7-bytes 0000011 0000110 1010000
52              // 65536 == 1 00000000 00000000 => 100 0000000 0000000
53              int lengthBytes = new BigInteger(String.valueOf(length)).bitLength() / 7 + 1;
54              for (int i = lengthBytes - 1; i > 0; --i)
55              {
56                  byte lengthByte = (byte)(0x80 | (0x7F & (length >> 7 * i)));
57                  bufferPut(lengthByte, blockFor);
58              }
59              bufferPut((byte)(0x7F & length), blockFor);
60          }
61  
62          int remaining = length;
63          while (remaining > 0)
64          {
65              int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
66              _buffer.put(content, offset + (length - remaining), chunk);
67              remaining -= chunk;
68              if (_buffer.space() > 0)
69              {
70                  if (!isLengthFrame(frame))
71                      _buffer.put((byte)0xFF);
72                  // Gently flush the data, issuing a non-blocking write
73                  flushBuffer();
74              }
75              else
76              {
77                  // Forcibly flush the data, issuing a blocking write
78                  expelBuffer(blockFor);
79                  if (remaining == 0)
80                  {
81                      if (!isLengthFrame(frame))
82                          _buffer.put((byte)0xFF);
83                      // Gently flush the data, issuing a non-blocking write
84                      flushBuffer();
85                  }
86              }
87          }
88      }
89  
90      private synchronized boolean isLengthFrame(byte frame)
91      {
92          return (frame & WebSocket.LENGTH_FRAME) == WebSocket.LENGTH_FRAME;
93      }
94  
95      private synchronized void bufferPut(byte datum, long blockFor) throws IOException
96      {
97          _buffer.put(datum);
98          if (_buffer.space() == 0)
99              expelBuffer(blockFor);
100     }
101 
102     public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
103     {
104         byte[] bytes = content.getBytes("UTF-8");
105         addFrame(frame, bytes, 0, bytes.length, blockFor);
106     }
107 
108     public synchronized int flush(long blockFor) throws IOException
109     {
110         return expelBuffer(blockFor);
111     }
112 
113     public synchronized int flush() throws IOException
114     {
115         int flushed = flushBuffer();
116         if (_buffer!=null && _buffer.length()==0)
117         {
118             _buffers.returnBuffer(_buffer);
119             _buffer=null;
120         }
121         return flushed;
122     }
123 
124     private synchronized int flushBuffer() throws IOException
125     {
126         if (!_endp.isOpen())
127             throw new IOException("Closed");
128 
129         if (_buffer!=null)
130             return _endp.flush(_buffer);
131 
132         return 0;
133     }
134 
135     private synchronized int expelBuffer(long blockFor) throws IOException
136     {
137         int result = flushBuffer();
138         _buffer.compact();
139         if (!_endp.isBlocking())
140         {
141             while (_buffer.space()==0)
142             {
143                 // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
144                 // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
145                 boolean ready = _endp.blockWritable(blockFor);
146                 if (!ready)
147                     throw new IOException("Write timeout");
148 
149                 result += flushBuffer();
150                 _buffer.compact();
151             }
152         }
153         return result;
154     }
155 
156     public synchronized boolean isBufferEmpty()
157     {
158         return _buffer==null || _buffer.length()==0;
159     }
160 }