View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.mux;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.eclipse.jetty.io.ArrayByteBufferPool;
25  import org.eclipse.jetty.io.ByteBufferPool;
26  import org.eclipse.jetty.util.BufferUtil;
27  import org.eclipse.jetty.websocket.api.WriteCallback;
28  import org.eclipse.jetty.websocket.api.extensions.Frame;
29  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
30  import org.eclipse.jetty.websocket.common.WebSocketFrame;
31  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
32  import org.eclipse.jetty.websocket.mux.op.MuxAddChannelRequest;
33  import org.eclipse.jetty.websocket.mux.op.MuxAddChannelResponse;
34  import org.eclipse.jetty.websocket.mux.op.MuxDropChannel;
35  import org.eclipse.jetty.websocket.mux.op.MuxFlowControl;
36  import org.eclipse.jetty.websocket.mux.op.MuxNewChannelSlot;
37  
38  /**
39   * Generate Mux frames destined for the physical connection.
40   */
41  public class MuxGenerator
42  {
43      private static final int CONTROL_BUFFER_SIZE = 2 * 1024;
44      /** 4 bytes for channel ID + 1 for fin/rsv/opcode */
45      private static final int DATA_FRAME_OVERHEAD = 5;
46      private ByteBufferPool bufferPool;
47      private OutgoingFrames outgoing;
48  
49      public MuxGenerator()
50      {
51          this(new ArrayByteBufferPool());
52      }
53  
54      public MuxGenerator(ByteBufferPool bufferPool)
55      {
56          this.bufferPool = bufferPool;
57      }
58  
59      public void generate(long channelId, Frame frame, WriteCallback callback)
60      {
61          ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false);
62          BufferUtil.flipToFill(muxPayload);
63  
64          // start building mux payload
65          writeChannelId(muxPayload,channelId);
66          byte b = (byte)(frame.isFin()?0x80:0x00); // fin
67          b |= (byte)(frame.isRsv1()?0x40:0x00); // rsv1
68          b |= (byte)(frame.isRsv2()?0x20:0x00); // rsv2
69          b |= (byte)(frame.isRsv3()?0x10:0x00); // rsv3
70          b |= (byte)(frame.getOpCode() & 0x0F); // opcode
71          muxPayload.put(b);
72          BufferUtil.put(frame.getPayload(),muxPayload);
73  
74          // build muxed frame
75          WebSocketFrame muxFrame = new BinaryFrame();
76          BufferUtil.flipToFlush(muxPayload,0);
77          muxFrame.setPayload(muxPayload);
78          // NOTE: the physical connection will handle masking rules for this frame.
79  
80          // release original buffer (no longer needed)
81          bufferPool.release(frame.getPayload());
82  
83          // send muxed frame down to the physical connection.
84          outgoing.outgoingFrame(muxFrame,callback);
85      }
86  
87      public void generate(WriteCallback callback,MuxControlBlock... blocks) throws IOException
88      {
89          if ((blocks == null) || (blocks.length <= 0))
90          {
91              return; // nothing to do
92          }
93  
94          ByteBuffer payload = bufferPool.acquire(CONTROL_BUFFER_SIZE,false);
95          BufferUtil.flipToFill(payload);
96  
97          writeChannelId(payload,0); // control channel
98  
99          for (MuxControlBlock block : blocks)
100         {
101             switch (block.getOpCode())
102             {
103                 case MuxOp.ADD_CHANNEL_REQUEST:
104                 {
105                     MuxAddChannelRequest op = (MuxAddChannelRequest)block;
106                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
107                     b |= (byte)((op.getRsv() & 0x07) << 2); // rsv
108                     b |= (op.getEncoding() & 0x03); // enc
109                     payload.put(b); // opcode + rsv + enc
110                     writeChannelId(payload,op.getChannelId());
111                     write139Buffer(payload,op.getHandshake());
112                     break;
113                 }
114                 case MuxOp.ADD_CHANNEL_RESPONSE:
115                 {
116                     MuxAddChannelResponse op = (MuxAddChannelResponse)block;
117                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
118                     b |= (op.isFailed()?0x10:0x00); // failure bit
119                     b |= (byte)((op.getRsv() & 0x03) << 2); // rsv
120                     b |= (op.getEncoding() & 0x03); // enc
121                     payload.put(b); // opcode + f + rsv + enc
122                     writeChannelId(payload,op.getChannelId());
123                     if (op.getHandshake() != null)
124                     {
125                         write139Buffer(payload,op.getHandshake());
126                     }
127                     else
128                     {
129                         // no handshake details
130                         write139Size(payload,0);
131                     }
132                     break;
133                 }
134                 case MuxOp.DROP_CHANNEL:
135                 {
136                     MuxDropChannel op = (MuxDropChannel)block;
137                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
138                     b |= (byte)(op.getRsv() & 0x1F); // rsv
139                     payload.put(b); // opcode + rsv
140                     writeChannelId(payload,op.getChannelId());
141                     write139Buffer(payload,op.asReasonBuffer());
142                     break;
143                 }
144                 case MuxOp.FLOW_CONTROL:
145                 {
146                     MuxFlowControl op = (MuxFlowControl)block;
147                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
148                     b |= (byte)(op.getRsv() & 0x1F); // rsv
149                     payload.put(b); // opcode + rsv
150                     writeChannelId(payload,op.getChannelId());
151                     write139Size(payload,op.getSendQuotaSize());
152                     break;
153                 }
154                 case MuxOp.NEW_CHANNEL_SLOT:
155                 {
156                     MuxNewChannelSlot op = (MuxNewChannelSlot)block;
157                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
158                     b |= (byte)(op.getRsv() & 0x0F) << 1; // rsv
159                     b |= (byte)(op.isFallback()?0x01:0x00); // fallback bit
160                     payload.put(b); // opcode + rsv + fallback bit
161                     write139Size(payload,op.getNumberOfSlots());
162                     write139Size(payload,op.getInitialSendQuota());
163                     break;
164                 }
165             }
166         }
167         BufferUtil.flipToFlush(payload,0);
168         WebSocketFrame frame = new BinaryFrame();
169         frame.setPayload(payload);
170         outgoing.outgoingFrame(frame,callback);
171     }
172 
173     public OutgoingFrames getOutgoing()
174     {
175         return outgoing;
176     }
177 
178     public void setOutgoing(OutgoingFrames outgoing)
179     {
180         this.outgoing = outgoing;
181     }
182 
183     /**
184      * Write a 1/3/9 encoded size, then a byte buffer of that size.
185      * 
186      * @param payload
187      * @param buffer
188      */
189     public void write139Buffer(ByteBuffer payload, ByteBuffer buffer)
190     {
191         write139Size(payload,buffer.remaining());
192         writeBuffer(payload,buffer);
193     }
194 
195     /**
196      * Write a 1/3/9 encoded size.
197      * 
198      * @param payload
199      * @param size
200      */
201     public void write139Size(ByteBuffer payload, long size)
202     {
203         if (size > 0xFF_FF)
204         {
205             // 9 byte encoded
206             payload.put((byte)0x7F);
207             payload.putLong(size);
208             return;
209         }
210 
211         if (size >= 0x7E)
212         {
213             // 3 byte encoded
214             payload.put((byte)0x7E);
215             payload.put((byte)(size >> 8));
216             payload.put((byte)(size & 0xFF));
217             return;
218         }
219 
220         // 1 byte (7 bit) encoded
221         payload.put((byte)(size & 0x7F));
222     }
223 
224     public void writeBuffer(ByteBuffer payload, ByteBuffer buffer)
225     {
226         BufferUtil.put(buffer,payload);
227     }
228 
229     /**
230      * Write multiplexing channel id, using logical channel id encoding (of 1,2,3, or 4 octets)
231      * 
232      * @param payload
233      * @param channelId
234      */
235     public void writeChannelId(ByteBuffer payload, long channelId)
236     {
237         if (channelId > 0x1F_FF_FF_FF)
238         {
239             throw new MuxException("Illegal Channel ID: too big");
240         }
241 
242         if (channelId > 0x1F_FF_FF)
243         {
244             // 29 bit channel id (4 bytes)
245             payload.put((byte)(0xE0 | ((channelId >> 24) & 0x1F)));
246             payload.put((byte)((channelId >> 16) & 0xFF));
247             payload.put((byte)((channelId >> 8) & 0xFF));
248             payload.put((byte)(channelId & 0xFF));
249             return;
250         }
251 
252         if (channelId > 0x3F_FF)
253         {
254             // 21 bit channel id (3 bytes)
255             payload.put((byte)(0xC0 | ((channelId >> 16) & 0x1F)));
256             payload.put((byte)((channelId >> 8) & 0xFF));
257             payload.put((byte)(channelId & 0xFF));
258             return;
259         }
260 
261         if (channelId > 0x7F)
262         {
263             // 14 bit channel id (2 bytes)
264             payload.put((byte)(0x80 | ((channelId >> 8) & 0x3F)));
265             payload.put((byte)(channelId & 0xFF));
266             return;
267         }
268 
269         // 7 bit channel id
270         payload.put((byte)(channelId & 0x7F));
271     }
272 }