View Javadoc

1   // ========================================================================
2   // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  import java.math.BigInteger;
18  
19  import org.eclipse.jetty.io.Buffer;
20  import org.eclipse.jetty.io.EndPoint;
21  import org.eclipse.jetty.io.EofException;
22  
23  
24  /* ------------------------------------------------------------ */
25  /** WebSocketGenerator.
26   * This class generates websocket packets.
27   * It is fully synchronized because it is likely that async
28   * threads will call the addMessage methods while other
29   * threads are flushing the generator.
30   */
31  public class WebSocketGeneratorD00 implements WebSocketGenerator
32  {
33      final private WebSocketBuffers _buffers;
34      final private EndPoint _endp;
35      private Buffer _buffer;
36  
37      public WebSocketGeneratorD00(WebSocketBuffers buffers, EndPoint endp)
38      {
39          _buffers=buffers;
40          _endp=endp;
41      }
42      
43      public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException
44      {
45          if (_buffer==null)
46              _buffer=_buffers.getDirectBuffer();
47  
48          if (_buffer.space() == 0)
49              expelBuffer(blockFor);
50  
51          bufferPut(opcode, blockFor);
52  
53          if (isLengthFrame(opcode))
54          {
55              // Send a length delimited frame
56  
57              // How many bytes we need for the length ?
58              // We have 7 bits available, so log2(length) / 7 + 1
59              // For example, 50000 bytes is 2 8-bytes: 11000011 01010000
60              // but we need to write it in 3 7-bytes 0000011 0000110 1010000
61              // 65536 == 1 00000000 00000000 => 100 0000000 0000000
62              int lengthBytes = new BigInteger(String.valueOf(length)).bitLength() / 7 + 1;
63              for (int i = lengthBytes - 1; i > 0; --i)
64              {
65                  byte lengthByte = (byte)(0x80 | (0x7F & (length >> 7 * i)));
66                  bufferPut(lengthByte, blockFor);
67              }
68              bufferPut((byte)(0x7F & length), blockFor);
69          }
70  
71          int remaining = length;
72          while (remaining > 0)
73          {
74              int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
75              _buffer.put(content, offset + (length - remaining), chunk);
76              remaining -= chunk;
77              if (_buffer.space() > 0)
78              {
79                  if (!isLengthFrame(opcode))
80                      _buffer.put((byte)0xFF);
81                  // Gently flush the data, issuing a non-blocking write
82                  flushBuffer();
83              }
84              else
85              {
86                  // Forcibly flush the data, issuing a blocking write
87                  expelBuffer(blockFor);
88                  if (remaining == 0)
89                  {
90                      if (!isLengthFrame(opcode))
91                          _buffer.put((byte)0xFF);
92                      // Gently flush the data, issuing a non-blocking write
93                      flushBuffer();
94                  }
95              }
96          }
97      }
98  
99      private synchronized boolean isLengthFrame(byte frame)
100     {
101         return (frame & WebSocket.LENGTH_FRAME) == WebSocket.LENGTH_FRAME;
102     }
103 
104     private synchronized void bufferPut(byte datum, long blockFor) throws IOException
105     {
106         if (_buffer==null)
107             _buffer=_buffers.getDirectBuffer();
108         _buffer.put(datum);
109         if (_buffer.space() == 0)
110             expelBuffer(blockFor);
111     }
112 
113     public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
114     {
115         byte[] bytes = content.getBytes("UTF-8");
116         addFrame(frame, bytes, 0, bytes.length, blockFor);
117     }
118 
119     public synchronized int flush(int blockFor) throws IOException
120     {
121         return expelBuffer(blockFor);
122     }
123 
124     public synchronized int flush() throws IOException
125     {
126         int flushed = flushBuffer();
127         if (_buffer!=null && _buffer.length()==0)
128         {
129             _buffers.returnBuffer(_buffer);
130             _buffer=null;
131         }
132         return flushed;
133     }
134 
135     private synchronized int flushBuffer() throws IOException
136     {
137         if (!_endp.isOpen())
138             throw new EofException();
139 
140         if (_buffer!=null)
141             return _endp.flush(_buffer);
142 
143         return 0;
144     }
145 
146     private synchronized int expelBuffer(long blockFor) throws IOException
147     {
148         if (_buffer==null)
149             return 0;
150         int result = flushBuffer();
151         _buffer.compact();
152         if (!_endp.isBlocking())
153         {
154             while (_buffer.space()==0)
155             {
156                 // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
157                 // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
158                 boolean ready = _endp.blockWritable(blockFor);
159                 if (!ready)
160                     throw new IOException("Write timeout");
161 
162                 result += flushBuffer();
163                 _buffer.compact();
164             }
165         }
166         return result;
167     }
168 
169     public synchronized boolean isBufferEmpty()
170     {
171         return _buffer==null || _buffer.length()==0;
172     }
173 
174     public void addFragment(boolean more, byte opcode, byte[] content, int offset, int length, int maxIdleTime) throws IOException
175     {
176         if (more)
177             throw new UnsupportedOperationException("fragmented");
178         addFrame(opcode,content,offset,length,maxIdleTime);
179     }
180     
181 }