1 /*******************************************************************************
2 * Copyright (c) 2011 Intalio, Inc.
3 * ======================================================================
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Apache License v2.0 which accompanies this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 *
11 * The Apache License v2.0 is available at
12 * http://www.opensource.org/licenses/apache2.0.php
13 *
14 * You may elect to redistribute this code under either of these licenses.
15 *******************************************************************************/
16 // ========================================================================
17 // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18 // ------------------------------------------------------------------------
19 // All rights reserved. This program and the accompanying materials
20 // are made available under the terms of the Eclipse Public License v1.0
21 // and Apache License v2.0 which accompanies this distribution.
22 // The Eclipse Public License is available at
23 // http://www.eclipse.org/legal/epl-v10.html
24 // The Apache License v2.0 is available at
25 // http://www.opensource.org/licenses/apache2.0.php
26 // You may elect to redistribute this code under either of these licenses.
27 // ========================================================================
28
29 package org.eclipse.jetty.websocket;
30
31 import java.io.IOException;
32 import java.math.BigInteger;
33
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.io.EofException;
37
38
39 /* ------------------------------------------------------------ */
40 /** WebSocketGenerator.
41 * This class generates websocket packets.
42 * It is fully synchronized because it is likely that async
43 * threads will call the addMessage methods while other
44 * threads are flushing the generator.
45 */
46 public class WebSocketGeneratorD00 implements WebSocketGenerator
47 {
48 final private WebSocketBuffers _buffers;
49 final private EndPoint _endp;
50 private Buffer _buffer;
51
52 public WebSocketGeneratorD00(WebSocketBuffers buffers, EndPoint endp)
53 {
54 _buffers=buffers;
55 _endp=endp;
56 }
57
58 public synchronized void addFrame(byte flags, byte opcode,byte[] content, int offset, int length) throws IOException
59 {
60 long blockFor=_endp.getMaxIdleTime();
61
62 if (_buffer==null)
63 _buffer=_buffers.getDirectBuffer();
64
65 if (_buffer.space() == 0)
66 expelBuffer(blockFor);
67
68 bufferPut(opcode, blockFor);
69
70 if (isLengthFrame(opcode))
71 {
72 // Send a length delimited frame
73
74 // How many bytes we need for the length ?
75 // We have 7 bits available, so log2(length) / 7 + 1
76 // For example, 50000 bytes is 2 8-bytes: 11000011 01010000
77 // but we need to write it in 3 7-bytes 0000011 0000110 1010000
78 // 65536 == 1 00000000 00000000 => 100 0000000 0000000
79 int lengthBytes = new BigInteger(String.valueOf(length)).bitLength() / 7 + 1;
80 for (int i = lengthBytes - 1; i > 0; --i)
81 {
82 byte lengthByte = (byte)(0x80 | (0x7F & (length >> 7 * i)));
83 bufferPut(lengthByte, blockFor);
84 }
85 bufferPut((byte)(0x7F & length), blockFor);
86 }
87
88 int remaining = length;
89 while (remaining > 0)
90 {
91 int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
92 _buffer.put(content, offset + (length - remaining), chunk);
93 remaining -= chunk;
94 if (_buffer.space() > 0)
95 {
96 if (!isLengthFrame(opcode))
97 _buffer.put((byte)0xFF);
98 // Gently flush the data, issuing a non-blocking write
99 flushBuffer();
100 }
101 else
102 {
103 // Forcibly flush the data, issuing a blocking write
104 expelBuffer(blockFor);
105 if (remaining == 0)
106 {
107 if (!isLengthFrame(opcode))
108 _buffer.put((byte)0xFF);
109 // Gently flush the data, issuing a non-blocking write
110 flushBuffer();
111 }
112 }
113 }
114 }
115
116 private synchronized boolean isLengthFrame(byte frame)
117 {
118 return (frame & WebSocketConnectionD00.LENGTH_FRAME) == WebSocketConnectionD00.LENGTH_FRAME;
119 }
120
121 private synchronized void bufferPut(byte datum, long blockFor) throws IOException
122 {
123 if (_buffer==null)
124 _buffer=_buffers.getDirectBuffer();
125 _buffer.put(datum);
126 if (_buffer.space() == 0)
127 expelBuffer(blockFor);
128 }
129
130 public synchronized int flush(int blockFor) throws IOException
131 {
132 return expelBuffer(blockFor);
133 }
134
135 public synchronized int flush() throws IOException
136 {
137 int flushed = flushBuffer();
138 if (_buffer!=null && _buffer.length()==0)
139 {
140 _buffers.returnBuffer(_buffer);
141 _buffer=null;
142 }
143 return flushed;
144 }
145
146 private synchronized int flushBuffer() throws IOException
147 {
148 if (!_endp.isOpen())
149 throw new EofException();
150
151 if (_buffer!=null && _buffer.hasContent())
152 return _endp.flush(_buffer);
153
154 return 0;
155 }
156
157 private synchronized int expelBuffer(long blockFor) throws IOException
158 {
159 if (_buffer==null)
160 return 0;
161 int result = flushBuffer();
162 _buffer.compact();
163 if (!_endp.isBlocking())
164 {
165 while (_buffer.space()==0)
166 {
167 boolean ready = _endp.blockWritable(blockFor);
168 if (!ready)
169 throw new IOException("Write timeout");
170
171 result += flushBuffer();
172 _buffer.compact();
173 }
174 }
175 return result;
176 }
177
178 public synchronized boolean isBufferEmpty()
179 {
180 return _buffer==null || _buffer.length()==0;
181 }
182
183 }