1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.mux;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.eclipse.jetty.io.ArrayByteBufferPool;
25 import org.eclipse.jetty.io.ByteBufferPool;
26 import org.eclipse.jetty.util.BufferUtil;
27 import org.eclipse.jetty.websocket.api.WriteCallback;
28 import org.eclipse.jetty.websocket.api.extensions.Frame;
29 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
30 import org.eclipse.jetty.websocket.common.WebSocketFrame;
31 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
32 import org.eclipse.jetty.websocket.mux.op.MuxAddChannelRequest;
33 import org.eclipse.jetty.websocket.mux.op.MuxAddChannelResponse;
34 import org.eclipse.jetty.websocket.mux.op.MuxDropChannel;
35 import org.eclipse.jetty.websocket.mux.op.MuxFlowControl;
36 import org.eclipse.jetty.websocket.mux.op.MuxNewChannelSlot;
37
38
39
40
41 public class MuxGenerator
42 {
43 private static final int CONTROL_BUFFER_SIZE = 2 * 1024;
44
45 private static final int DATA_FRAME_OVERHEAD = 5;
46 private ByteBufferPool bufferPool;
47 private OutgoingFrames outgoing;
48
49 public MuxGenerator()
50 {
51 this(new ArrayByteBufferPool());
52 }
53
54 public MuxGenerator(ByteBufferPool bufferPool)
55 {
56 this.bufferPool = bufferPool;
57 }
58
59 public void generate(long channelId, Frame frame, WriteCallback callback)
60 {
61 ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false);
62 BufferUtil.flipToFill(muxPayload);
63
64
65 writeChannelId(muxPayload,channelId);
66 byte b = (byte)(frame.isFin()?0x80:0x00);
67 b |= (byte)(frame.isRsv1()?0x40:0x00);
68 b |= (byte)(frame.isRsv2()?0x20:0x00);
69 b |= (byte)(frame.isRsv3()?0x10:0x00);
70 b |= (byte)(frame.getOpCode() & 0x0F);
71 muxPayload.put(b);
72 BufferUtil.put(frame.getPayload(),muxPayload);
73
74
75 WebSocketFrame muxFrame = new BinaryFrame();
76 BufferUtil.flipToFlush(muxPayload,0);
77 muxFrame.setPayload(muxPayload);
78
79
80
81 bufferPool.release(frame.getPayload());
82
83
84 outgoing.outgoingFrame(muxFrame,callback);
85 }
86
87 public void generate(WriteCallback callback,MuxControlBlock... blocks) throws IOException
88 {
89 if ((blocks == null) || (blocks.length <= 0))
90 {
91 return;
92 }
93
94 ByteBuffer payload = bufferPool.acquire(CONTROL_BUFFER_SIZE,false);
95 BufferUtil.flipToFill(payload);
96
97 writeChannelId(payload,0);
98
99 for (MuxControlBlock block : blocks)
100 {
101 switch (block.getOpCode())
102 {
103 case MuxOp.ADD_CHANNEL_REQUEST:
104 {
105 MuxAddChannelRequest op = (MuxAddChannelRequest)block;
106 byte b = (byte)((op.getOpCode() & 0x07) << 5);
107 b |= (byte)((op.getRsv() & 0x07) << 2);
108 b |= (op.getEncoding() & 0x03);
109 payload.put(b);
110 writeChannelId(payload,op.getChannelId());
111 write139Buffer(payload,op.getHandshake());
112 break;
113 }
114 case MuxOp.ADD_CHANNEL_RESPONSE:
115 {
116 MuxAddChannelResponse op = (MuxAddChannelResponse)block;
117 byte b = (byte)((op.getOpCode() & 0x07) << 5);
118 b |= (op.isFailed()?0x10:0x00);
119 b |= (byte)((op.getRsv() & 0x03) << 2);
120 b |= (op.getEncoding() & 0x03);
121 payload.put(b);
122 writeChannelId(payload,op.getChannelId());
123 if (op.getHandshake() != null)
124 {
125 write139Buffer(payload,op.getHandshake());
126 }
127 else
128 {
129
130 write139Size(payload,0);
131 }
132 break;
133 }
134 case MuxOp.DROP_CHANNEL:
135 {
136 MuxDropChannel op = (MuxDropChannel)block;
137 byte b = (byte)((op.getOpCode() & 0x07) << 5);
138 b |= (byte)(op.getRsv() & 0x1F);
139 payload.put(b);
140 writeChannelId(payload,op.getChannelId());
141 write139Buffer(payload,op.asReasonBuffer());
142 break;
143 }
144 case MuxOp.FLOW_CONTROL:
145 {
146 MuxFlowControl op = (MuxFlowControl)block;
147 byte b = (byte)((op.getOpCode() & 0x07) << 5);
148 b |= (byte)(op.getRsv() & 0x1F);
149 payload.put(b);
150 writeChannelId(payload,op.getChannelId());
151 write139Size(payload,op.getSendQuotaSize());
152 break;
153 }
154 case MuxOp.NEW_CHANNEL_SLOT:
155 {
156 MuxNewChannelSlot op = (MuxNewChannelSlot)block;
157 byte b = (byte)((op.getOpCode() & 0x07) << 5);
158 b |= (byte)(op.getRsv() & 0x0F) << 1;
159 b |= (byte)(op.isFallback()?0x01:0x00);
160 payload.put(b);
161 write139Size(payload,op.getNumberOfSlots());
162 write139Size(payload,op.getInitialSendQuota());
163 break;
164 }
165 }
166 }
167 BufferUtil.flipToFlush(payload,0);
168 WebSocketFrame frame = new BinaryFrame();
169 frame.setPayload(payload);
170 outgoing.outgoingFrame(frame,callback);
171 }
172
173 public OutgoingFrames getOutgoing()
174 {
175 return outgoing;
176 }
177
178 public void setOutgoing(OutgoingFrames outgoing)
179 {
180 this.outgoing = outgoing;
181 }
182
183
184
185
186
187
188
189 public void write139Buffer(ByteBuffer payload, ByteBuffer buffer)
190 {
191 write139Size(payload,buffer.remaining());
192 writeBuffer(payload,buffer);
193 }
194
195
196
197
198
199
200
201 public void write139Size(ByteBuffer payload, long size)
202 {
203 if (size > 0xFF_FF)
204 {
205
206 payload.put((byte)0x7F);
207 payload.putLong(size);
208 return;
209 }
210
211 if (size >= 0x7E)
212 {
213
214 payload.put((byte)0x7E);
215 payload.put((byte)(size >> 8));
216 payload.put((byte)(size & 0xFF));
217 return;
218 }
219
220
221 payload.put((byte)(size & 0x7F));
222 }
223
224 public void writeBuffer(ByteBuffer payload, ByteBuffer buffer)
225 {
226 BufferUtil.put(buffer,payload);
227 }
228
229
230
231
232
233
234
235 public void writeChannelId(ByteBuffer payload, long channelId)
236 {
237 if (channelId > 0x1F_FF_FF_FF)
238 {
239 throw new MuxException("Illegal Channel ID: too big");
240 }
241
242 if (channelId > 0x1F_FF_FF)
243 {
244
245 payload.put((byte)(0xE0 | ((channelId >> 24) & 0x1F)));
246 payload.put((byte)((channelId >> 16) & 0xFF));
247 payload.put((byte)((channelId >> 8) & 0xFF));
248 payload.put((byte)(channelId & 0xFF));
249 return;
250 }
251
252 if (channelId > 0x3F_FF)
253 {
254
255 payload.put((byte)(0xC0 | ((channelId >> 16) & 0x1F)));
256 payload.put((byte)((channelId >> 8) & 0xFF));
257 payload.put((byte)(channelId & 0xFF));
258 return;
259 }
260
261 if (channelId > 0x7F)
262 {
263
264 payload.put((byte)(0x80 | ((channelId >> 8) & 0x3F)));
265 payload.put((byte)(channelId & 0xFF));
266 return;
267 }
268
269
270 payload.put((byte)(channelId & 0x7F));
271 }
272 }