View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.mux;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.eclipse.jetty.util.StringUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.StatusCode;
31  import org.eclipse.jetty.websocket.api.UpgradeRequest;
32  import org.eclipse.jetty.websocket.api.UpgradeResponse;
33  import org.eclipse.jetty.websocket.api.WebSocketBehavior;
34  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
35  import org.eclipse.jetty.websocket.api.WriteCallback;
36  import org.eclipse.jetty.websocket.api.extensions.Frame;
37  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
38  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
39  import org.eclipse.jetty.websocket.common.LogicalConnection;
40  import org.eclipse.jetty.websocket.common.frames.ControlFrame;
41  import org.eclipse.jetty.websocket.mux.add.MuxAddClient;
42  import org.eclipse.jetty.websocket.mux.add.MuxAddServer;
43  import org.eclipse.jetty.websocket.mux.op.MuxAddChannelRequest;
44  import org.eclipse.jetty.websocket.mux.op.MuxAddChannelResponse;
45  import org.eclipse.jetty.websocket.mux.op.MuxDropChannel;
46  import org.eclipse.jetty.websocket.mux.op.MuxFlowControl;
47  import org.eclipse.jetty.websocket.mux.op.MuxNewChannelSlot;
48  
49  /**
50   * Muxer responsible for managing sub-channels.
51   * <p>
52   * Maintains a 1 (incoming and outgoing mux encapsulated frames) to many (per-channel incoming/outgoing standard websocket frames) relationship, along with
53   * routing of {@link MuxControlBlock} events.
54   * <p>
55   * Control Channel events (channel ID == 0) are handled by the Muxer.
56   */
57  public class Muxer implements IncomingFrames, MuxParser.Listener
58  {
59      private static final int CONTROL_CHANNEL_ID = 0;
60  
61      private static final Logger LOG = Log.getLogger(Muxer.class);
62  
63      /**
64       * Map of sub-channels, key is the channel Id.
65       */
66      private Map<Long, MuxChannel> channels = new HashMap<Long, MuxChannel>();
67  
68      private final WebSocketPolicy policy;
69      private final LogicalConnection physicalConnection;
70      private InetSocketAddress remoteAddress;
71      /** Parsing frames destined for sub-channels */
72      private MuxParser parser;
73      /** Generating frames destined for physical connection */
74      private MuxGenerator generator;
75      private MuxAddServer addServer;
76      private MuxAddClient addClient;
77      /** The original request headers, used for delta encoded AddChannelRequest blocks */
78      private UpgradeRequest physicalRequestHeaders;
79      /** The original response headers, used for delta encoded AddChannelResponse blocks */
80      private UpgradeResponse physicalResponseHeaders;
81  
82      public Muxer(final LogicalConnection connection)
83      {
84          this.physicalConnection = connection;
85          this.policy = connection.getPolicy().clonePolicy();
86          this.parser = new MuxParser();
87          this.parser.setEvents(this);
88          this.generator = new MuxGenerator();
89      }
90  
91      public MuxAddClient getAddClient()
92      {
93          return addClient;
94      }
95  
96      public MuxAddServer getAddServer()
97      {
98          return addServer;
99      }
100 
101     public MuxChannel getChannel(long channelId, boolean create)
102     {
103         if (channelId == CONTROL_CHANNEL_ID)
104         {
105             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
106         }
107 
108         MuxChannel channel = channels.get(channelId);
109         if (channel == null)
110         {
111             if (create)
112             {
113                 channel = new MuxChannel(channelId,this);
114                 channels.put(channelId,channel);
115             }
116             else
117             {
118                 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
119             }
120         }
121         return channel;
122     }
123 
124     public WebSocketPolicy getPolicy()
125     {
126         return policy;
127     }
128 
129     /**
130      * Get the remote address of the physical connection.
131      * 
132      * @return the remote address of the physical connection
133      */
134     public InetSocketAddress getRemoteAddress()
135     {
136         return this.remoteAddress;
137     }
138 
139     /**
140      * Incoming parser errors
141      */
142     @Override
143     public void incomingError(Throwable e)
144     {
145         MuxDropChannel.Reason reason = MuxDropChannel.Reason.PHYSICAL_CONNECTION_FAILED;
146         String phrase = String.format("%s: %s", e.getClass().getName(), e.getMessage());
147         mustFailPhysicalConnection(new MuxPhysicalConnectionException(reason,phrase));
148     }
149 
150     /**
151      * Incoming mux encapsulated frames.
152      */
153     @Override
154     public void incomingFrame(Frame frame)
155     {
156         parser.parse(frame);
157     }
158 
159     /**
160      * Is the muxer and the physical connection still open?
161      * 
162      * @return true if open
163      */
164     public boolean isOpen()
165     {
166         return physicalConnection.isOpen();
167     }
168 
169     public String mergeHeaders(List<String> physicalHeaders, String deltaHeaders)
170     {
171         // TODO Auto-generated method stub
172         return null;
173     }
174 
175     /**
176      * Per spec, the physical connection must be failed.
177      * <p>
178      * <a href="https://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-08#section-18">Section 18. Fail the Physical Connection.</a>
179      * 
180      * <blockquote> To _Fail the Physical Connection_, an endpoint MUST send a DropChannel multiplex control block with objective channel ID of 0 and drop
181      * reason code in the range of 2000-2999, and then _Fail the WebSocket Connection_ on the physical connection with status code of 1011. </blockquote>
182      */
183     private void mustFailPhysicalConnection(MuxPhysicalConnectionException muxe)
184     {
185         // TODO: stop muxer from receiving incoming sub-channel traffic.
186 
187         MuxDropChannel drop = muxe.getMuxDropChannel();
188         LOG.warn(muxe);
189         try
190         {
191             generator.generate(null,drop);
192         }
193         catch (IOException ioe)
194         {
195             LOG.warn("Unable to send mux DropChannel",ioe);
196         }
197 
198         String reason = "Mux[MUST FAIL]" + drop.getPhrase();
199         reason = StringUtil.truncate(reason,ControlFrame.MAX_CONTROL_PAYLOAD);
200         this.physicalConnection.close(StatusCode.SERVER_ERROR,reason);
201 
202         // TODO: trigger abnormal close for all sub-channels.
203     }
204 
205     /**
206      * Incoming mux control block, destined for the control channel (id 0)
207      */
208     @Override
209     public void onMuxAddChannelRequest(MuxAddChannelRequest request)
210     {
211         if (policy.getBehavior() == WebSocketBehavior.CLIENT)
212         {
213             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelRequest not allowed per spec");
214         }
215 
216         if (request.getRsv() != 0)
217         {
218             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_REQUEST_ENCODING,"RSV Not allowed to be set");
219         }
220 
221         // Pre-allocate channel.
222         long channelId = request.getChannelId();
223         MuxChannel channel = getChannel(channelId, true);
224 
225         // submit to upgrade handshake process
226         try
227         {
228             switch (request.getEncoding())
229             {
230                 case MuxAddChannelRequest.IDENTITY_ENCODING:
231                 {
232                     UpgradeRequest idenReq = MuxRequest.parse(request.getHandshake());
233                     addServer.handshake(this,channel,idenReq);
234                     break;
235                 }
236                 case MuxAddChannelRequest.DELTA_ENCODING:
237                 {
238                     UpgradeRequest baseReq = addServer.getPhysicalHandshakeRequest();
239                     UpgradeRequest deltaReq = MuxRequest.parse(request.getHandshake());
240                     UpgradeRequest mergedReq = MuxRequest.merge(baseReq,deltaReq);
241 
242                     addServer.handshake(this,channel,mergedReq);
243                     break;
244                 }
245                 default:
246                 {
247                     throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unrecognized request encoding");
248                 }
249             }
250         }
251         catch (MuxPhysicalConnectionException e)
252         {
253             throw e;
254         }
255         catch (Throwable t)
256         {
257             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unable to parse request",t);
258         }
259     }
260 
261     /**
262      * Incoming mux control block, destined for the control channel (id 0)
263      */
264     @Override
265     public void onMuxAddChannelResponse(MuxAddChannelResponse response)
266     {
267         if (policy.getBehavior() == WebSocketBehavior.SERVER)
268         {
269             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelResponse not allowed per spec");
270         }
271 
272         if (response.getRsv() != 0)
273         {
274             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_RESPONSE_ENCODING,"RSV Not allowed to be set");
275         }
276 
277         // Process channel
278         long channelId = response.getChannelId();
279         MuxChannel channel = getChannel(channelId,false);
280 
281         // Process Response headers
282         try
283         {
284             // Parse Response
285 
286             // TODO: Sec-WebSocket-Accept header
287             // TODO: Sec-WebSocket-Extensions header
288             // TODO: Setup extensions
289             // TODO: Setup sessions
290 
291             // Trigger channel open
292             channel.onOpen();
293         }
294         catch (Throwable t)
295         {
296             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_RESPONSE,"Unable to parse response",t);
297         }
298     }
299 
300     /**
301      * Incoming mux control block, destined for the control channel (id 0)
302      */
303     @Override
304     public void onMuxDropChannel(MuxDropChannel drop)
305     {
306         // Process channel
307         long channelId = drop.getChannelId();
308         MuxChannel channel = getChannel(channelId,false);
309 
310         String reason = "Mux " + drop.toString();
311         reason = StringUtil.truncate(reason,(ControlFrame.MAX_CONTROL_PAYLOAD - 2));
312         channel.close(StatusCode.PROTOCOL,reason);
313         // TODO: set channel to inactive?
314     }
315 
316     /**
317      * Incoming mux-unwrapped frames, destined for a sub-channel
318      */
319     @Override
320     public void onMuxedFrame(MuxedFrame frame)
321     {
322         MuxChannel subchannel = channels.get(frame.getChannelId());
323         subchannel.incomingFrame(frame);
324     }
325 
326     @Override
327     public void onMuxException(MuxException e)
328     {
329         if (e instanceof MuxPhysicalConnectionException)
330         {
331             mustFailPhysicalConnection((MuxPhysicalConnectionException)e);
332         }
333 
334         LOG.warn(e);
335         // TODO: handle other (non physical) mux exceptions how?
336     }
337 
338     /**
339      * Incoming mux control block, destined for the control channel (id 0)
340      */
341     @Override
342     public void onMuxFlowControl(MuxFlowControl flow)
343     {
344         if (flow.getSendQuotaSize() > 0x7F_FF_FF_FF_FF_FF_FF_FFL)
345         {
346             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.SEND_QUOTA_OVERFLOW,"Send Quota Overflow");
347         }
348 
349         // Process channel
350         long channelId = flow.getChannelId();
351         MuxChannel channel = getChannel(channelId,false);
352 
353         // TODO: set channel quota
354     }
355 
356     /**
357      * Incoming mux control block, destined for the control channel (id 0)
358      */
359     @Override
360     public void onMuxNewChannelSlot(MuxNewChannelSlot slot)
361     {
362         if (policy.getBehavior() == WebSocketBehavior.SERVER)
363         {
364             throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"NewChannelSlot not allowed per spec");
365         }
366 
367         if (slot.isFallback())
368         {
369             if (slot.getNumberOfSlots() == 0)
370             {
371                 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 number of slots during fallback");
372             }
373             if (slot.getInitialSendQuota() == 0)
374             {
375                 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 initial send quota during fallback");
376             }
377         }
378 
379         // TODO: handle channel slot
380     }
381 
382     /**
383      * Outgoing frame, without mux encapsulated payload.
384      */
385     public void output(long channelId, Frame frame, WriteCallback callback)
386     {
387         if (LOG.isDebugEnabled())
388         {
389             LOG.debug("output({}, {})",channelId,frame,callback);
390         }
391         generator.generate(channelId,frame,callback);
392     }
393 
394     /**
395      * Write an OP out the physical connection.
396      * 
397      * @param op
398      *            the mux operation to write
399      * @throws IOException
400      */
401     public void output(MuxControlBlock op) throws IOException
402     {
403         generator.generate(null,op);
404     }
405 
406     public void setAddClient(MuxAddClient addClient)
407     {
408         this.addClient = addClient;
409     }
410 
411     public void setAddServer(MuxAddServer addServer)
412     {
413         this.addServer = addServer;
414     }
415 
416     public void setOutgoingFramesHandler(OutgoingFrames outgoing)
417     {
418         this.generator.setOutgoing(outgoing);
419     }
420 
421     /**
422      * Set the remote address of the physical connection.
423      * <p>
424      * This address made available to sub-channels.
425      * 
426      * @param remoteAddress
427      *            the remote address
428      */
429     public void setRemoteAddress(InetSocketAddress remoteAddress)
430     {
431         this.remoteAddress = remoteAddress;
432     }
433 
434     @Override
435     public String toString()
436     {
437         return String.format("Muxer[subChannels.size=%d]",channels.size());
438     }
439 }