1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.extensions.mux;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.eclipse.jetty.util.StringUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.StatusCode;
31 import org.eclipse.jetty.websocket.api.UpgradeRequest;
32 import org.eclipse.jetty.websocket.api.UpgradeResponse;
33 import org.eclipse.jetty.websocket.api.WebSocketBehavior;
34 import org.eclipse.jetty.websocket.api.WebSocketException;
35 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
36 import org.eclipse.jetty.websocket.api.WriteCallback;
37 import org.eclipse.jetty.websocket.api.extensions.Frame;
38 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
39 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
40 import org.eclipse.jetty.websocket.common.LogicalConnection;
41 import org.eclipse.jetty.websocket.common.WebSocketFrame;
42 import org.eclipse.jetty.websocket.common.extensions.mux.add.MuxAddClient;
43 import org.eclipse.jetty.websocket.common.extensions.mux.add.MuxAddServer;
44 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelRequest;
45 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelResponse;
46 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxDropChannel;
47 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxFlowControl;
48 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxNewChannelSlot;
49
50
51
52
53
54
55
56
57
58 public class Muxer implements IncomingFrames, MuxParser.Listener
59 {
60 private static final int CONTROL_CHANNEL_ID = 0;
61
62 private static final Logger LOG = Log.getLogger(Muxer.class);
63
64
65
66
67 private Map<Long, MuxChannel> channels = new HashMap<Long, MuxChannel>();
68
69 private final WebSocketPolicy policy;
70 private final LogicalConnection physicalConnection;
71 private InetSocketAddress remoteAddress;
72
73 private MuxParser parser;
74
75 private MuxGenerator generator;
76 private MuxAddServer addServer;
77 private MuxAddClient addClient;
78
79 private UpgradeRequest physicalRequestHeaders;
80
81 private UpgradeResponse physicalResponseHeaders;
82
83 public Muxer(final LogicalConnection connection)
84 {
85 this.physicalConnection = connection;
86 this.policy = connection.getPolicy().clonePolicy();
87 this.parser = new MuxParser();
88 this.parser.setEvents(this);
89 this.generator = new MuxGenerator();
90 }
91
92 public MuxAddClient getAddClient()
93 {
94 return addClient;
95 }
96
97 public MuxAddServer getAddServer()
98 {
99 return addServer;
100 }
101
102 public MuxChannel getChannel(long channelId, boolean create)
103 {
104 if (channelId == CONTROL_CHANNEL_ID)
105 {
106 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Invalid Channel ID");
107 }
108
109 MuxChannel channel = channels.get(channelId);
110 if (channel == null)
111 {
112 if (create)
113 {
114 channel = new MuxChannel(channelId,this);
115 channels.put(channelId,channel);
116 }
117 else
118 {
119 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Unknown Channel ID");
120 }
121 }
122 return channel;
123 }
124
125 public WebSocketPolicy getPolicy()
126 {
127 return policy;
128 }
129
130
131
132
133
134
135 public InetSocketAddress getRemoteAddress()
136 {
137 return this.remoteAddress;
138 }
139
140
141
142
143 @Override
144 public void incomingError(WebSocketException e)
145 {
146 MuxDropChannel.Reason reason = MuxDropChannel.Reason.PHYSICAL_CONNECTION_FAILED;
147 String phrase = String.format("%s: %s", e.getClass().getName(), e.getMessage());
148 mustFailPhysicalConnection(new MuxPhysicalConnectionException(reason,phrase));
149 }
150
151
152
153
154 @Override
155 public void incomingFrame(Frame frame)
156 {
157 parser.parse(frame);
158 }
159
160
161
162
163
164
165 public boolean isOpen()
166 {
167 return physicalConnection.isOpen();
168 }
169
170 public String mergeHeaders(List<String> physicalHeaders, String deltaHeaders)
171 {
172
173 return null;
174 }
175
176
177
178
179
180
181
182
183
184 private void mustFailPhysicalConnection(MuxPhysicalConnectionException muxe)
185 {
186
187
188 MuxDropChannel drop = muxe.getMuxDropChannel();
189 LOG.warn(muxe);
190 try
191 {
192 generator.generate(null,drop);
193 }
194 catch (IOException ioe)
195 {
196 LOG.warn("Unable to send mux DropChannel",ioe);
197 }
198
199 String reason = "Mux[MUST FAIL]" + drop.getPhrase();
200 reason = StringUtil.truncate(reason,WebSocketFrame.MAX_CONTROL_PAYLOAD);
201 this.physicalConnection.close(StatusCode.SERVER_ERROR,reason);
202
203
204 }
205
206
207
208
209 @Override
210 public void onMuxAddChannelRequest(MuxAddChannelRequest request)
211 {
212 if (policy.getBehavior() == WebSocketBehavior.CLIENT)
213 {
214 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelRequest not allowed per spec");
215 }
216
217 if (request.getRsv() != 0)
218 {
219 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_REQUEST_ENCODING,"RSV Not allowed to be set");
220 }
221
222
223 long channelId = request.getChannelId();
224 MuxChannel channel = getChannel(channelId, true);
225
226
227 try
228 {
229 switch (request.getEncoding())
230 {
231 case MuxAddChannelRequest.IDENTITY_ENCODING:
232 {
233 UpgradeRequest idenReq = MuxRequest.parse(request.getHandshake());
234 addServer.handshake(this,channel,idenReq);
235 break;
236 }
237 case MuxAddChannelRequest.DELTA_ENCODING:
238 {
239 UpgradeRequest baseReq = addServer.getPhysicalHandshakeRequest();
240 UpgradeRequest deltaReq = MuxRequest.parse(request.getHandshake());
241 UpgradeRequest mergedReq = MuxRequest.merge(baseReq,deltaReq);
242
243 addServer.handshake(this,channel,mergedReq);
244 break;
245 }
246 default:
247 {
248 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unrecognized request encoding");
249 }
250 }
251 }
252 catch (MuxPhysicalConnectionException e)
253 {
254 throw e;
255 }
256 catch (Throwable t)
257 {
258 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_REQUEST,"Unable to parse request",t);
259 }
260 }
261
262
263
264
265 @Override
266 public void onMuxAddChannelResponse(MuxAddChannelResponse response)
267 {
268 if (policy.getBehavior() == WebSocketBehavior.SERVER)
269 {
270 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"AddChannelResponse not allowed per spec");
271 }
272
273 if (response.getRsv() != 0)
274 {
275 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_RESPONSE_ENCODING,"RSV Not allowed to be set");
276 }
277
278
279 long channelId = response.getChannelId();
280 MuxChannel channel = getChannel(channelId,false);
281
282
283 try
284 {
285
286
287
288
289
290
291
292
293 channel.onOpen();
294 }
295 catch (Throwable t)
296 {
297 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.BAD_RESPONSE,"Unable to parse response",t);
298 }
299 }
300
301
302
303
304 @Override
305 public void onMuxDropChannel(MuxDropChannel drop)
306 {
307
308 long channelId = drop.getChannelId();
309 MuxChannel channel = getChannel(channelId,false);
310
311 String reason = "Mux " + drop.toString();
312 reason = StringUtil.truncate(reason,(WebSocketFrame.MAX_CONTROL_PAYLOAD - 2));
313 channel.close(StatusCode.PROTOCOL,reason);
314
315 }
316
317
318
319
320 @Override
321 public void onMuxedFrame(MuxedFrame frame)
322 {
323 MuxChannel subchannel = channels.get(frame.getChannelId());
324 subchannel.incomingFrame(frame);
325 }
326
327 @Override
328 public void onMuxException(MuxException e)
329 {
330 if (e instanceof MuxPhysicalConnectionException)
331 {
332 mustFailPhysicalConnection((MuxPhysicalConnectionException)e);
333 }
334
335 LOG.warn(e);
336
337 }
338
339
340
341
342 @Override
343 public void onMuxFlowControl(MuxFlowControl flow)
344 {
345 if (flow.getSendQuotaSize() > 0x7F_FF_FF_FF_FF_FF_FF_FFL)
346 {
347 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.SEND_QUOTA_OVERFLOW,"Send Quota Overflow");
348 }
349
350
351 long channelId = flow.getChannelId();
352 MuxChannel channel = getChannel(channelId,false);
353
354
355 }
356
357
358
359
360 @Override
361 public void onMuxNewChannelSlot(MuxNewChannelSlot slot)
362 {
363 if (policy.getBehavior() == WebSocketBehavior.SERVER)
364 {
365 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"NewChannelSlot not allowed per spec");
366 }
367
368 if (slot.isFallback())
369 {
370 if (slot.getNumberOfSlots() == 0)
371 {
372 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 number of slots during fallback");
373 }
374 if (slot.getInitialSendQuota() == 0)
375 {
376 throw new MuxPhysicalConnectionException(MuxDropChannel.Reason.UNKNOWN_MUX_CONTROL_BLOCK,"Cannot have 0 initial send quota during fallback");
377 }
378 }
379
380
381 }
382
383
384
385
386 public void output(long channelId, Frame frame, WriteCallback callback)
387 {
388 if (LOG.isDebugEnabled())
389 {
390 LOG.debug("output({}, {})",channelId,frame,callback);
391 }
392 generator.generate(channelId,frame,callback);
393 }
394
395
396
397
398
399
400
401
402 public void output(MuxControlBlock op) throws IOException
403 {
404 generator.generate(null,op);
405 }
406
407 public void setAddClient(MuxAddClient addClient)
408 {
409 this.addClient = addClient;
410 }
411
412 public void setAddServer(MuxAddServer addServer)
413 {
414 this.addServer = addServer;
415 }
416
417 public void setOutgoingFramesHandler(OutgoingFrames outgoing)
418 {
419 this.generator.setOutgoing(outgoing);
420 }
421
422
423
424
425
426
427
428
429
430 public void setRemoteAddress(InetSocketAddress remoteAddress)
431 {
432 this.remoteAddress = remoteAddress;
433 }
434
435 @Override
436 public String toString()
437 {
438 return String.format("Muxer[subChannels.size=%d]",channels.size());
439 }
440 }