View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Objects;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.eclipse.jetty.io.AbstractConnection;
31  import org.eclipse.jetty.io.EndPoint;
32  import org.eclipse.jetty.util.Callback;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  import org.eclipse.jetty.websocket.api.extensions.Frame;
36  import org.eclipse.jetty.websocket.common.Generator;
37  import org.eclipse.jetty.websocket.common.OpCode;
38  import org.eclipse.jetty.websocket.common.frames.DataFrame;
39  
40  /**
41   * Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
42   */
43  public class WriteBytesProvider implements Callback
44  {
45      private class FrameEntry
46      {
47          protected final AtomicBoolean failed = new AtomicBoolean(false);
48          protected final Frame frame;
49          protected final Callback callback;
50          /** holds reference to header ByteBuffer, as it needs to be released on success/failure */
51          private ByteBuffer headerBuffer;
52  
53          public FrameEntry(Frame frame, Callback callback)
54          {
55              this.frame = frame;
56              this.callback = callback;
57          }
58  
59          public ByteBuffer getHeaderBytes()
60          {
61              ByteBuffer buf = generator.generateHeaderBytes(frame);
62              headerBuffer = buf;
63              return buf;
64          }
65  
66          public ByteBuffer getPayloadWindow()
67          {
68              // There is no need to release this ByteBuffer, as it is just a slice of the user provided payload
69              return generator.getPayloadWindow(bufferSize,frame);
70          }
71  
72          public void notifyFailure(Throwable t)
73          {
74              freeBuffers();
75              if (failed.getAndSet(true) == false)
76              {
77                  notifySafeFailure(callback,t);
78              }
79          }
80  
81          public void notifySucceeded()
82          {
83              freeBuffers();
84              if (callback == null)
85              {
86                  return;
87              }
88              try
89              {
90                  callback.succeeded();
91              }
92              catch (Throwable t)
93              {
94                  LOG.debug(t);
95              }
96          }
97  
98          public void freeBuffers()
99          {
100             if (headerBuffer != null)
101             {
102                 generator.getBufferPool().release(headerBuffer);
103                 headerBuffer = null;
104             }
105             releasePayloadBuffer(frame);
106         }
107 
108         /**
109          * Indicate that the frame entry is done generating
110          */
111         public boolean isDone()
112         {
113             return frame.remaining() <= 0;
114         }
115     }
116 
117     private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
118 
119     /** The websocket generator */
120     private final Generator generator;
121     /** Flush callback, for notifying when a flush should be performed */
122     private final Callback flushCallback;
123     /** Backlog of frames */
124     private LinkedList<FrameEntry> queue;
125     /** the buffer input size */
126     private int bufferSize = 2048;
127     /** the gathered write bytebuffer array limit */
128     private int gatheredBufferLimit = 10;
129     /** Past Frames, not yet notified (from gathered generation/write) */
130     private LinkedList<FrameEntry> past;
131     /** Currently active frame */
132     private FrameEntry active;
133     /** Tracking for failure */
134     private Throwable failure;
135     /** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
136     private AtomicBoolean closed;
137 
138     /**
139      * Create a WriteBytesProvider with specified Generator and "flush" Callback.
140      * 
141      * @param generator
142      *            the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s
143      * @param flushCallback
144      *            the flush callback to call, on a write event, after the write event has been processed by this {@link WriteBytesProvider}.
145      *            <p>
146      *            Used to trigger another flush of the next set of bytes.
147      */
148     public WriteBytesProvider(Generator generator, Callback flushCallback)
149     {
150         this.generator = Objects.requireNonNull(generator);
151         this.flushCallback = Objects.requireNonNull(flushCallback);
152         this.queue = new LinkedList<>();
153         this.past = new LinkedList<>();
154         this.closed = new AtomicBoolean(false);
155     }
156 
157     /**
158      * Force closure of write bytes
159      */
160     public void close()
161     {
162         LOG.debug(".close()");
163         // Set queue closed, no new enqueue allowed.
164         this.closed.set(true);
165         // flush out backlog in queue
166         failAll(new EOFException("Connection has been disconnected"));
167     }
168 
169     public void enqueue(Frame frame, Callback callback)
170     {
171         Objects.requireNonNull(frame);
172         LOG.debug("enqueue({}, {})",frame,callback);
173         synchronized (this)
174         {
175             if (closed.get())
176             {
177                 // Closed for more frames.
178                 LOG.debug("Write is closed: {} {}",frame,callback);
179                 if (callback != null)
180                 {
181                     callback.failed(new IOException("Write is closed"));
182                 }
183                 return;
184             }
185 
186             if (failure != null)
187             {
188                 // no changes when failed
189                 LOG.debug("Write is in failure: {} {}",frame,callback);
190                 notifySafeFailure(callback,failure);
191                 return;
192             }
193 
194             FrameEntry entry = new FrameEntry(frame,callback);
195 
196             switch (frame.getOpCode())
197             {
198                 case OpCode.PING:
199                     queue.addFirst(entry);
200                     break;
201                 case OpCode.CLOSE:
202                     closed.set(true);
203                     // drop the rest of the queue?
204                     queue.addLast(entry);
205                     break;
206                 default:
207                     queue.addLast(entry);
208             }
209         }
210     }
211 
212     public void failAll(Throwable t)
213     {
214         // Collect entries for callback
215         List<FrameEntry> callbacks = new ArrayList<>();
216 
217         synchronized (this)
218         {
219             // fail active (if set)
220             if (active != null)
221             {
222                 FrameEntry entry = active;
223                 active = null;
224                 callbacks.add(entry);
225             }
226 
227             callbacks.addAll(past);
228             callbacks.addAll(queue);
229 
230             past.clear();
231             queue.clear();
232         }
233 
234         // notify flush callback
235         if (!callbacks.isEmpty())
236         {
237             // TODO: always notify instead?
238             flushCallback.failed(t);
239 
240             // notify entry callbacks
241             for (FrameEntry entry : callbacks)
242             {
243                 entry.notifyFailure(t);
244             }
245         }
246     }
247 
248     /**
249      * Callback failure.
250      * <p>
251      * Conditions: for Endpoint.write() failure.
252      * 
253      * @param cause
254      *            the cause of the failure
255      */
256     @Override
257     public void failed(Throwable cause)
258     {
259         failAll(cause);
260     }
261 
262     public int getBufferSize()
263     {
264         return bufferSize;
265     }
266 
267     /**
268      * Get the next set of ByteBuffers to write.
269      * 
270      * @return the next set of ByteBuffers to write
271      */
272     public List<ByteBuffer> getByteBuffers()
273     {
274         List<ByteBuffer> bufs = null;
275         int count = 0;
276         synchronized (this)
277         {
278             for (; count < gatheredBufferLimit; count++)
279             {
280                 if (active == null)
281                 {
282                     if (queue.isEmpty())
283                     {
284                         // nothing in queue
285                         return bufs;
286                     }
287 
288                     // get current topmost entry
289                     active = queue.pop();
290 
291                     // generate header
292                     if (bufs == null)
293                     {
294                         bufs = new ArrayList<>();
295                     }
296                     bufs.add(active.getHeaderBytes());
297                     count++;
298                 }
299 
300                 // collect payload window
301                 if (bufs == null)
302                 {
303                     bufs = new ArrayList<>();
304                 }
305                 bufs.add(active.getPayloadWindow());
306                 if (active.isDone())
307                 {
308                     past.add(active);
309                     active = null;
310                 }
311             }
312         }
313 
314         LOG.debug("Collected {} ByteBuffers",bufs.size());
315         return bufs;
316     }
317 
318     /**
319      * Used to test for the final frame possible to be enqueued, the CLOSE frame.
320      * 
321      * @return true if close frame has been enqueued already.
322      */
323     public boolean isClosed()
324     {
325         synchronized (this)
326         {
327             return closed.get();
328         }
329     }
330 
331     private void notifySafeFailure(Callback callback, Throwable t)
332     {
333         if (callback == null)
334         {
335             return;
336         }
337         try
338         {
339             callback.failed(t);
340         }
341         catch (Throwable e)
342         {
343             LOG.warn("Uncaught exception",e);
344         }
345     }
346 
347     public void releasePayloadBuffer(Frame frame)
348     {
349         if (!frame.hasPayload())
350         {
351             return;
352         }
353 
354         if (frame instanceof DataFrame)
355         {
356             DataFrame data = (DataFrame)frame;
357             if (data.isPooledBuffer())
358             {
359                 ByteBuffer payload = frame.getPayload();
360                 generator.getBufferPool().release(payload);
361             }
362         }
363     }
364 
365     /**
366      * Set the buffer size used for generating ByteBuffers from the frames.
367      * <p>
368      * Value usually obtained from {@link AbstractConnection#getInputBufferSize()}
369      * 
370      * @param bufferSize
371      *            the buffer size to use
372      */
373     public void setBufferSize(int bufferSize)
374     {
375         this.bufferSize = bufferSize;
376     }
377 
378     /**
379      * Write of ByteBuffer succeeded.
380      */
381     @Override
382     public void succeeded()
383     {
384         // Collect entries for callback
385         List<FrameEntry> callbacks = new ArrayList<>();
386 
387         synchronized (this)
388         {
389             if ((active != null) && (active.frame.remaining() <= 0))
390             {
391                 // All done with active FrameEntry
392                 FrameEntry entry = active;
393                 active = null;
394                 callbacks.add(entry);
395             }
396 
397             callbacks.addAll(past);
398             past.clear();
399         }
400 
401         // notify flush callback
402         flushCallback.succeeded();
403 
404         // notify entry callbacks outside of synchronize
405         for (FrameEntry entry : callbacks)
406         {
407             entry.notifySucceeded();
408         }
409     }
410 
411     @Override
412     public String toString()
413     {
414         StringBuilder b = new StringBuilder();
415         b.append("WriteBytesProvider[");
416         b.append("flushCallback=").append(flushCallback);
417         if (failure != null)
418         {
419             b.append(",failure=").append(failure.getClass().getName());
420             b.append(":").append(failure.getMessage());
421         }
422         else
423         {
424             b.append(",active=").append(active);
425             b.append(",queue.size=").append(queue.size());
426             b.append(",past.size=").append(past.size());
427         }
428         b.append(']');
429         return b.toString();
430     }
431 }