View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Objects;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.eclipse.jetty.io.ByteBufferPool;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.ArrayQueue;
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.Callback;
33  import org.eclipse.jetty.util.IteratingCallback;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.websocket.api.BatchMode;
37  import org.eclipse.jetty.websocket.api.WriteCallback;
38  import org.eclipse.jetty.websocket.api.extensions.Frame;
39  import org.eclipse.jetty.websocket.common.Generator;
40  import org.eclipse.jetty.websocket.common.OpCode;
41  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
42  
43  /**
44   * Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
45   */
46  public class FrameFlusher
47  {
48      public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
49      private static final Logger LOG = Log.getLogger(FrameFlusher.class);
50  
51      private final ByteBufferPool bufferPool;
52      private final EndPoint endpoint;
53      private final int bufferSize;
54      private final Generator generator;
55      private final int maxGather;
56      private final Object lock = new Object();
57      private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16, 16, lock);
58      private final Flusher flusher = new Flusher();
59      private final AtomicBoolean closed = new AtomicBoolean();
60      private volatile Throwable failure;
61  
62      public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
63      {
64          this.bufferPool = bufferPool;
65          this.endpoint = endpoint;
66          this.bufferSize = bufferSize;
67          this.generator = Objects.requireNonNull(generator);
68          this.maxGather = maxGather;
69      }
70  
71      public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
72      {
73          if (closed.get())
74          {
75              notifyCallbackFailure(callback, new EOFException("Connection has been closed locally"));
76              return;
77          }
78          if (flusher.isFailed())
79          {
80              notifyCallbackFailure(callback, failure);
81              return;
82          }
83  
84          FrameEntry entry = new FrameEntry(frame, callback, batchMode);
85  
86          synchronized (lock)
87          {
88              switch (frame.getOpCode())
89              {
90                  case OpCode.PING:
91                  {
92                      // Prepend PINGs so they are processed first.
93                      queue.add(0, entry);
94                      break;
95                  }
96                  case OpCode.CLOSE:
97                  {
98                      // There may be a chance that other frames are
99                      // added after this close frame, but we will
100                     // fail them later to keep it simple here.
101                     closed.set(true);
102                     queue.add(entry);
103                     break;
104                 }
105                 default:
106                 {
107                     queue.add(entry);
108                     break;
109                 }
110             }
111         }
112 
113         if (LOG.isDebugEnabled())
114             LOG.debug("{} queued {}", this, entry);
115 
116         flusher.iterate();
117     }
118 
119     public void close()
120     {
121         if (closed.compareAndSet(false, true))
122         {
123             LOG.debug("{} closing {}", this);
124             EOFException eof = new EOFException("Connection has been closed locally");
125             flusher.failed(eof);
126 
127             // Fail also queued entries.
128             List<FrameEntry> entries = new ArrayList<>();
129             synchronized (lock)
130             {
131                 entries.addAll(queue);
132                 queue.clear();
133             }
134             // Notify outside sync block.
135             for (FrameEntry entry : entries)
136                 notifyCallbackFailure(entry.callback, eof);
137         }
138     }
139 
140     protected void onFailure(Throwable x)
141     {
142         LOG.warn(x);
143     }
144 
145     protected void notifyCallbackSuccess(WriteCallback callback)
146     {
147         try
148         {
149             if (callback != null)
150                 callback.writeSuccess();
151         }
152         catch (Throwable x)
153         {
154             LOG.debug("Exception while notifying success of callback " + callback, x);
155         }
156     }
157 
158     protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
159     {
160         try
161         {
162             if (callback != null)
163                 callback.writeFailed(failure);
164         }
165         catch (Throwable x)
166         {
167             LOG.debug("Exception while notifying failure of callback " + callback, x);
168         }
169     }
170 
171     @Override
172     public String toString()
173     {
174         ByteBuffer aggregate = flusher.aggregate;
175         return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",
176                 getClass().getSimpleName(),
177                 queue.size(),
178                 aggregate == null ? 0 : aggregate.position(),
179                 failure);
180     }
181 
182     private class Flusher extends IteratingCallback
183     {
184         private final List<FrameEntry> entries = new ArrayList<>(maxGather);
185         private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
186         private ByteBuffer aggregate;
187         private BatchMode batchMode;
188 
189         @Override
190         protected Action process() throws Exception
191         {
192             int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
193             BatchMode currentBatchMode = BatchMode.AUTO;
194             synchronized (lock)
195             {
196                 while (entries.size() <= maxGather && !queue.isEmpty())
197                 {
198                     FrameEntry entry = queue.remove(0);
199                     currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
200 
201                     // Force flush if we need to.
202                     if (entry.frame == FLUSH_FRAME)
203                         currentBatchMode = BatchMode.OFF;
204 
205                     int payloadLength = BufferUtil.length(entry.frame.getPayload());
206                     int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
207 
208                     // If it is a "big" frame, avoid copying into the aggregate buffer.
209                     if (approxFrameLength > (bufferSize >> 2))
210                         currentBatchMode = BatchMode.OFF;
211 
212                     // If the aggregate buffer overflows, do not batch.
213                     space -= approxFrameLength;
214                     if (space <= 0)
215                         currentBatchMode = BatchMode.OFF;
216 
217                     entries.add(entry);
218                 }
219             }
220 
221             if (LOG.isDebugEnabled())
222                 LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries);
223 
224             if (entries.isEmpty())
225             {
226                 if (batchMode != BatchMode.AUTO)
227                 {
228                     // Nothing more to do, release the aggregate buffer if we need to.
229                     // Releasing it here rather than in succeeded() allows for its reuse.
230                     releaseAggregate();
231                     return Action.IDLE;
232                 }
233 
234                 LOG.debug("{} auto flushing", FrameFlusher.this);
235                 return flush();
236             }
237 
238             batchMode = currentBatchMode;
239 
240             return currentBatchMode == BatchMode.OFF ? flush() : batch();
241         }
242 
243         private Action flush()
244         {
245             if (!BufferUtil.isEmpty(aggregate))
246             {
247                 buffers.add(aggregate);
248                 if (LOG.isDebugEnabled())
249                     LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate);
250             }
251 
252             // Do not allocate the iterator here.
253             for (int i = 0; i < entries.size(); ++i)
254             {
255                 FrameEntry entry = entries.get(i);
256                 // Skip the "synthetic" frame used for flushing.
257                 if (entry.frame == FLUSH_FRAME)
258                     continue;
259                 buffers.add(entry.generateHeaderBytes());
260                 ByteBuffer payload = entry.frame.getPayload();
261                 if (BufferUtil.hasContent(payload))
262                     buffers.add(payload);
263             }
264 
265             if (LOG.isDebugEnabled())
266                 LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
267 
268             if (buffers.isEmpty())
269             {
270                 releaseAggregate();
271                 // We may have the FLUSH_FRAME to notify.
272                 succeedEntries();
273                 return Action.IDLE;
274             }
275 
276             endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
277             buffers.clear();
278             return Action.SCHEDULED;
279         }
280 
281         private Action batch()
282         {
283             if (aggregate == null)
284             {
285                 aggregate = bufferPool.acquire(bufferSize, true);
286                 if (LOG.isDebugEnabled())
287                     LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate);
288             }
289 
290             // Do not allocate the iterator here.
291             for (int i = 0; i < entries.size(); ++i)
292             {
293                 FrameEntry entry = entries.get(i);
294 
295                 entry.generateHeaderBytes(aggregate);
296 
297                 ByteBuffer payload = entry.frame.getPayload();
298                 if (BufferUtil.hasContent(payload))
299                     BufferUtil.append(aggregate, payload);
300             }
301             if (LOG.isDebugEnabled())
302                 LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
303             succeeded();
304             return Action.SCHEDULED;
305         }
306 
307         private void releaseAggregate()
308         {
309             if (aggregate != null && BufferUtil.isEmpty(aggregate))
310             {
311                 bufferPool.release(aggregate);
312                 aggregate = null;
313             }
314         }
315 
316         @Override
317         public void succeeded()
318         {
319             succeedEntries();
320             super.succeeded();
321         }
322 
323         private void succeedEntries()
324         {
325             // Do not allocate the iterator here.
326             for (int i = 0; i < entries.size(); ++i)
327             {
328                 FrameEntry entry = entries.get(i);
329                 notifyCallbackSuccess(entry.callback);
330                 entry.release();
331             }
332             entries.clear();
333         }
334 
335         @Override
336         protected void completed()
337         {
338             // This IteratingCallback never completes.
339         }
340 
341         @Override
342         public void failed(Throwable x)
343         {
344             for (FrameEntry entry : entries)
345             {
346                 notifyCallbackFailure(entry.callback, x);
347                 entry.release();
348             }
349             entries.clear();
350             super.failed(x);
351             failure = x;
352             onFailure(x);
353         }
354     }
355 
356     private class FrameEntry
357     {
358         private final Frame frame;
359         private final WriteCallback callback;
360         private final BatchMode batchMode;
361         private ByteBuffer headerBuffer;
362 
363         private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
364         {
365             this.frame = Objects.requireNonNull(frame);
366             this.callback = callback;
367             this.batchMode = batchMode;
368         }
369 
370         private ByteBuffer generateHeaderBytes()
371         {
372             return headerBuffer = generator.generateHeaderBytes(frame);
373         }
374 
375         private void generateHeaderBytes(ByteBuffer buffer)
376         {
377             generator.generateHeaderBytes(frame, buffer);
378         }
379 
380         private void release()
381         {
382             if (headerBuffer != null)
383             {
384                 generator.getBufferPool().release(headerBuffer);
385                 headerBuffer = null;
386             }
387         }
388 
389         @Override
390         public String toString()
391         {
392             return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure);
393         }
394     }
395 }