View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.extensions.mux;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.eclipse.jetty.io.ArrayByteBufferPool;
25  import org.eclipse.jetty.io.ByteBufferPool;
26  import org.eclipse.jetty.util.BufferUtil;
27  import org.eclipse.jetty.websocket.api.WriteCallback;
28  import org.eclipse.jetty.websocket.api.extensions.Frame;
29  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
30  import org.eclipse.jetty.websocket.common.WebSocketFrame;
31  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelRequest;
32  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelResponse;
33  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxDropChannel;
34  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxFlowControl;
35  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxNewChannelSlot;
36  
37  /**
38   * Generate Mux frames destined for the physical connection.
39   */
40  public class MuxGenerator
41  {
42      private static final int CONTROL_BUFFER_SIZE = 2 * 1024;
43      /** 4 bytes for channel ID + 1 for fin/rsv/opcode */
44      private static final int DATA_FRAME_OVERHEAD = 5;
45      private ByteBufferPool bufferPool;
46      private OutgoingFrames outgoing;
47  
48      public MuxGenerator()
49      {
50          this(new ArrayByteBufferPool());
51      }
52  
53      public MuxGenerator(ByteBufferPool bufferPool)
54      {
55          this.bufferPool = bufferPool;
56      }
57  
58      public void generate(long channelId, Frame frame, WriteCallback callback)
59      {
60          ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false);
61          BufferUtil.flipToFill(muxPayload);
62  
63          // start building mux payload
64          writeChannelId(muxPayload,channelId);
65          byte b = (byte)(frame.isFin()?0x80:0x00); // fin
66          b |= (byte)(frame.isRsv1()?0x40:0x00); // rsv1
67          b |= (byte)(frame.isRsv2()?0x20:0x00); // rsv2
68          b |= (byte)(frame.isRsv3()?0x10:0x00); // rsv3
69          b |= (byte)(frame.getType().getOpCode() & 0x0F); // opcode
70          muxPayload.put(b);
71          BufferUtil.put(frame.getPayload(),muxPayload);
72  
73          // build muxed frame
74          WebSocketFrame muxFrame = WebSocketFrame.binary();
75          BufferUtil.flipToFlush(muxPayload,0);
76          muxFrame.setPayload(muxPayload);
77          // NOTE: the physical connection will handle masking rules for this frame.
78  
79          // release original buffer (no longer needed)
80          bufferPool.release(frame.getPayload());
81  
82          // send muxed frame down to the physical connection.
83          outgoing.outgoingFrame(muxFrame,callback);
84      }
85  
86      public void generate(WriteCallback callback,MuxControlBlock... blocks) throws IOException
87      {
88          if ((blocks == null) || (blocks.length <= 0))
89          {
90              return; // nothing to do
91          }
92  
93          ByteBuffer payload = bufferPool.acquire(CONTROL_BUFFER_SIZE,false);
94          BufferUtil.flipToFill(payload);
95  
96          writeChannelId(payload,0); // control channel
97  
98          for (MuxControlBlock block : blocks)
99          {
100             switch (block.getOpCode())
101             {
102                 case MuxOp.ADD_CHANNEL_REQUEST:
103                 {
104                     MuxAddChannelRequest op = (MuxAddChannelRequest)block;
105                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
106                     b |= (byte)((op.getRsv() & 0x07) << 2); // rsv
107                     b |= (op.getEncoding() & 0x03); // enc
108                     payload.put(b); // opcode + rsv + enc
109                     writeChannelId(payload,op.getChannelId());
110                     write139Buffer(payload,op.getHandshake());
111                     break;
112                 }
113                 case MuxOp.ADD_CHANNEL_RESPONSE:
114                 {
115                     MuxAddChannelResponse op = (MuxAddChannelResponse)block;
116                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
117                     b |= (op.isFailed()?0x10:0x00); // failure bit
118                     b |= (byte)((op.getRsv() & 0x03) << 2); // rsv
119                     b |= (op.getEncoding() & 0x03); // enc
120                     payload.put(b); // opcode + f + rsv + enc
121                     writeChannelId(payload,op.getChannelId());
122                     if (op.getHandshake() != null)
123                     {
124                         write139Buffer(payload,op.getHandshake());
125                     }
126                     else
127                     {
128                         // no handshake details
129                         write139Size(payload,0);
130                     }
131                     break;
132                 }
133                 case MuxOp.DROP_CHANNEL:
134                 {
135                     MuxDropChannel op = (MuxDropChannel)block;
136                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
137                     b |= (byte)(op.getRsv() & 0x1F); // rsv
138                     payload.put(b); // opcode + rsv
139                     writeChannelId(payload,op.getChannelId());
140                     write139Buffer(payload,op.asReasonBuffer());
141                     break;
142                 }
143                 case MuxOp.FLOW_CONTROL:
144                 {
145                     MuxFlowControl op = (MuxFlowControl)block;
146                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
147                     b |= (byte)(op.getRsv() & 0x1F); // rsv
148                     payload.put(b); // opcode + rsv
149                     writeChannelId(payload,op.getChannelId());
150                     write139Size(payload,op.getSendQuotaSize());
151                     break;
152                 }
153                 case MuxOp.NEW_CHANNEL_SLOT:
154                 {
155                     MuxNewChannelSlot op = (MuxNewChannelSlot)block;
156                     byte b = (byte)((op.getOpCode() & 0x07) << 5); // opcode
157                     b |= (byte)(op.getRsv() & 0x0F) << 1; // rsv
158                     b |= (byte)(op.isFallback()?0x01:0x00); // fallback bit
159                     payload.put(b); // opcode + rsv + fallback bit
160                     write139Size(payload,op.getNumberOfSlots());
161                     write139Size(payload,op.getInitialSendQuota());
162                     break;
163                 }
164             }
165         }
166         BufferUtil.flipToFlush(payload,0);
167         WebSocketFrame frame = WebSocketFrame.binary();
168         frame.setPayload(payload);
169         outgoing.outgoingFrame(frame,callback);
170     }
171 
172     public OutgoingFrames getOutgoing()
173     {
174         return outgoing;
175     }
176 
177     public void setOutgoing(OutgoingFrames outgoing)
178     {
179         this.outgoing = outgoing;
180     }
181 
182     /**
183      * Write a 1/3/9 encoded size, then a byte buffer of that size.
184      * 
185      * @param payload
186      * @param buffer
187      */
188     public void write139Buffer(ByteBuffer payload, ByteBuffer buffer)
189     {
190         write139Size(payload,buffer.remaining());
191         writeBuffer(payload,buffer);
192     }
193 
194     /**
195      * Write a 1/3/9 encoded size.
196      * 
197      * @param payload
198      * @param size
199      */
200     public void write139Size(ByteBuffer payload, long size)
201     {
202         if (size > 0xFF_FF)
203         {
204             // 9 byte encoded
205             payload.put((byte)0x7F);
206             payload.putLong(size);
207             return;
208         }
209 
210         if (size >= 0x7E)
211         {
212             // 3 byte encoded
213             payload.put((byte)0x7E);
214             payload.put((byte)(size >> 8));
215             payload.put((byte)(size & 0xFF));
216             return;
217         }
218 
219         // 1 byte (7 bit) encoded
220         payload.put((byte)(size & 0x7F));
221     }
222 
223     public void writeBuffer(ByteBuffer payload, ByteBuffer buffer)
224     {
225         BufferUtil.put(buffer,payload);
226     }
227 
228     /**
229      * Write multiplexing channel id, using logical channel id encoding (of 1,2,3, or 4 octets)
230      * 
231      * @param payload
232      * @param channelId
233      */
234     public void writeChannelId(ByteBuffer payload, long channelId)
235     {
236         if (channelId > 0x1F_FF_FF_FF)
237         {
238             throw new MuxException("Illegal Channel ID: too big");
239         }
240 
241         if (channelId > 0x1F_FF_FF)
242         {
243             // 29 bit channel id (4 bytes)
244             payload.put((byte)(0xE0 | ((channelId >> 24) & 0x1F)));
245             payload.put((byte)((channelId >> 16) & 0xFF));
246             payload.put((byte)((channelId >> 8) & 0xFF));
247             payload.put((byte)(channelId & 0xFF));
248             return;
249         }
250 
251         if (channelId > 0x3F_FF)
252         {
253             // 21 bit channel id (3 bytes)
254             payload.put((byte)(0xC0 | ((channelId >> 16) & 0x1F)));
255             payload.put((byte)((channelId >> 8) & 0xFF));
256             payload.put((byte)(channelId & 0xFF));
257             return;
258         }
259 
260         if (channelId > 0x7F)
261         {
262             // 14 bit channel id (2 bytes)
263             payload.put((byte)(0x80 | ((channelId >> 8) & 0x3F)));
264             payload.put((byte)(channelId & 0xFF));
265             return;
266         }
267 
268         // 7 bit channel id
269         payload.put((byte)(channelId & 0x7F));
270     }
271 }