1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.mux;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.eclipse.jetty.util.StringUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.StatusCode;
31 import org.eclipse.jetty.websocket.api.UpgradeRequest;
32 import org.eclipse.jetty.websocket.api.UpgradeResponse;
33 import org.eclipse.jetty.websocket.api.WebSocketBehavior;
34 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
35 import org.eclipse.jetty.websocket.api.WriteCallback;
36 import org.eclipse.jetty.websocket.api.extensions.Frame;
37 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
38 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
39 import org.eclipse.jetty.websocket.common.LogicalConnection;
40 import org.eclipse.jetty.websocket.common.frames.ControlFrame;
41 import org.eclipse.jetty.websocket.mux.add.MuxAddClient;
42 import org.eclipse.jetty.websocket.mux.add.MuxAddServer;
43 import org.eclipse.jetty.websocket.mux.op.MuxAddChannelRequest;
44 import org.eclipse.jetty.websocket.mux.op.MuxAddChannelResponse;
45 import org.eclipse.jetty.websocket.mux.op.MuxDropChannel;
46 import org.eclipse.jetty.websocket.mux.op.MuxFlowControl;
47 import org.eclipse.jetty.websocket.mux.op.MuxNewChannelSlot;
48
49
50
51
52
53
54
55
56
57 public class Muxer implements IncomingFrames, MuxParser.Listener
58 {
59 private static final int CONTROL_CHANNEL_ID = 0;
60
61 private static final Logger LOG = Log.getLogger(Muxer.class);
62
63
64
65
66 private Map<Long, MuxChannel> channels = new HashMap<Long, MuxChannel>();
67
68 private final WebSocketPolicy policy;
69 private final LogicalConnection physicalConnection;
70 private InetSocketAddress remoteAddress;
71
72 private MuxParser parser;
73
74 private MuxGenerator generator;
75 private MuxAddServer addServer;
76 private MuxAddClient addClient;
77
78 private UpgradeRequest physicalRequestHeaders;
79
80 private UpgradeResponse physicalResponseHeaders;
81
82 public Muxer(final LogicalConnection connection)
83 {
84 this.physicalConnection = connection;
85 this.policy = connection.getPolicy().clonePolicy();
86 this.parser = new MuxParser();
87 this.parser.setEvents(this);
88 this.generator = new MuxGenerator();
89 }
90
91 public MuxAddClient getAddClient()
92 {
93 return addClient;
94 }
95
96 public MuxAddServer getAddServer()
97 {
98 return addServer;
99 }
100
101 public MuxChannel getChannel(long channelId, boolean create)
102 {
103 if (channelId == CONTROL_CHANNEL_ID)
104 {
105 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
106 }
107
108 MuxChannel channel = channels.get(channelId);
109 if (channel == null)
110 {
111 if (create)
112 {
113 channel = new MuxChannel(channelId,this);
114 channels.put(channelId,channel);
115 }
116 else
117 {
118 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
119 }
120 }
121 return channel;
122 }
123
124 public WebSocketPolicy getPolicy()
125 {
126 return policy;
127 }
128
129
130
131
132
133
134 public InetSocketAddress getRemoteAddress()
135 {
136 return this.remoteAddress;
137 }
138
139
140
141
142 @Override
143 public void incomingError(Throwable e)
144 {
145 MuxDropChannel.Reason reason = MuxDropChannel.Reason.PHYSICAL_CONNECTION_FAILED;
146 String phrase = String.format("%s: %s", e.getClass().getName(), e.getMessage());
147 mustFailPhysicalConnection(new MuxPhysicalConnectionException(reason,phrase));
148 }
149
150
151
152
153 @Override
154 public void incomingFrame(Frame frame)
155 {
156 parser.parse(frame);
157 }
158
159
160
161
162
163
164 public boolean isOpen()
165 {
166 return physicalConnection.isOpen();
167 }
168
169 public String mergeHeaders(List<String> physicalHeaders, String deltaHeaders)
170 {
171
172 return null;
173 }
174
175
176
177
178
179
180
181
182
183 private void mustFailPhysicalConnection(MuxPhysicalConnectionException muxe)
184 {
185
186
187 MuxDropChannel drop = muxe.getMuxDropChannel();
188 LOG.warn(muxe);
189 try
190 {
191 generator.generate(null,drop);
192 }
193 catch (IOException ioe)
194 {
195 LOG.warn("Unable to send mux DropChannel",ioe);
196 }
197
198 String reason = "Mux[MUST FAIL]" + drop.getPhrase();
199 reason = StringUtil.truncate(reason,ControlFrame.MAX_CONTROL_PAYLOAD);
200 this.physicalConnection.close(StatusCode.SERVER_ERROR,reason);
201
202
203 }
204
205
206
207
208 @Override
209 public void onMuxAddChannelRequest(MuxAddChannelRequest request)
210 {
211 if (policy.getBehavior() == WebSocketBehavior.CLIENT)
212 {
213 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelRequest not allowed per spec");
214 }
215
216 if (request.getRsv() != 0)
217 {
218 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_REQUEST_ENCODING,"RSV Not allowed to be set");
219 }
220
221
222 long channelId = request.getChannelId();
223 MuxChannel channel = getChannel(channelId, true);
224
225
226 try
227 {
228 switch (request.getEncoding())
229 {
230 case MuxAddChannelRequest.IDENTITY_ENCODING:
231 {
232 UpgradeRequest idenReq = MuxRequest.parse(request.getHandshake());
233 addServer.handshake(this,channel,idenReq);
234 break;
235 }
236 case MuxAddChannelRequest.DELTA_ENCODING:
237 {
238 UpgradeRequest baseReq = addServer.getPhysicalHandshakeRequest();
239 UpgradeRequest deltaReq = MuxRequest.parse(request.getHandshake());
240 UpgradeRequest mergedReq = MuxRequest.merge(baseReq,deltaReq);
241
242 addServer.handshake(this,channel,mergedReq);
243 break;
244 }
245 default:
246 {
247 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unrecognized request encoding");
248 }
249 }
250 }
251 catch (MuxPhysicalConnectionException e)
252 {
253 throw e;
254 }
255 catch (Throwable t)
256 {
257 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unable to parse request",t);
258 }
259 }
260
261
262
263
264 @Override
265 public void onMuxAddChannelResponse(MuxAddChannelResponse response)
266 {
267 if (policy.getBehavior() == WebSocketBehavior.SERVER)
268 {
269 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelResponse not allowed per spec");
270 }
271
272 if (response.getRsv() != 0)
273 {
274 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_RESPONSE_ENCODING,"RSV Not allowed to be set");
275 }
276
277
278 long channelId = response.getChannelId();
279 MuxChannel channel = getChannel(channelId,false);
280
281
282 try
283 {
284
285
286
287
288
289
290
291
292 channel.onOpen();
293 }
294 catch (Throwable t)
295 {
296 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_RESPONSE,"Unable to parse response",t);
297 }
298 }
299
300
301
302
303 @Override
304 public void onMuxDropChannel(MuxDropChannel drop)
305 {
306
307 long channelId = drop.getChannelId();
308 MuxChannel channel = getChannel(channelId,false);
309
310 String reason = "Mux " + drop.toString();
311 reason = StringUtil.truncate(reason,(ControlFrame.MAX_CONTROL_PAYLOAD - 2));
312 channel.close(StatusCode.PROTOCOL,reason);
313
314 }
315
316
317
318
319 @Override
320 public void onMuxedFrame(MuxedFrame frame)
321 {
322 MuxChannel subchannel = channels.get(frame.getChannelId());
323 subchannel.incomingFrame(frame);
324 }
325
326 @Override
327 public void onMuxException(MuxException e)
328 {
329 if (e instanceof MuxPhysicalConnectionException)
330 {
331 mustFailPhysicalConnection((MuxPhysicalConnectionException)e);
332 }
333
334 LOG.warn(e);
335
336 }
337
338
339
340
341 @Override
342 public void onMuxFlowControl(MuxFlowControl flow)
343 {
344 if (flow.getSendQuotaSize() > 0x7F_FF_FF_FF_FF_FF_FF_FFL)
345 {
346 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.SEND_QUOTA_OVERFLOW,"Send Quota Overflow");
347 }
348
349
350 long channelId = flow.getChannelId();
351 MuxChannel channel = getChannel(channelId,false);
352
353
354 }
355
356
357
358
359 @Override
360 public void onMuxNewChannelSlot(MuxNewChannelSlot slot)
361 {
362 if (policy.getBehavior() == WebSocketBehavior.SERVER)
363 {
364 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"NewChannelSlot not allowed per spec");
365 }
366
367 if (slot.isFallback())
368 {
369 if (slot.getNumberOfSlots() == 0)
370 {
371 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 number of slots during fallback");
372 }
373 if (slot.getInitialSendQuota() == 0)
374 {
375 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 initial send quota during fallback");
376 }
377 }
378
379
380 }
381
382
383
384
385 public void output(long channelId, Frame frame, WriteCallback callback)
386 {
387 if (LOG.isDebugEnabled())
388 {
389 LOG.debug("output({}, {})",channelId,frame,callback);
390 }
391 generator.generate(channelId,frame,callback);
392 }
393
394
395
396
397
398
399
400
401 public void output(MuxControlBlock op) throws IOException
402 {
403 generator.generate(null,op);
404 }
405
406 public void setAddClient(MuxAddClient addClient)
407 {
408 this.addClient = addClient;
409 }
410
411 public void setAddServer(MuxAddServer addServer)
412 {
413 this.addServer = addServer;
414 }
415
416 public void setOutgoingFramesHandler(OutgoingFrames outgoing)
417 {
418 this.generator.setOutgoing(outgoing);
419 }
420
421
422
423
424
425
426
427
428
429 public void setRemoteAddress(InetSocketAddress remoteAddress)
430 {
431 this.remoteAddress = remoteAddress;
432 }
433
434 @Override
435 public String toString()
436 {
437 return String.format("Muxer[subChannels.size=%d]",channels.size());
438 }
439 }