View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.extensions.fragment;
20  
21  
22  import java.nio.ByteBuffer;
23  import java.util.Queue;
24  
25  import org.eclipse.jetty.util.ConcurrentArrayQueue;
26  import org.eclipse.jetty.util.IteratingCallback;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.BatchMode;
30  import org.eclipse.jetty.websocket.api.WriteCallback;
31  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
32  import org.eclipse.jetty.websocket.api.extensions.Frame;
33  import org.eclipse.jetty.websocket.common.OpCode;
34  import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
35  import org.eclipse.jetty.websocket.common.frames.DataFrame;
36  
37  /**
38   * Fragment Extension
39   */
40  public class FragmentExtension extends AbstractExtension
41  {
42      private static final Logger LOG = Log.getLogger(FragmentExtension.class);
43  
44      private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
45      private final IteratingCallback flusher = new Flusher();
46      private int maxLength;
47  
48      @Override
49      public String getName()
50      {
51          return "fragment";
52      }
53  
54      @Override
55      public void incomingFrame(Frame frame)
56      {
57          nextIncomingFrame(frame);
58      }
59  
60      @Override
61      public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
62      {
63          ByteBuffer payload = frame.getPayload();
64          int length = payload != null ? payload.remaining() : 0;
65          if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength)
66          {
67              nextOutgoingFrame(frame, callback, batchMode);
68              return;
69          }
70  
71          FrameEntry entry = new FrameEntry(frame, callback, batchMode);
72          if (LOG.isDebugEnabled())
73              LOG.debug("Queuing {}", entry);
74          entries.offer(entry);
75          flusher.iterate();
76      }
77  
78      @Override
79      public void setConfig(ExtensionConfig config)
80      {
81          super.setConfig(config);
82          maxLength = config.getParameter("maxLength", -1);
83      }
84  
85      private static class FrameEntry
86      {
87          private final Frame frame;
88          private final WriteCallback callback;
89          private final BatchMode batchMode;
90  
91          private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
92          {
93              this.frame = frame;
94              this.callback = callback;
95              this.batchMode = batchMode;
96          }
97  
98          @Override
99          public String toString()
100         {
101             return frame.toString();
102         }
103     }
104 
105     private class Flusher extends IteratingCallback implements WriteCallback
106     {
107         private FrameEntry current;
108         private boolean finished = true;
109 
110         @Override
111         protected Action process() throws Exception
112         {
113             if (finished)
114             {
115                 current = entries.poll();
116                 LOG.debug("Processing {}", current);
117                 if (current == null)
118                     return Action.IDLE;
119                 fragment(current, true);
120             }
121             else
122             {
123                 fragment(current, false);
124             }
125             return Action.SCHEDULED;
126         }
127 
128         private void fragment(FrameEntry entry, boolean first)
129         {
130             Frame frame = entry.frame;
131             ByteBuffer payload = frame.getPayload();
132             int remaining = payload.remaining();
133             int length = Math.min(remaining, maxLength);
134             finished = length == remaining;
135 
136             boolean continuation = frame.getType().isContinuation() || !first;
137             DataFrame fragment = new DataFrame(frame, continuation);
138             boolean fin = frame.isFin() && finished;
139             fragment.setFin(fin);
140 
141             int limit = payload.limit();
142             int newLimit = payload.position() + length;
143             payload.limit(newLimit);
144             ByteBuffer payloadFragment = payload.slice();
145             payload.limit(limit);
146             fragment.setPayload(payloadFragment);
147             if (LOG.isDebugEnabled())
148                 LOG.debug("Fragmented {}->{}", frame, fragment);
149             payload.position(newLimit);
150 
151             nextOutgoingFrame(fragment, this, entry.batchMode);
152         }
153 
154         @Override
155         protected void onCompleteSuccess()
156         {
157             // This IteratingCallback never completes.
158         }
159         
160         @Override
161         protected void onCompleteFailure(Throwable x)
162         {
163             // This IteratingCallback never fails.
164             // The callback are those provided by WriteCallback (implemented
165             // below) and even in case of writeFailed() we call succeeded().
166         }
167         
168         @Override
169         public void writeSuccess()
170         {
171             // Notify first then call succeeded(), otherwise
172             // write callbacks may be invoked out of order.
173             notifyCallbackSuccess(current.callback);
174             succeeded();
175         }
176 
177         @Override
178         public void writeFailed(Throwable x)
179         {
180             // Notify first, the call succeeded() to drain the queue.
181             // We don't want to call failed(x) because that will put
182             // this flusher into a final state that cannot be exited,
183             // and the failure of a frame may not mean that the whole
184             // connection is now invalid.
185             notifyCallbackFailure(current.callback, x);
186             succeeded();
187         }
188 
189         private void notifyCallbackSuccess(WriteCallback callback)
190         {
191             try
192             {
193                 if (callback != null)
194                     callback.writeSuccess();
195             }
196             catch (Throwable x)
197             {
198                 if (LOG.isDebugEnabled())
199                     LOG.debug("Exception while notifying success of callback " + callback, x);
200             }
201         }
202 
203         private void notifyCallbackFailure(WriteCallback callback, Throwable failure)
204         {
205             try
206             {
207                 if (callback != null)
208                     callback.writeFailed(failure);
209             }
210             catch (Throwable x)
211             {
212                 if (LOG.isDebugEnabled())
213                     LOG.debug("Exception while notifying failure of callback " + callback, x);
214             }
215         }
216     }
217 }