// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
13  
14  package org.eclipse.jetty.websocket;
15  
16  import java.io.IOException;
17  
18  import org.eclipse.jetty.io.Buffer;
19  import org.eclipse.jetty.io.EndPoint;
20  import org.eclipse.jetty.io.EofException;
21  
22  
23  /* ------------------------------------------------------------ */
24  /** WebSocketGenerator.
25   * This class generates websocket packets.
26   * It is fully synchronized because it is likely that async
27   * threads will call the addMessage methods while other
28   * threads are flushing the generator.
29   */
30  public class WebSocketGeneratorD01 implements WebSocketGenerator
31  {
32      final private WebSocketBuffers _buffers;
33      final private EndPoint _endp;
34      private Buffer _buffer;
35  
36      public WebSocketGeneratorD01(WebSocketBuffers buffers, EndPoint endp)
37      {
38          _buffers=buffers;
39          _endp=endp;
40      }
41  
42      public synchronized void addFrame(byte opcode,byte[] content, int blockFor) throws IOException
43      {
44          addFrame(opcode,content,0,content.length,blockFor);
45      }
46      
47  
48      public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException
49      {
50          addFragment(false,opcode,content,offset,length,blockFor);
51      }
52  
53      public synchronized void addFragment(boolean more, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
54      {
55          if (_buffer==null)
56              _buffer=_buffers.getDirectBuffer();
57  
58          if (_buffer.space() == 0)
59              expelBuffer(blockFor);
60          
61          opcode = (byte)(opcode & 0x0f);
62          
63          while (length>0)
64          {
65              // slice a fragment off
66              int fragment=length;
67              if (fragment+10>_buffer.capacity())
68              {
69                  fragment=_buffer.capacity()-10;
70                  bufferPut((byte)(0x80|opcode), blockFor);
71              }
72              else if (more)
73                  bufferPut((byte)(0x80|opcode), blockFor);
74              else
75                  bufferPut(opcode, blockFor);
76  
77              if (fragment>0xffff)
78              {
79                  bufferPut((byte)0x7f, blockFor);
80                  bufferPut((byte)((fragment>>56)&0x7f), blockFor);
81                  bufferPut((byte)((fragment>>48)&0xff), blockFor);
82                  bufferPut((byte)((fragment>>40)&0xff), blockFor);
83                  bufferPut((byte)((fragment>>32)&0xff), blockFor);
84                  bufferPut((byte)((fragment>>24)&0xff), blockFor);
85                  bufferPut((byte)((fragment>>16)&0xff), blockFor);
86                  bufferPut((byte)((fragment>>8)&0xff), blockFor);
87                  bufferPut((byte)(fragment&0xff), blockFor);
88              }
89              else if (fragment >=0x7e)
90              {
91                  bufferPut((byte)126, blockFor);
92                  bufferPut((byte)(fragment>>8), blockFor);
93                  bufferPut((byte)(fragment&0xff), blockFor);
94              }
95              else
96              {
97                  bufferPut((byte)fragment, blockFor);
98              }
99  
100             int remaining = fragment;
101             while (remaining > 0)
102             {
103                 _buffer.compact();
104                 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
105                 _buffer.put(content, offset + (fragment - remaining), chunk);
106                 remaining -= chunk;
107                 if (_buffer.space() > 0)
108                 {
109                     // Gently flush the data, issuing a non-blocking write
110                     flushBuffer();
111                 }
112                 else
113                 {
114                     // Forcibly flush the data, issuing a blocking write
115                     expelBuffer(blockFor);
116                     if (remaining == 0)
117                     {
118                         // Gently flush the data, issuing a non-blocking write
119                         flushBuffer();
120                     }
121                 }
122             }
123             offset+=fragment;
124             length-=fragment;
125         }
126     }
127 
128     private synchronized void bufferPut(byte datum, long blockFor) throws IOException
129     {
130         if (_buffer==null)
131             _buffer=_buffers.getDirectBuffer();
132         _buffer.put(datum);
133         if (_buffer.space() == 0)
134             expelBuffer(blockFor);
135     }
136 
137     public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
138     {
139         byte[] bytes = content.getBytes("UTF-8");
140         addFrame(frame, bytes, 0, bytes.length, blockFor);
141     }
142 
143     public synchronized int flush(int blockFor) throws IOException
144     {
145         return expelBuffer(blockFor);
146     }
147 
148     public synchronized int flush() throws IOException
149     {
150         int flushed = flushBuffer();
151         if (_buffer!=null && _buffer.length()==0)
152         {
153             _buffers.returnBuffer(_buffer);
154             _buffer=null;
155         }
156         return flushed;
157     }
158 
159     private synchronized int flushBuffer() throws IOException
160     {
161         if (!_endp.isOpen())
162             throw new EofException();
163 
164         if (_buffer!=null)
165             return _endp.flush(_buffer);
166 
167         return 0;
168     }
169 
170     private synchronized int expelBuffer(long blockFor) throws IOException
171     {
172         if (_buffer==null)
173             return 0;
174         int result = flushBuffer();
175         _buffer.compact();
176         if (!_endp.isBlocking())
177         {
178             while (_buffer.space()==0)
179             {
180                 // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
181                 // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
182                 boolean ready = _endp.blockWritable(blockFor);
183                 if (!ready)
184                     throw new IOException("Write timeout");
185 
186                 result += flushBuffer();
187                 _buffer.compact();
188             }
189         }
190         return result;
191     }
192 
193     public synchronized boolean isBufferEmpty()
194     {
195         return _buffer==null || _buffer.length()==0;
196     }
197 
198 }