View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.extensions.mux;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.eclipse.jetty.util.StringUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.StatusCode;
31  import org.eclipse.jetty.websocket.api.UpgradeRequest;
32  import org.eclipse.jetty.websocket.api.UpgradeResponse;
33  import org.eclipse.jetty.websocket.api.WebSocketBehavior;
34  import org.eclipse.jetty.websocket.api.WebSocketException;
35  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
36  import org.eclipse.jetty.websocket.api.WriteCallback;
37  import org.eclipse.jetty.websocket.api.extensions.Frame;
38  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
39  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
40  import org.eclipse.jetty.websocket.common.LogicalConnection;
41  import org.eclipse.jetty.websocket.common.WebSocketFrame;
42  import org.eclipse.jetty.websocket.common.extensions.mux.add.MuxAddClient;
43  import org.eclipse.jetty.websocket.common.extensions.mux.add.MuxAddServer;
44  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelRequest;
45  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelResponse;
46  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxDropChannel;
47  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxFlowControl;
48  import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxNewChannelSlot;
49  
50  /**
51   * Muxer responsible for managing sub-channels.
52   * <p>
53   * Maintains a 1 (incoming and outgoing mux encapsulated frames) to many (per-channel incoming/outgoing standard websocket frames) relationship, along with
54   * routing of {@link MuxControlBlock} events.
55   * <p>
56   * Control Channel events (channel ID == 0) are handled by the Muxer.
57   */
58  public class Muxer implements IncomingFrames, MuxParser.Listener
59  {
60      private static final int CONTROL_CHANNEL_ID = 0;
61  
62      private static final Logger LOG = Log.getLogger(Muxer.class);
63  
64      /**
65       * Map of sub-channels, key is the channel Id.
66       */
67      private Map<Long, MuxChannel> channels = new HashMap<Long, MuxChannel>();
68  
69      private final WebSocketPolicy policy;
70      private final LogicalConnection physicalConnection;
71      private InetSocketAddress remoteAddress;
72      /** Parsing frames destined for sub-channels */
73      private MuxParser parser;
74      /** Generating frames destined for physical connection */
75      private MuxGenerator generator;
76      private MuxAddServer addServer;
77      private MuxAddClient addClient;
78      /** The original request headers, used for delta encoded AddChannelRequest blocks */
79      private UpgradeRequest physicalRequestHeaders;
80      /** The original response headers, used for delta encoded AddChannelResponse blocks */
81      private UpgradeResponse physicalResponseHeaders;
82  
83      public Muxer(final LogicalConnection connection)
84      {
85          this.physicalConnection = connection;
86          this.policy = connection.getPolicy().clonePolicy();
87          this.parser = new MuxParser();
88          this.parser.setEvents(this);
89          this.generator = new MuxGenerator();
90      }
91  
92      public MuxAddClient getAddClient()
93      {
94          return addClient;
95      }
96  
97      public MuxAddServer getAddServer()
98      {
99          return addServer;
100     }
101 
102     public MuxChannel getChannel(long channelId, boolean create)
103     {
104         if (channelId == CONTROL_CHANNEL_ID)
105         {
106             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
107         }
108 
109         MuxChannel channel = channels.get(channelId);
110         if (channel == null)
111         {
112             if (create)
113             {
114                 channel = new MuxChannel(channelId,this);
115                 channels.put(channelId,channel);
116             }
117             else
118             {
119                 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
120             }
121         }
122         return channel;
123     }
124 
125     public WebSocketPolicy getPolicy()
126     {
127         return policy;
128     }
129 
130     /**
131      * Get the remote address of the physical connection.
132      * 
133      * @return the remote address of the physical connection
134      */
135     public InetSocketAddress getRemoteAddress()
136     {
137         return this.remoteAddress;
138     }
139 
140     /**
141      * Incoming parser errors
142      */
143     @Override
144     public void incomingError(WebSocketException e)
145     {
146         MuxDropChannel.Reason reason = MuxDropChannel.Reason.PHYSICAL_CONNECTION_FAILED;
147         String phrase = String.format("%s: %s", e.getClass().getName(), e.getMessage());
148         mustFailPhysicalConnection(new MuxPhysicalConnectionException(reason,phrase));
149     }
150 
151     /**
152      * Incoming mux encapsulated frames.
153      */
154     @Override
155     public void incomingFrame(Frame frame)
156     {
157         parser.parse(frame);
158     }
159 
160     /**
161      * Is the muxer and the physical connection still open?
162      * 
163      * @return true if open
164      */
165     public boolean isOpen()
166     {
167         return physicalConnection.isOpen();
168     }
169 
170     public String mergeHeaders(List<String> physicalHeaders, String deltaHeaders)
171     {
172         // TODO Auto-generated method stub
173         return null;
174     }
175 
176     /**
177      * Per spec, the physical connection must be failed.
178      * <p>
179      * <a href="https://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-08#section-18">Section 18. Fail the Physical Connection.</a>
180      * 
181      * <blockquote> To _Fail the Physical Connection_, an endpoint MUST send a DropChannel multiplex control block with objective channel ID of 0 and drop
182      * reason code in the range of 2000-2999, and then _Fail the WebSocket Connection_ on the physical connection with status code of 1011. </blockquote>
183      */
184     private void mustFailPhysicalConnection(MuxPhysicalConnectionException muxe)
185     {
186         // TODO: stop muxer from receiving incoming sub-channel traffic.
187 
188         MuxDropChannel drop = muxe.getMuxDropChannel();
189         LOG.warn(muxe);
190         try
191         {
192             generator.generate(null,drop);
193         }
194         catch (IOException ioe)
195         {
196             LOG.warn("Unable to send mux DropChannel",ioe);
197         }
198 
199         String reason = "Mux[MUST FAIL]" + drop.getPhrase();
200         reason = StringUtil.truncate(reason,WebSocketFrame.MAX_CONTROL_PAYLOAD);
201         this.physicalConnection.close(StatusCode.SERVER_ERROR,reason);
202 
203         // TODO: trigger abnormal close for all sub-channels.
204     }
205 
206     /**
207      * Incoming mux control block, destined for the control channel (id 0)
208      */
209     @Override
210     public void onMuxAddChannelRequest(MuxAddChannelRequest request)
211     {
212         if (policy.getBehavior() == WebSocketBehavior.CLIENT)
213         {
214             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelRequest not allowed per spec");
215         }
216 
217         if (request.getRsv() != 0)
218         {
219             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_REQUEST_ENCODING,"RSV Not allowed to be set");
220         }
221 
222         // Pre-allocate channel.
223         long channelId = request.getChannelId();
224         MuxChannel channel = getChannel(channelId, true);
225 
226         // submit to upgrade handshake process
227         try
228         {
229             switch (request.getEncoding())
230             {
231                 case MuxAddChannelRequest.IDENTITY_ENCODING:
232                 {
233                     UpgradeRequest idenReq = MuxRequest.parse(request.getHandshake());
234                     addServer.handshake(this,channel,idenReq);
235                     break;
236                 }
237                 case MuxAddChannelRequest.DELTA_ENCODING:
238                 {
239                     UpgradeRequest baseReq = addServer.getPhysicalHandshakeRequest();
240                     UpgradeRequest deltaReq = MuxRequest.parse(request.getHandshake());
241                     UpgradeRequest mergedReq = MuxRequest.merge(baseReq,deltaReq);
242 
243                     addServer.handshake(this,channel,mergedReq);
244                     break;
245                 }
246                 default:
247                 {
248                     throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unrecognized request encoding");
249                 }
250             }
251         }
252         catch (MuxPhysicalConnectionException e)
253         {
254             throw e;
255         }
256         catch (Throwable t)
257         {
258             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unable to parse request",t);
259         }
260     }
261 
262     /**
263      * Incoming mux control block, destined for the control channel (id 0)
264      */
265     @Override
266     public void onMuxAddChannelResponse(MuxAddChannelResponse response)
267     {
268         if (policy.getBehavior() == WebSocketBehavior.SERVER)
269         {
270             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelResponse not allowed per spec");
271         }
272 
273         if (response.getRsv() != 0)
274         {
275             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_RESPONSE_ENCODING,"RSV Not allowed to be set");
276         }
277 
278         // Process channel
279         long channelId = response.getChannelId();
280         MuxChannel channel = getChannel(channelId,false);
281 
282         // Process Response headers
283         try
284         {
285             // Parse Response
286 
287             // TODO: Sec-WebSocket-Accept header
288             // TODO: Sec-WebSocket-Extensions header
289             // TODO: Setup extensions
290             // TODO: Setup sessions
291 
292             // Trigger channel open
293             channel.onOpen();
294         }
295         catch (Throwable t)
296         {
297             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_RESPONSE,"Unable to parse response",t);
298         }
299     }
300 
301     /**
302      * Incoming mux control block, destined for the control channel (id 0)
303      */
304     @Override
305     public void onMuxDropChannel(MuxDropChannel drop)
306     {
307         // Process channel
308         long channelId = drop.getChannelId();
309         MuxChannel channel = getChannel(channelId,false);
310 
311         String reason = "Mux " + drop.toString();
312         reason = StringUtil.truncate(reason,(WebSocketFrame.MAX_CONTROL_PAYLOAD - 2));
313         channel.close(StatusCode.PROTOCOL,reason);
314         // TODO: set channel to inactive?
315     }
316 
317     /**
318      * Incoming mux-unwrapped frames, destined for a sub-channel
319      */
320     @Override
321     public void onMuxedFrame(MuxedFrame frame)
322     {
323         MuxChannel subchannel = channels.get(frame.getChannelId());
324         subchannel.incomingFrame(frame);
325     }
326 
327     @Override
328     public void onMuxException(MuxException e)
329     {
330         if (e instanceof MuxPhysicalConnectionException)
331         {
332             mustFailPhysicalConnection((MuxPhysicalConnectionException)e);
333         }
334 
335         LOG.warn(e);
336         // TODO: handle other (non physical) mux exceptions how?
337     }
338 
339     /**
340      * Incoming mux control block, destined for the control channel (id 0)
341      */
342     @Override
343     public void onMuxFlowControl(MuxFlowControl flow)
344     {
345         if (flow.getSendQuotaSize() > 0x7F_FF_FF_FF_FF_FF_FF_FFL)
346         {
347             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.SEND_QUOTA_OVERFLOW,"Send Quota Overflow");
348         }
349 
350         // Process channel
351         long channelId = flow.getChannelId();
352         MuxChannel channel = getChannel(channelId,false);
353 
354         // TODO: set channel quota
355     }
356 
357     /**
358      * Incoming mux control block, destined for the control channel (id 0)
359      */
360     @Override
361     public void onMuxNewChannelSlot(MuxNewChannelSlot slot)
362     {
363         if (policy.getBehavior() == WebSocketBehavior.SERVER)
364         {
365             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"NewChannelSlot not allowed per spec");
366         }
367 
368         if (slot.isFallback())
369         {
370             if (slot.getNumberOfSlots() == 0)
371             {
372                 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 number of slots during fallback");
373             }
374             if (slot.getInitialSendQuota() == 0)
375             {
376                 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 initial send quota during fallback");
377             }
378         }
379 
380         // TODO: handle channel slot
381     }
382 
383     /**
384      * Outgoing frame, without mux encapsulated payload.
385      */
386     public void output(long channelId, Frame frame, WriteCallback callback)
387     {
388         if (LOG.isDebugEnabled())
389         {
390             LOG.debug("output({}, {})",channelId,frame,callback);
391         }
392         generator.generate(channelId,frame,callback);
393     }
394 
395     /**
396      * Write an OP out the physical connection.
397      * 
398      * @param op
399      *            the mux operation to write
400      * @throws IOException
401      */
402     public void output(MuxControlBlock op) throws IOException
403     {
404         generator.generate(null,op);
405     }
406 
407     public void setAddClient(MuxAddClient addClient)
408     {
409         this.addClient = addClient;
410     }
411 
412     public void setAddServer(MuxAddServer addServer)
413     {
414         this.addServer = addServer;
415     }
416 
417     public void setOutgoingFramesHandler(OutgoingFrames outgoing)
418     {
419         this.generator.setOutgoing(outgoing);
420     }
421 
422     /**
423      * Set the remote address of the physical connection.
424      * <p>
425      * This address made available to sub-channels.
426      * 
427      * @param remoteAddress
428      *            the remote address
429      */
430     public void setRemoteAddress(InetSocketAddress remoteAddress)
431     {
432         this.remoteAddress = remoteAddress;
433     }
434 
435     @Override
436     public String toString()
437     {
438         return String.format("Muxer[subChannels.size=%d]",channels.size());
439     }
440 }