1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.extensions;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.ListIterator;
25 import java.util.Queue;
26
27 import org.eclipse.jetty.util.ConcurrentArrayQueue;
28 import org.eclipse.jetty.util.IteratingCallback;
29 import org.eclipse.jetty.util.annotation.ManagedAttribute;
30 import org.eclipse.jetty.util.annotation.ManagedObject;
31 import org.eclipse.jetty.util.component.ContainerLifeCycle;
32 import org.eclipse.jetty.util.component.LifeCycle;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.websocket.api.BatchMode;
36 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
37 import org.eclipse.jetty.websocket.api.WriteCallback;
38 import org.eclipse.jetty.websocket.api.extensions.Extension;
39 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
41 import org.eclipse.jetty.websocket.api.extensions.Frame;
42 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
43 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
44 import org.eclipse.jetty.websocket.common.Generator;
45 import org.eclipse.jetty.websocket.common.Parser;
46
47
48
49
50 @ManagedObject("Extension Stack")
51 public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames, OutgoingFrames
52 {
53 private static final Logger LOG = Log.getLogger(ExtensionStack.class);
54
55 private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
56 private final IteratingCallback flusher = new Flusher();
57 private final ExtensionFactory factory;
58 private List<Extension> extensions;
59 private IncomingFrames nextIncoming;
60 private OutgoingFrames nextOutgoing;
61
/**
 * Creates an extension stack backed by the given factory.
 * <p>
 * No extensions exist until {@link #negotiate(List)} is called; until then the
 * internal extension list is {@code null}.
 *
 * @param factory the factory later asked to instantiate an {@link Extension} for each negotiated configuration
 */
public ExtensionStack(ExtensionFactory factory)
{
    this.factory = factory;
}
66
67 public void configure(Generator generator)
68 {
69 generator.configureFromExtensions(extensions);
70 }
71
72 public void configure(Parser parser)
73 {
74 parser.configureFromExtensions(extensions);
75 }
76
77 @Override
78 protected void doStart() throws Exception
79 {
80 super.doStart();
81
82
83 if ((extensions != null) && (extensions.size() > 0))
84 {
85 ListIterator<Extension> exts = extensions.listIterator();
86
87
88 while (exts.hasNext())
89 {
90 Extension ext = exts.next();
91 ext.setNextOutgoingFrames(nextOutgoing);
92 nextOutgoing = ext;
93
94 if (ext instanceof LifeCycle)
95 {
96 addBean(ext,true);
97 }
98 }
99
100
101 while (exts.hasPrevious())
102 {
103 Extension ext = exts.previous();
104 ext.setNextIncomingFrames(nextIncoming);
105 nextIncoming = ext;
106 }
107 }
108 }
109
110 @Override
111 public void dump(Appendable out, String indent) throws IOException
112 {
113 super.dump(out,indent);
114
115 IncomingFrames websocket = getLastIncoming();
116 OutgoingFrames network = getLastOutgoing();
117
118 out.append(indent).append(" +- Stack").append(System.lineSeparator());
119 out.append(indent).append(" +- Network : ").append(network.toString()).append(System.lineSeparator());
120 for (Extension ext : extensions)
121 {
122 out.append(indent).append(" +- Extension: ").append(ext.toString()).append(System.lineSeparator());
123 }
124 out.append(indent).append(" +- Websocket: ").append(websocket.toString()).append(System.lineSeparator());
125 }
126
127 @ManagedAttribute(name = "Extension List", readonly = true)
128 public List<Extension> getExtensions()
129 {
130 return extensions;
131 }
132
133 private IncomingFrames getLastIncoming()
134 {
135 IncomingFrames last = nextIncoming;
136 boolean done = false;
137 while (!done)
138 {
139 if (last instanceof AbstractExtension)
140 {
141 last = ((AbstractExtension)last).getNextIncoming();
142 }
143 else
144 {
145 done = true;
146 }
147 }
148 return last;
149 }
150
151 private OutgoingFrames getLastOutgoing()
152 {
153 OutgoingFrames last = nextOutgoing;
154 boolean done = false;
155 while (!done)
156 {
157 if (last instanceof AbstractExtension)
158 {
159 last = ((AbstractExtension)last).getNextOutgoing();
160 }
161 else
162 {
163 done = true;
164 }
165 }
166 return last;
167 }
168
169
170
171
172
173
174 public List<ExtensionConfig> getNegotiatedExtensions()
175 {
176 List<ExtensionConfig> ret = new ArrayList<>();
177 if (extensions == null)
178 {
179 return ret;
180 }
181
182 for (Extension ext : extensions)
183 {
184 if (ext.getName().charAt(0) == '@')
185 {
186
187 continue;
188 }
189 ret.add(ext.getConfig());
190 }
191 return ret;
192 }
193
194 @ManagedAttribute(name = "Next Incoming Frames Handler", readonly = true)
195 public IncomingFrames getNextIncoming()
196 {
197 return nextIncoming;
198 }
199
200 @ManagedAttribute(name = "Next Outgoing Frames Handler", readonly = true)
201 public OutgoingFrames getNextOutgoing()
202 {
203 return nextOutgoing;
204 }
205
206 public boolean hasNegotiatedExtensions()
207 {
208 return (this.extensions != null) && (this.extensions.size() > 0);
209 }
210
211 @Override
212 public void incomingError(Throwable e)
213 {
214 nextIncoming.incomingError(e);
215 }
216
217 @Override
218 public void incomingFrame(Frame frame)
219 {
220 nextIncoming.incomingFrame(frame);
221 }
222
223
224
225
226
227
228
229
230
231 public void negotiate(List<ExtensionConfig> configs)
232 {
233 if (LOG.isDebugEnabled())
234 LOG.debug("Extension Configs={}",configs);
235
236 this.extensions = new ArrayList<>();
237
238 String rsvClaims[] = new String[3];
239
240 for (ExtensionConfig config : configs)
241 {
242 Extension ext = factory.newInstance(config);
243 if (ext == null)
244 {
245
246 continue;
247 }
248
249
250 if (ext.isRsv1User() && (rsvClaims[0] != null))
251 {
252 LOG.debug("Not adding extension {}. Extension {} already claimed RSV1",config,rsvClaims[0]);
253 continue;
254 }
255 if (ext.isRsv2User() && (rsvClaims[1] != null))
256 {
257 LOG.debug("Not adding extension {}. Extension {} already claimed RSV2",config,rsvClaims[1]);
258 continue;
259 }
260 if (ext.isRsv3User() && (rsvClaims[2] != null))
261 {
262 LOG.debug("Not adding extension {}. Extension {} already claimed RSV3",config,rsvClaims[2]);
263 continue;
264 }
265
266
267 extensions.add(ext);
268 addBean(ext);
269
270 if (LOG.isDebugEnabled())
271 LOG.debug("Adding Extension: {}",config);
272
273
274 if (ext.isRsv1User())
275 {
276 rsvClaims[0] = ext.getName();
277 }
278 if (ext.isRsv2User())
279 {
280 rsvClaims[1] = ext.getName();
281 }
282 if (ext.isRsv3User())
283 {
284 rsvClaims[2] = ext.getName();
285 }
286 }
287 }
288
289 @Override
290 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
291 {
292 FrameEntry entry = new FrameEntry(frame,callback,batchMode);
293 if (LOG.isDebugEnabled())
294 LOG.debug("Queuing {}",entry);
295 entries.offer(entry);
296 flusher.iterate();
297 }
298
299 public void setNextIncoming(IncomingFrames nextIncoming)
300 {
301 this.nextIncoming = nextIncoming;
302 }
303
304 public void setNextOutgoing(OutgoingFrames nextOutgoing)
305 {
306 this.nextOutgoing = nextOutgoing;
307 }
308
309 public void setPolicy(WebSocketPolicy policy)
310 {
311 for (Extension extension : extensions)
312 {
313 if (extension instanceof AbstractExtension)
314 {
315 ((AbstractExtension)extension).setPolicy(policy);
316 }
317 }
318 }
319
320 @Override
321 public String toString()
322 {
323 StringBuilder s = new StringBuilder();
324 s.append("ExtensionStack[");
325 s.append("queueSize=").append(entries.size());
326 s.append(",extensions=");
327 if (extensions == null)
328 {
329 s.append("<null>");
330 }
331 else
332 {
333 s.append('[');
334 boolean delim = false;
335 for (Extension ext : extensions)
336 {
337 if (delim)
338 {
339 s.append(',');
340 }
341 if (ext == null)
342 {
343 s.append("<null>");
344 }
345 else
346 {
347 s.append(ext.getName());
348 }
349 delim = true;
350 }
351 s.append(']');
352 }
353 s.append(",incoming=").append((this.nextIncoming == null)?"<null>":this.nextIncoming.getClass().getName());
354 s.append(",outgoing=").append((this.nextOutgoing == null)?"<null>":this.nextOutgoing.getClass().getName());
355 s.append("]");
356 return s.toString();
357 }
358
359 private static class FrameEntry
360 {
361 private final Frame frame;
362 private final WriteCallback callback;
363 private final BatchMode batchMode;
364
365 private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
366 {
367 this.frame = frame;
368 this.callback = callback;
369 this.batchMode = batchMode;
370 }
371
372 @Override
373 public String toString()
374 {
375 return frame.toString();
376 }
377 }
378
379 private class Flusher extends IteratingCallback implements WriteCallback
380 {
381 private FrameEntry current;
382
383 @Override
384 protected Action process() throws Exception
385 {
386 current = entries.poll();
387 if (current == null)
388 {
389 if (LOG.isDebugEnabled())
390 LOG.debug("Entering IDLE");
391 return Action.IDLE;
392 }
393 if (LOG.isDebugEnabled())
394 LOG.debug("Processing {}",current);
395 nextOutgoing.outgoingFrame(current.frame,this,current.batchMode);
396 return Action.SCHEDULED;
397 }
398
399 @Override
400 protected void onCompleteSuccess()
401 {
402
403 }
404
405 @Override
406 protected void onCompleteFailure(Throwable x)
407 {
408
409
410
411 }
412
413 @Override
414 public void writeSuccess()
415 {
416
417
418 notifyCallbackSuccess(current.callback);
419 succeeded();
420 }
421
422 @Override
423 public void writeFailed(Throwable x)
424 {
425
426
427
428
429
430 notifyCallbackFailure(current.callback,x);
431 succeeded();
432 }
433
434 private void notifyCallbackSuccess(WriteCallback callback)
435 {
436 try
437 {
438 if (callback != null)
439 callback.writeSuccess();
440 }
441 catch (Throwable x)
442 {
443 LOG.debug("Exception while notifying success of callback " + callback,x);
444 }
445 }
446
447 private void notifyCallbackFailure(WriteCallback callback, Throwable failure)
448 {
449 try
450 {
451 if (callback != null)
452 callback.writeFailed(failure);
453 }
454 catch (Throwable x)
455 {
456 LOG.debug("Exception while notifying failure of callback " + callback,x);
457 }
458 }
459 }
460 }