View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Objects;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.eclipse.jetty.io.ByteBufferPool;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.ArrayQueue;
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.IteratingCallback;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  import org.eclipse.jetty.websocket.api.BatchMode;
36  import org.eclipse.jetty.websocket.api.WriteCallback;
37  import org.eclipse.jetty.websocket.api.extensions.Frame;
38  import org.eclipse.jetty.websocket.common.Generator;
39  import org.eclipse.jetty.websocket.common.OpCode;
40  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
41  
42  /**
43   * Interface for working with bytes destined for {@link EndPoint#write(org.eclipse.jetty.util.Callback, ByteBuffer...)}
44   */
45  public class FrameFlusher
46  {
47      private class Flusher extends IteratingCallback
48      {
49          private final List<FrameEntry> entries;
50          private final List<ByteBuffer> buffers;
51          private ByteBuffer aggregate;
52          private BatchMode batchMode;
53  
54          public Flusher(int maxGather)
55          {
56              entries = new ArrayList<>(maxGather);
57              buffers = new ArrayList<>((maxGather * 2) + 1);
58          }
59  
60          private Action batch()
61          {
62              if (aggregate == null)
63              {
64                  aggregate = bufferPool.acquire(bufferSize,true);
65                  if (LOG.isDebugEnabled())
66                  {
67                      LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
68                  }
69              }
70  
71              // Do not allocate the iterator here.
72              for (int i = 0; i < entries.size(); ++i)
73              {
74                  FrameEntry entry = entries.get(i);
75  
76                  entry.generateHeaderBytes(aggregate);
77  
78                  ByteBuffer payload = entry.frame.getPayload();
79                  if (BufferUtil.hasContent(payload))
80                  {
81                      BufferUtil.append(aggregate,payload);
82                  }
83              }
84              if (LOG.isDebugEnabled())
85              {
86                  LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
87              }
88              succeeded();
89              return Action.SCHEDULED;
90          }
91  
92          @Override
93          protected void onCompleteSuccess()
94          {
95              // This IteratingCallback never completes.
96          }
97  
98          @Override
99          public void onCompleteFailure(Throwable x)
100         {
101             for (FrameEntry entry : entries)
102             {
103                 notifyCallbackFailure(entry.callback,x);
104                 entry.release();
105             }
106             entries.clear();
107             failure = x;
108             onFailure(x);
109         }
110 
111         private Action flush()
112         {
113             if (!BufferUtil.isEmpty(aggregate))
114             {
115                 buffers.add(aggregate);
116                 if (LOG.isDebugEnabled())
117                 {
118                     LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
119                 }
120             }
121 
122             // Do not allocate the iterator here.
123             for (int i = 0; i < entries.size(); ++i)
124             {
125                 FrameEntry entry = entries.get(i);
126                 // Skip the "synthetic" frame used for flushing.
127                 if (entry.frame == FLUSH_FRAME)
128                 {
129                     continue;
130                 }
131                 buffers.add(entry.generateHeaderBytes());
132                 ByteBuffer payload = entry.frame.getPayload();
133                 if (BufferUtil.hasContent(payload))
134                 {
135                     buffers.add(payload);
136                 }
137             }
138 
139             if (LOG.isDebugEnabled())
140             {
141                 LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
142             }
143 
144             if (buffers.isEmpty())
145             {
146                 releaseAggregate();
147                 // We may have the FLUSH_FRAME to notify.
148                 succeedEntries();
149                 return Action.IDLE;
150             }
151 
152             endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
153             buffers.clear();
154             return Action.SCHEDULED;
155         }
156 
157         @Override
158         protected Action process() throws Exception
159         {
160             int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
161             BatchMode currentBatchMode = BatchMode.AUTO;
162             synchronized (lock)
163             {
164                 while ((entries.size() <= maxGather) && !queue.isEmpty())
165                 {
166                     FrameEntry entry = queue.remove(0);
167                     currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
168 
169                     // Force flush if we need to.
170                     if (entry.frame == FLUSH_FRAME)
171                     {
172                         currentBatchMode = BatchMode.OFF;
173                     }
174 
175                     int payloadLength = BufferUtil.length(entry.frame.getPayload());
176                     int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
177 
178                     // If it is a "big" frame, avoid copying into the aggregate buffer.
179                     if (approxFrameLength > (bufferSize >> 2))
180                     {
181                         currentBatchMode = BatchMode.OFF;
182                     }
183 
184                     // If the aggregate buffer overflows, do not batch.
185                     space -= approxFrameLength;
186                     if (space <= 0)
187                     {
188                         currentBatchMode = BatchMode.OFF;
189                     }
190 
191                     entries.add(entry);
192                 }
193             }
194 
195             if (LOG.isDebugEnabled())
196             {
197                 LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
198             }
199 
200             if (entries.isEmpty())
201             {
202                 if (batchMode != BatchMode.AUTO)
203                 {
204                     // Nothing more to do, release the aggregate buffer if we need to.
205                     // Releasing it here rather than in succeeded() allows for its reuse.
206                     releaseAggregate();
207                     return Action.IDLE;
208                 }
209 
210                 LOG.debug("{} auto flushing",FrameFlusher.this);
211                 return flush();
212             }
213 
214             batchMode = currentBatchMode;
215 
216             return currentBatchMode == BatchMode.OFF?flush():batch();
217         }
218 
219         private void releaseAggregate()
220         {
221             if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
222             {
223                 bufferPool.release(aggregate);
224                 aggregate = null;
225             }
226         }
227 
228         @Override
229         public void succeeded()
230         {
231             succeedEntries();
232             super.succeeded();
233         }
234 
235         private void succeedEntries()
236         {
237             // Do not allocate the iterator here.
238             for (int i = 0; i < entries.size(); ++i)
239             {
240                 FrameEntry entry = entries.get(i);
241                 notifyCallbackSuccess(entry.callback);
242                 entry.release();
243             }
244             entries.clear();
245         }
246     }
247 
248     private class FrameEntry
249     {
250         private final Frame frame;
251         private final WriteCallback callback;
252         private final BatchMode batchMode;
253         private ByteBuffer headerBuffer;
254 
255         private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
256         {
257             this.frame = Objects.requireNonNull(frame);
258             this.callback = callback;
259             this.batchMode = batchMode;
260         }
261 
262         private ByteBuffer generateHeaderBytes()
263         {
264             return headerBuffer = generator.generateHeaderBytes(frame);
265         }
266 
267         private void generateHeaderBytes(ByteBuffer buffer)
268         {
269             generator.generateHeaderBytes(frame,buffer);
270         }
271 
272         private void release()
273         {
274             if (headerBuffer != null)
275             {
276                 generator.getBufferPool().release(headerBuffer);
277                 headerBuffer = null;
278             }
279         }
280 
281         @Override
282         public String toString()
283         {
284             return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
285         }
286     }
287 
288     public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
289     private static final Logger LOG = Log.getLogger(FrameFlusher.class);
290     private final ByteBufferPool bufferPool;
291     private final EndPoint endpoint;
292     private final int bufferSize;
293     private final Generator generator;
294     private final int maxGather;
295     private final Object lock = new Object();
296     private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
297     private final Flusher flusher;
298     private final AtomicBoolean closed = new AtomicBoolean();
299     private volatile Throwable failure;
300 
301     public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
302     {
303         this.bufferPool = bufferPool;
304         this.endpoint = endpoint;
305         this.bufferSize = bufferSize;
306         this.generator = Objects.requireNonNull(generator);
307         this.maxGather = maxGather;
308         this.flusher = new Flusher(maxGather);
309     }
310 
311     public void close()
312     {
313         if (closed.compareAndSet(false,true))
314         {
315             LOG.debug("{} closing {}",this);
316             EOFException eof = new EOFException("Connection has been closed locally");
317             flusher.failed(eof);
318 
319             // Fail also queued entries.
320             List<FrameEntry> entries = new ArrayList<>();
321             synchronized (lock)
322             {
323                 entries.addAll(queue);
324                 queue.clear();
325             }
326             // Notify outside sync block.
327             for (FrameEntry entry : entries)
328             {
329                 notifyCallbackFailure(entry.callback,eof);
330             }
331         }
332     }
333 
334     public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
335     {
336         if (closed.get())
337         {
338             notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
339             return;
340         }
341         if (flusher.isFailed())
342         {
343             notifyCallbackFailure(callback,failure);
344             return;
345         }
346 
347         FrameEntry entry = new FrameEntry(frame,callback,batchMode);
348 
349         synchronized (lock)
350         {
351             switch (frame.getOpCode())
352             {
353                 case OpCode.PING:
354                 {
355                     // Prepend PINGs so they are processed first.
356                     queue.add(0,entry);
357                     break;
358                 }
359                 case OpCode.CLOSE:
360                 {
361                     // There may be a chance that other frames are
362                     // added after this close frame, but we will
363                     // fail them later to keep it simple here.
364                     closed.set(true);
365                     queue.add(entry);
366                     break;
367                 }
368                 default:
369                 {
370                     queue.add(entry);
371                     break;
372                 }
373             }
374         }
375 
376         if (LOG.isDebugEnabled())
377         {
378             LOG.debug("{} queued {}",this,entry);
379         }
380 
381         flusher.iterate();
382     }
383 
384     protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
385     {
386         try
387         {
388             if (callback != null)
389             {
390                 callback.writeFailed(failure);
391             }
392         }
393         catch (Throwable x)
394         {
395             if (LOG.isDebugEnabled())
396                 LOG.debug("Exception while notifying failure of callback " + callback,x);
397         }
398     }
399 
400     protected void notifyCallbackSuccess(WriteCallback callback)
401     {
402         try
403         {
404             if (callback != null)
405             {
406                 callback.writeSuccess();
407             }
408         }
409         catch (Throwable x)
410         {
411             if (LOG.isDebugEnabled())
412                 LOG.debug("Exception while notifying success of callback " + callback,x);
413         }
414     }
415 
416     protected void onFailure(Throwable x)
417     {
418         LOG.warn(x);
419     }
420 
421     @Override
422     public String toString()
423     {
424         ByteBuffer aggregate = flusher.aggregate;
425         return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
426                 failure);
427     }
428 }