1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.extensions;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.ListIterator;
25 import java.util.Queue;
26
27 import org.eclipse.jetty.util.ConcurrentArrayQueue;
28 import org.eclipse.jetty.util.IteratingCallback;
29 import org.eclipse.jetty.util.annotation.ManagedAttribute;
30 import org.eclipse.jetty.util.annotation.ManagedObject;
31 import org.eclipse.jetty.util.component.ContainerLifeCycle;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34 import org.eclipse.jetty.websocket.api.BatchMode;
35 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
36 import org.eclipse.jetty.websocket.api.WriteCallback;
37 import org.eclipse.jetty.websocket.api.extensions.Extension;
38 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
39 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
40 import org.eclipse.jetty.websocket.api.extensions.Frame;
41 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
42 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
43 import org.eclipse.jetty.websocket.common.Generator;
44 import org.eclipse.jetty.websocket.common.Parser;
45
46
47
48
49 @ManagedObject("Extension Stack")
50 public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames, OutgoingFrames
51 {
52 private static final Logger LOG = Log.getLogger(ExtensionStack.class);
53
54 private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
55 private final IteratingCallback flusher = new Flusher();
56 private final ExtensionFactory factory;
57 private List<Extension> extensions;
58 private IncomingFrames nextIncoming;
59 private OutgoingFrames nextOutgoing;
60
61 public ExtensionStack(ExtensionFactory factory)
62 {
63 this.factory = factory;
64 }
65
66 public void configure(Generator generator)
67 {
68 generator.configureFromExtensions(extensions);
69 }
70
71 public void configure(Parser parser)
72 {
73 parser.configureFromExtensions(extensions);
74 }
75
76 @Override
77 protected void doStart() throws Exception
78 {
79 super.doStart();
80
81
82 if ((extensions != null) && (extensions.size() > 0))
83 {
84 ListIterator<Extension> exts = extensions.listIterator();
85
86
87 while (exts.hasNext())
88 {
89 Extension ext = exts.next();
90 ext.setNextOutgoingFrames(nextOutgoing);
91 nextOutgoing = ext;
92 }
93
94
95 while (exts.hasPrevious())
96 {
97 Extension ext = exts.previous();
98 ext.setNextIncomingFrames(nextIncoming);
99 nextIncoming = ext;
100 }
101 }
102 }
103
104 @Override
105 public void dump(Appendable out, String indent) throws IOException
106 {
107 super.dump(out,indent);
108
109 IncomingFrames websocket = getLastIncoming();
110 OutgoingFrames network = getLastOutgoing();
111
112 out.append(indent).append(" +- Stack").append(System.lineSeparator());
113 out.append(indent).append(" +- Network : ").append(network.toString()).append(System.lineSeparator());
114 for (Extension ext : extensions)
115 {
116 out.append(indent).append(" +- Extension: ").append(ext.toString()).append(System.lineSeparator());
117 }
118 out.append(indent).append(" +- Websocket: ").append(websocket.toString()).append(System.lineSeparator());
119 }
120
121 @ManagedAttribute(name = "Extension List", readonly = true)
122 public List<Extension> getExtensions()
123 {
124 return extensions;
125 }
126
127 private IncomingFrames getLastIncoming()
128 {
129 IncomingFrames last = nextIncoming;
130 boolean done = false;
131 while (!done)
132 {
133 if (last instanceof AbstractExtension)
134 {
135 last = ((AbstractExtension)last).getNextIncoming();
136 }
137 else
138 {
139 done = true;
140 }
141 }
142 return last;
143 }
144
145 private OutgoingFrames getLastOutgoing()
146 {
147 OutgoingFrames last = nextOutgoing;
148 boolean done = false;
149 while (!done)
150 {
151 if (last instanceof AbstractExtension)
152 {
153 last = ((AbstractExtension)last).getNextOutgoing();
154 }
155 else
156 {
157 done = true;
158 }
159 }
160 return last;
161 }
162
163
164
165
166
167
168 public List<ExtensionConfig> getNegotiatedExtensions()
169 {
170 List<ExtensionConfig> ret = new ArrayList<>();
171 if (extensions == null)
172 {
173 return ret;
174 }
175
176 for (Extension ext : extensions)
177 {
178 if (ext.getName().charAt(0) == '@')
179 {
180
181 continue;
182 }
183 ret.add(ext.getConfig());
184 }
185 return ret;
186 }
187
188 @ManagedAttribute(name = "Next Incoming Frames Handler", readonly = true)
189 public IncomingFrames getNextIncoming()
190 {
191 return nextIncoming;
192 }
193
194 @ManagedAttribute(name = "Next Outgoing Frames Handler", readonly = true)
195 public OutgoingFrames getNextOutgoing()
196 {
197 return nextOutgoing;
198 }
199
200 public boolean hasNegotiatedExtensions()
201 {
202 return (this.extensions != null) && (this.extensions.size() > 0);
203 }
204
205 @Override
206 public void incomingError(Throwable e)
207 {
208 nextIncoming.incomingError(e);
209 }
210
211 @Override
212 public void incomingFrame(Frame frame)
213 {
214 nextIncoming.incomingFrame(frame);
215 }
216
217
218
219
220
221
222
223
224
225 public void negotiate(List<ExtensionConfig> configs)
226 {
227 if (LOG.isDebugEnabled())
228 LOG.debug("Extension Configs={}",configs);
229
230 this.extensions = new ArrayList<>();
231
232 String rsvClaims[] = new String[3];
233
234 for (ExtensionConfig config : configs)
235 {
236 Extension ext = factory.newInstance(config);
237 if (ext == null)
238 {
239
240 continue;
241 }
242
243
244 if (ext.isRsv1User() && (rsvClaims[0] != null))
245 {
246 LOG.debug("Not adding extension {}. Extension {} already claimed RSV1",config,rsvClaims[0]);
247 continue;
248 }
249 if (ext.isRsv2User() && (rsvClaims[1] != null))
250 {
251 LOG.debug("Not adding extension {}. Extension {} already claimed RSV2",config,rsvClaims[1]);
252 continue;
253 }
254 if (ext.isRsv3User() && (rsvClaims[2] != null))
255 {
256 LOG.debug("Not adding extension {}. Extension {} already claimed RSV3",config,rsvClaims[2]);
257 continue;
258 }
259
260
261 extensions.add(ext);
262 addBean(ext);
263
264 if (LOG.isDebugEnabled())
265 LOG.debug("Adding Extension: {}",config);
266
267
268 if (ext.isRsv1User())
269 {
270 rsvClaims[0] = ext.getName();
271 }
272 if (ext.isRsv2User())
273 {
274 rsvClaims[1] = ext.getName();
275 }
276 if (ext.isRsv3User())
277 {
278 rsvClaims[2] = ext.getName();
279 }
280 }
281 }
282
283 @Override
284 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
285 {
286 FrameEntry entry = new FrameEntry(frame,callback,batchMode);
287 if (LOG.isDebugEnabled())
288 LOG.debug("Queuing {}",entry);
289 entries.offer(entry);
290 flusher.iterate();
291 }
292
293 public void setNextIncoming(IncomingFrames nextIncoming)
294 {
295 this.nextIncoming = nextIncoming;
296 }
297
298 public void setNextOutgoing(OutgoingFrames nextOutgoing)
299 {
300 this.nextOutgoing = nextOutgoing;
301 }
302
303 public void setPolicy(WebSocketPolicy policy)
304 {
305 for (Extension extension : extensions)
306 {
307 if (extension instanceof AbstractExtension)
308 {
309 ((AbstractExtension)extension).setPolicy(policy);
310 }
311 }
312 }
313
314 @Override
315 public String toString()
316 {
317 StringBuilder s = new StringBuilder();
318 s.append("ExtensionStack[");
319 s.append("queueSize=").append(entries.size());
320 s.append(",extensions=");
321 if (extensions == null)
322 {
323 s.append("<null>");
324 }
325 else
326 {
327 s.append('[');
328 boolean delim = false;
329 for (Extension ext : extensions)
330 {
331 if (delim)
332 {
333 s.append(',');
334 }
335 if (ext == null)
336 {
337 s.append("<null>");
338 }
339 else
340 {
341 s.append(ext.getName());
342 }
343 delim = true;
344 }
345 s.append(']');
346 }
347 s.append(",incoming=").append((this.nextIncoming == null)?"<null>":this.nextIncoming.getClass().getName());
348 s.append(",outgoing=").append((this.nextOutgoing == null)?"<null>":this.nextOutgoing.getClass().getName());
349 s.append("]");
350 return s.toString();
351 }
352
353 private static class FrameEntry
354 {
355 private final Frame frame;
356 private final WriteCallback callback;
357 private final BatchMode batchMode;
358
359 private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
360 {
361 this.frame = frame;
362 this.callback = callback;
363 this.batchMode = batchMode;
364 }
365
366 @Override
367 public String toString()
368 {
369 return frame.toString();
370 }
371 }
372
373 private class Flusher extends IteratingCallback implements WriteCallback
374 {
375 private FrameEntry current;
376
377 @Override
378 protected Action process() throws Exception
379 {
380 current = entries.poll();
381 if (current == null)
382 {
383 if (LOG.isDebugEnabled())
384 LOG.debug("Entering IDLE");
385 return Action.IDLE;
386 }
387 if (LOG.isDebugEnabled())
388 LOG.debug("Processing {}",current);
389 nextOutgoing.outgoingFrame(current.frame,this,current.batchMode);
390 return Action.SCHEDULED;
391 }
392
393 @Override
394 protected void onCompleteSuccess()
395 {
396
397 }
398
399 @Override
400 protected void onCompleteFailure(Throwable x)
401 {
402
403
404
405 }
406
407 @Override
408 public void writeSuccess()
409 {
410
411
412 notifyCallbackSuccess(current.callback);
413 succeeded();
414 }
415
416 @Override
417 public void writeFailed(Throwable x)
418 {
419
420
421
422
423
424 notifyCallbackFailure(current.callback,x);
425 succeeded();
426 }
427
428 private void notifyCallbackSuccess(WriteCallback callback)
429 {
430 try
431 {
432 if (callback != null)
433 callback.writeSuccess();
434 }
435 catch (Throwable x)
436 {
437 LOG.debug("Exception while notifying success of callback " + callback,x);
438 }
439 }
440
441 private void notifyCallbackFailure(WriteCallback callback, Throwable failure)
442 {
443 try
444 {
445 if (callback != null)
446 callback.writeFailed(failure);
447 }
448 catch (Throwable x)
449 {
450 LOG.debug("Exception while notifying failure of callback " + callback,x);
451 }
452 }
453 }
454 }