View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.extensions;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.ListIterator;
25  import java.util.Queue;
26  
27  import org.eclipse.jetty.util.ConcurrentArrayQueue;
28  import org.eclipse.jetty.util.IteratingCallback;
29  import org.eclipse.jetty.util.annotation.ManagedAttribute;
30  import org.eclipse.jetty.util.annotation.ManagedObject;
31  import org.eclipse.jetty.util.component.ContainerLifeCycle;
32  import org.eclipse.jetty.util.component.LifeCycle;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  import org.eclipse.jetty.websocket.api.BatchMode;
36  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
37  import org.eclipse.jetty.websocket.api.WriteCallback;
38  import org.eclipse.jetty.websocket.api.extensions.Extension;
39  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
41  import org.eclipse.jetty.websocket.api.extensions.Frame;
42  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
43  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
44  import org.eclipse.jetty.websocket.common.Generator;
45  import org.eclipse.jetty.websocket.common.Parser;
46  
47  /**
48   * Represents the stack of Extensions.
49   */
50  @ManagedObject("Extension Stack")
51  public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames, OutgoingFrames
52  {
53      private static final Logger LOG = Log.getLogger(ExtensionStack.class);
54  
55      private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
56      private final IteratingCallback flusher = new Flusher();
57      private final ExtensionFactory factory;
58      private List<Extension> extensions;
59      private IncomingFrames nextIncoming;
60      private OutgoingFrames nextOutgoing;
61  
62      public ExtensionStack(ExtensionFactory factory)
63      {
64          this.factory = factory;
65      }
66  
67      public void configure(Generator generator)
68      {
69          generator.configureFromExtensions(extensions);
70      }
71  
72      public void configure(Parser parser)
73      {
74          parser.configureFromExtensions(extensions);
75      }
76  
77      @Override
78      protected void doStart() throws Exception
79      {
80          super.doStart();
81  
82          // Wire up Extensions
83          if ((extensions != null) && (extensions.size() > 0))
84          {
85              ListIterator<Extension> exts = extensions.listIterator();
86  
87              // Connect outgoings
88              while (exts.hasNext())
89              {
90                  Extension ext = exts.next();
91                  ext.setNextOutgoingFrames(nextOutgoing);
92                  nextOutgoing = ext;
93                  
94                  if (ext instanceof LifeCycle)
95                  {
96                      addBean(ext,true);
97                  }
98              }
99  
100             // Connect incomings
101             while (exts.hasPrevious())
102             {
103                 Extension ext = exts.previous();
104                 ext.setNextIncomingFrames(nextIncoming);
105                 nextIncoming = ext;
106             }
107         }
108     }
109 
110     @Override
111     public void dump(Appendable out, String indent) throws IOException
112     {
113         super.dump(out,indent);
114 
115         IncomingFrames websocket = getLastIncoming();
116         OutgoingFrames network = getLastOutgoing();
117 
118         out.append(indent).append(" +- Stack").append(System.lineSeparator());
119         out.append(indent).append("     +- Network  : ").append(network.toString()).append(System.lineSeparator());
120         for (Extension ext : extensions)
121         {
122             out.append(indent).append("     +- Extension: ").append(ext.toString()).append(System.lineSeparator());
123         }
124         out.append(indent).append("     +- Websocket: ").append(websocket.toString()).append(System.lineSeparator());
125     }
126 
127     @ManagedAttribute(name = "Extension List", readonly = true)
128     public List<Extension> getExtensions()
129     {
130         return extensions;
131     }
132 
133     private IncomingFrames getLastIncoming()
134     {
135         IncomingFrames last = nextIncoming;
136         boolean done = false;
137         while (!done)
138         {
139             if (last instanceof AbstractExtension)
140             {
141                 last = ((AbstractExtension)last).getNextIncoming();
142             }
143             else
144             {
145                 done = true;
146             }
147         }
148         return last;
149     }
150 
151     private OutgoingFrames getLastOutgoing()
152     {
153         OutgoingFrames last = nextOutgoing;
154         boolean done = false;
155         while (!done)
156         {
157             if (last instanceof AbstractExtension)
158             {
159                 last = ((AbstractExtension)last).getNextOutgoing();
160             }
161             else
162             {
163                 done = true;
164             }
165         }
166         return last;
167     }
168 
169     /**
170      * Get the list of negotiated extensions, each entry being a full "name; params" extension configuration
171      * 
172      * @return list of negotiated extensions
173      */
174     public List<ExtensionConfig> getNegotiatedExtensions()
175     {
176         List<ExtensionConfig> ret = new ArrayList<>();
177         if (extensions == null)
178         {
179             return ret;
180         }
181 
182         for (Extension ext : extensions)
183         {
184             if (ext.getName().charAt(0) == '@')
185             {
186                 // special, internal-only extensions, not present on negotiation level
187                 continue;
188             }
189             ret.add(ext.getConfig());
190         }
191         return ret;
192     }
193 
194     @ManagedAttribute(name = "Next Incoming Frames Handler", readonly = true)
195     public IncomingFrames getNextIncoming()
196     {
197         return nextIncoming;
198     }
199 
200     @ManagedAttribute(name = "Next Outgoing Frames Handler", readonly = true)
201     public OutgoingFrames getNextOutgoing()
202     {
203         return nextOutgoing;
204     }
205 
206     public boolean hasNegotiatedExtensions()
207     {
208         return (this.extensions != null) && (this.extensions.size() > 0);
209     }
210 
211     @Override
212     public void incomingError(Throwable e)
213     {
214         nextIncoming.incomingError(e);
215     }
216 
217     @Override
218     public void incomingFrame(Frame frame)
219     {
220         nextIncoming.incomingFrame(frame);
221     }
222 
223     /**
224      * Perform the extension negotiation.
225      * <p>
226      * For the list of negotiated extensions, use {@link #getNegotiatedExtensions()}
227      * 
228      * @param configs
229      *            the configurations being requested
230      */
231     public void negotiate(List<ExtensionConfig> configs)
232     {
233         if (LOG.isDebugEnabled())
234             LOG.debug("Extension Configs={}",configs);
235 
236         this.extensions = new ArrayList<>();
237 
238         String rsvClaims[] = new String[3];
239 
240         for (ExtensionConfig config : configs)
241         {
242             Extension ext = factory.newInstance(config);
243             if (ext == null)
244             {
245                 // Extension not present on this side
246                 continue;
247             }
248 
249             // Check RSV
250             if (ext.isRsv1User() && (rsvClaims[0] != null))
251             {
252                 LOG.debug("Not adding extension {}. Extension {} already claimed RSV1",config,rsvClaims[0]);
253                 continue;
254             }
255             if (ext.isRsv2User() && (rsvClaims[1] != null))
256             {
257                 LOG.debug("Not adding extension {}. Extension {} already claimed RSV2",config,rsvClaims[1]);
258                 continue;
259             }
260             if (ext.isRsv3User() && (rsvClaims[2] != null))
261             {
262                 LOG.debug("Not adding extension {}. Extension {} already claimed RSV3",config,rsvClaims[2]);
263                 continue;
264             }
265 
266             // Add Extension
267             extensions.add(ext);
268             addBean(ext);
269 
270             if (LOG.isDebugEnabled())
271                 LOG.debug("Adding Extension: {}",config);
272 
273             // Record RSV Claims
274             if (ext.isRsv1User())
275             {
276                 rsvClaims[0] = ext.getName();
277             }
278             if (ext.isRsv2User())
279             {
280                 rsvClaims[1] = ext.getName();
281             }
282             if (ext.isRsv3User())
283             {
284                 rsvClaims[2] = ext.getName();
285             }
286         }
287     }
288 
289     @Override
290     public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
291     {
292         FrameEntry entry = new FrameEntry(frame,callback,batchMode);
293         if (LOG.isDebugEnabled())
294             LOG.debug("Queuing {}",entry);
295         entries.offer(entry);
296         flusher.iterate();
297     }
298 
299     public void setNextIncoming(IncomingFrames nextIncoming)
300     {
301         this.nextIncoming = nextIncoming;
302     }
303 
304     public void setNextOutgoing(OutgoingFrames nextOutgoing)
305     {
306         this.nextOutgoing = nextOutgoing;
307     }
308 
309     public void setPolicy(WebSocketPolicy policy)
310     {
311         for (Extension extension : extensions)
312         {
313             if (extension instanceof AbstractExtension)
314             {
315                 ((AbstractExtension)extension).setPolicy(policy);
316             }
317         }
318     }
319 
320     @Override
321     public String toString()
322     {
323         StringBuilder s = new StringBuilder();
324         s.append("ExtensionStack[");
325         s.append("queueSize=").append(entries.size());
326         s.append(",extensions=");
327         if (extensions == null)
328         {
329             s.append("<null>");
330         }
331         else
332         {
333             s.append('[');
334             boolean delim = false;
335             for (Extension ext : extensions)
336             {
337                 if (delim)
338                 {
339                     s.append(',');
340                 }
341                 if (ext == null)
342                 {
343                     s.append("<null>");
344                 }
345                 else
346                 {
347                     s.append(ext.getName());
348                 }
349                 delim = true;
350             }
351             s.append(']');
352         }
353         s.append(",incoming=").append((this.nextIncoming == null)?"<null>":this.nextIncoming.getClass().getName());
354         s.append(",outgoing=").append((this.nextOutgoing == null)?"<null>":this.nextOutgoing.getClass().getName());
355         s.append("]");
356         return s.toString();
357     }
358 
359     private static class FrameEntry
360     {
361         private final Frame frame;
362         private final WriteCallback callback;
363         private final BatchMode batchMode;
364 
365         private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
366         {
367             this.frame = frame;
368             this.callback = callback;
369             this.batchMode = batchMode;
370         }
371 
372         @Override
373         public String toString()
374         {
375             return frame.toString();
376         }
377     }
378 
379     private class Flusher extends IteratingCallback implements WriteCallback
380     {
381         private FrameEntry current;
382 
383         @Override
384         protected Action process() throws Exception
385         {
386             current = entries.poll();
387             if (current == null)
388             {
389                 if (LOG.isDebugEnabled())
390                     LOG.debug("Entering IDLE");
391                 return Action.IDLE;
392             }
393             if (LOG.isDebugEnabled())
394                 LOG.debug("Processing {}",current);
395             nextOutgoing.outgoingFrame(current.frame,this,current.batchMode);
396             return Action.SCHEDULED;
397         }
398 
399         @Override
400         protected void onCompleteSuccess()
401         {
402             // This IteratingCallback never completes.
403         }
404         
405         @Override
406         protected void onCompleteFailure(Throwable x)
407         {
408             // This IteratingCallback never fails.
409             // The callback are those provided by WriteCallback (implemented
410             // below) and even in case of writeFailed() we call succeeded().
411         }
412         
413         @Override
414         public void writeSuccess()
415         {
416             // Notify first then call succeeded(), otherwise
417             // write callbacks may be invoked out of order.
418             notifyCallbackSuccess(current.callback);
419             succeeded();
420         }
421 
422         @Override
423         public void writeFailed(Throwable x)
424         {
425             // Notify first, the call succeeded() to drain the queue.
426             // We don't want to call failed(x) because that will put
427             // this flusher into a final state that cannot be exited,
428             // and the failure of a frame may not mean that the whole
429             // connection is now invalid.
430             notifyCallbackFailure(current.callback,x);
431             succeeded();
432         }
433 
434         private void notifyCallbackSuccess(WriteCallback callback)
435         {
436             try
437             {
438                 if (callback != null)
439                     callback.writeSuccess();
440             }
441             catch (Throwable x)
442             {
443                 LOG.debug("Exception while notifying success of callback " + callback,x);
444             }
445         }
446 
447         private void notifyCallbackFailure(WriteCallback callback, Throwable failure)
448         {
449             try
450             {
451                 if (callback != null)
452                     callback.writeFailed(failure);
453             }
454             catch (Throwable x)
455             {
456                 LOG.debug("Exception while notifying failure of callback " + callback,x);
457             }
458         }
459     }
460 }