View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayDeque;
24  import java.util.ArrayList;
25  import java.util.Deque;
26  import java.util.List;
27  import java.util.Objects;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.EndPoint;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.IteratingCallback;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.websocket.api.BatchMode;
37  import org.eclipse.jetty.websocket.api.WriteCallback;
38  import org.eclipse.jetty.websocket.api.extensions.Frame;
39  import org.eclipse.jetty.websocket.common.Generator;
40  import org.eclipse.jetty.websocket.common.OpCode;
41  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
42  
43  /**
44   * Interface for working with bytes destined for {@link EndPoint#write(org.eclipse.jetty.util.Callback, ByteBuffer...)}
45   */
46  public class FrameFlusher
47  {
48      private class Flusher extends IteratingCallback
49      {
/** Entries taken off the queue that the current iteration is writing. */
private final List<FrameEntry> entries;
/** Reusable list of the buffers passed to the endpoint in one gathered write. */
private final List<ByteBuffer> buffers;
/** Buffer acquired from the pool into which batched frames are copied; {@code null} while none is held. */
private ByteBuffer aggregate;
/** Batch mode computed by the last {@code process()} pass that gathered at least one entry. */
private BatchMode batchMode;

/**
 * Creates the flusher and pre-sizes its two working lists.
 *
 * @param maxGather initial capacity of the entry list; the buffer list gets
 *                  twice that capacity plus one
 */
public Flusher(int maxGather)
{
    this.entries = new ArrayList<>(maxGather);
    this.buffers = new ArrayList<>((maxGather * 2) + 1);
}
60  
61          private Action batch()
62          {
63              if (aggregate == null)
64              {
65                  aggregate = bufferPool.acquire(bufferSize,true);
66                  if (LOG.isDebugEnabled())
67                  {
68                      LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
69                  }
70              }
71  
72              // Do not allocate the iterator here.
73              for (int i = 0; i < entries.size(); ++i)
74              {
75                  FrameEntry entry = entries.get(i);
76  
77                  entry.generateHeaderBytes(aggregate);
78  
79                  ByteBuffer payload = entry.frame.getPayload();
80                  if (BufferUtil.hasContent(payload))
81                  {
82                      BufferUtil.append(aggregate,payload);
83                  }
84              }
85              if (LOG.isDebugEnabled())
86              {
87                  LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
88              }
89              succeeded();
90              return Action.SCHEDULED;
91          }
92  
93          @Override
94          protected void onCompleteSuccess()
95          {
96              // This IteratingCallback never completes.
97          }
98  
99          @Override
100         public void onCompleteFailure(Throwable x)
101         {
102             for (FrameEntry entry : entries)
103             {
104                 notifyCallbackFailure(entry.callback,x);
105                 entry.release();
106             }
107             entries.clear();
108             failure = x;
109             onFailure(x);
110         }
111 
112         private Action flush()
113         {
114             if (!BufferUtil.isEmpty(aggregate))
115             {
116                 buffers.add(aggregate);
117                 if (LOG.isDebugEnabled())
118                 {
119                     LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
120                 }
121             }
122 
123             // Do not allocate the iterator here.
124             for (int i = 0; i < entries.size(); ++i)
125             {
126                 FrameEntry entry = entries.get(i);
127                 // Skip the "synthetic" frame used for flushing.
128                 if (entry.frame == FLUSH_FRAME)
129                 {
130                     continue;
131                 }
132                 buffers.add(entry.generateHeaderBytes());
133                 ByteBuffer payload = entry.frame.getPayload();
134                 if (BufferUtil.hasContent(payload))
135                 {
136                     buffers.add(payload);
137                 }
138             }
139 
140             if (LOG.isDebugEnabled())
141             {
142                 LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
143             }
144 
145             if (buffers.isEmpty())
146             {
147                 releaseAggregate();
148                 // We may have the FLUSH_FRAME to notify.
149                 succeedEntries();
150                 return Action.IDLE;
151             }
152 
153             endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
154             buffers.clear();
155             return Action.SCHEDULED;
156         }
157 
158         @Override
159         protected Action process() throws Exception
160         {
161             int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
162             BatchMode currentBatchMode = BatchMode.AUTO;
163             synchronized (lock)
164             {
165                 while ((entries.size() <= maxGather) && !queue.isEmpty())
166                 {
167                     FrameEntry entry = queue.poll();
168                     currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
169 
170                     // Force flush if we need to.
171                     if (entry.frame == FLUSH_FRAME)
172                     {
173                         currentBatchMode = BatchMode.OFF;
174                     }
175 
176                     int payloadLength = BufferUtil.length(entry.frame.getPayload());
177                     int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
178 
179                     // If it is a "big" frame, avoid copying into the aggregate buffer.
180                     if (approxFrameLength > (bufferSize >> 2))
181                     {
182                         currentBatchMode = BatchMode.OFF;
183                     }
184 
185                     // If the aggregate buffer overflows, do not batch.
186                     space -= approxFrameLength;
187                     if (space <= 0)
188                     {
189                         currentBatchMode = BatchMode.OFF;
190                     }
191 
192                     entries.add(entry);
193                 }
194             }
195 
196             if (LOG.isDebugEnabled())
197             {
198                 LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
199             }
200 
201             if (entries.isEmpty())
202             {
203                 if (batchMode != BatchMode.AUTO)
204                 {
205                     // Nothing more to do, release the aggregate buffer if we need to.
206                     // Releasing it here rather than in succeeded() allows for its reuse.
207                     releaseAggregate();
208                     return Action.IDLE;
209                 }
210 
211                 LOG.debug("{} auto flushing",FrameFlusher.this);
212                 return flush();
213             }
214 
215             batchMode = currentBatchMode;
216 
217             return currentBatchMode == BatchMode.OFF?flush():batch();
218         }
219 
220         private void releaseAggregate()
221         {
222             if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
223             {
224                 bufferPool.release(aggregate);
225                 aggregate = null;
226             }
227         }
228 
229         @Override
230         public void succeeded()
231         {
232             succeedEntries();
233             super.succeeded();
234         }
235 
236         private void succeedEntries()
237         {
238             // Do not allocate the iterator here.
239             for (int i = 0; i < entries.size(); ++i)
240             {
241                 FrameEntry entry = entries.get(i);
242                 notifyCallbackSuccess(entry.callback);
243                 entry.release();
244             }
245             entries.clear();
246         }
247     }
248 
249     private class FrameEntry
250     {
251         private final Frame frame;
252         private final WriteCallback callback;
253         private final BatchMode batchMode;
254         private ByteBuffer headerBuffer;
255 
256         private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
257         {
258             this.frame = Objects.requireNonNull(frame);
259             this.callback = callback;
260             this.batchMode = batchMode;
261         }
262 
263         private ByteBuffer generateHeaderBytes()
264         {
265             return headerBuffer = generator.generateHeaderBytes(frame);
266         }
267 
268         private void generateHeaderBytes(ByteBuffer buffer)
269         {
270             generator.generateHeaderBytes(frame,buffer);
271         }
272 
273         private void release()
274         {
275             if (headerBuffer != null)
276             {
277                 generator.getBufferPool().release(headerBuffer);
278                 headerBuffer = null;
279             }
280         }
281 
282         @Override
283         public String toString()
284         {
285             return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
286         }
287     }
288 
/**
 * Synthetic frame: when it is dequeued the flusher switches batching off for
 * that pass, and the frame itself is skipped when buffers are written.
 */
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
/** Pool the aggregate buffer is acquired from and released to. */
private final ByteBufferPool bufferPool;
/** Destination of the gathered writes. */
private final EndPoint endpoint;
/** Size requested for the aggregate buffer; also drives the "big frame" threshold. */
private final int bufferSize;
/** Produces the frame headers; never {@code null}. */
private final Generator generator;
/** Bound used when gathering queued entries in one pass. */
private final int maxGather;
/** Guards every access to {@link #queue} made while enqueueing, draining and closing. */
private final Object lock = new Object();
private final Deque<FrameEntry> queue = new ArrayDeque<>();
private final Flusher flusher;
private final AtomicBoolean closed = new AtomicBoolean();
/** Cause recorded when the flusher fails; {@code null} until then. */
private volatile Throwable failure;

/**
 * @param bufferPool the pool used for the aggregate buffer
 * @param generator  the frame header generator, must not be {@code null}
 * @param endpoint   the endpoint to write to
 * @param bufferSize the size of the aggregate buffer
 * @param maxGather  the bound used when gathering queued frames in one pass
 */
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
{
    this.generator = Objects.requireNonNull(generator);
    this.bufferPool = bufferPool;
    this.endpoint = endpoint;
    this.bufferSize = bufferSize;
    this.maxGather = maxGather;
    this.flusher = new Flusher(maxGather);
}
311 
312     public void close()
313     {
314         if (closed.compareAndSet(false,true))
315         {
316             LOG.debug("{} closing {}",this);
317             EOFException eof = new EOFException("Connection has been closed locally");
318             flusher.failed(eof);
319 
320             // Fail also queued entries.
321             List<FrameEntry> entries = new ArrayList<>();
322             synchronized (lock)
323             {
324                 entries.addAll(queue);
325                 queue.clear();
326             }
327             // Notify outside sync block.
328             for (FrameEntry entry : entries)
329             {
330                 notifyCallbackFailure(entry.callback,eof);
331             }
332         }
333     }
334 
335     public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
336     {
337         if (closed.get())
338         {
339             notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
340             return;
341         }
342         if (flusher.isFailed())
343         {
344             notifyCallbackFailure(callback,failure);
345             return;
346         }
347 
348         FrameEntry entry = new FrameEntry(frame,callback,batchMode);
349 
350         synchronized (lock)
351         {
352             switch (frame.getOpCode())
353             {
354                 case OpCode.PING:
355                 {
356                     // Prepend PINGs so they are processed first.
357                     queue.offerFirst(entry);
358                     break;
359                 }
360                 case OpCode.CLOSE:
361                 {
362                     // There may be a chance that other frames are
363                     // added after this close frame, but we will
364                     // fail them later to keep it simple here.
365                     closed.set(true);
366                     queue.offer(entry);
367                     break;
368                 }
369                 default:
370                 {
371                     queue.offer(entry);
372                     break;
373                 }
374             }
375         }
376 
377         if (LOG.isDebugEnabled())
378         {
379             LOG.debug("{} queued {}",this,entry);
380         }
381 
382         flusher.iterate();
383     }
384 
385     protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
386     {
387         try
388         {
389             if (callback != null)
390             {
391                 callback.writeFailed(failure);
392             }
393         }
394         catch (Throwable x)
395         {
396             if (LOG.isDebugEnabled())
397                 LOG.debug("Exception while notifying failure of callback " + callback,x);
398         }
399     }
400 
401     protected void notifyCallbackSuccess(WriteCallback callback)
402     {
403         try
404         {
405             if (callback != null)
406             {
407                 callback.writeSuccess();
408             }
409         }
410         catch (Throwable x)
411         {
412             if (LOG.isDebugEnabled())
413                 LOG.debug("Exception while notifying success of callback " + callback,x);
414         }
415     }
416 
417     protected void onFailure(Throwable x)
418     {
419         LOG.warn(x);
420     }
421 
422     @Override
423     public String toString()
424     {
425         ByteBuffer aggregate = flusher.aggregate;
426         return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
427                 failure);
428     }
429 }