View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.LinkedList;
25  import java.util.Objects;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.eclipse.jetty.io.AbstractConnection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.BufferUtil;
31  import org.eclipse.jetty.util.Callback;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  import org.eclipse.jetty.websocket.api.extensions.Frame;
35  import org.eclipse.jetty.websocket.common.Generator;
36  
37  /**
38   * Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
39   */
40  public class WriteBytesProvider implements Callback
41  {
42      private class FrameEntry
43      {
44          protected final AtomicBoolean failed = new AtomicBoolean(false);
45          protected final Frame frame;
46          protected final Callback callback;
47  
48          public FrameEntry(Frame frame, Callback callback)
49          {
50              this.frame = frame;
51              this.callback = callback;
52          }
53  
54          public ByteBuffer getByteBuffer()
55          {
56              ByteBuffer buffer = generator.generate(bufferSize,frame);
57              if (LOG.isDebugEnabled())
58              {
59                  LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
60              }
61              return buffer;
62          }
63  
64          public void notifyFailure(Throwable t)
65          {
66              if (failed.getAndSet(true) == false)
67              {
68                  notifySafeFailure(callback,t);
69              }
70          }
71      }
72  
73      private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
74  
75      /** The websocket generator */
76      private final Generator generator;
77      /** Flush callback, for notifying when a flush should be performed */
78      private final Callback flushCallback;
79      /** Backlog of frames */
80      private LinkedList<FrameEntry> queue;
81      /** the buffer input size */
82      private int bufferSize = 2048;
83      /** Currently active frame */
84      private FrameEntry active;
85      /** Tracking for failure */
86      private Throwable failure;
87      /** The last requested buffer */
88      private ByteBuffer buffer;
89      /** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
90      private AtomicBoolean closed;
91  
92      /**
93       * Create a WriteBytesProvider with specified Generator and "flush" Callback.
94       * 
95       * @param generator
96       *            the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s
97       * @param flushCallback
98       *            the flush callback to call, on a write event, after the write event has been processed by this {@link WriteBytesProvider}.
99       *            <p>
100      *            Used to trigger another flush of the next set of bytes.
101      */
102     public WriteBytesProvider(Generator generator, Callback flushCallback)
103     {
104         this.generator = Objects.requireNonNull(generator);
105         this.flushCallback = Objects.requireNonNull(flushCallback);
106         this.queue = new LinkedList<>();
107         this.closed = new AtomicBoolean(false);
108     }
109 
110     /**
111      * Force closure of write bytes
112      */
113     public void close()
114     {
115         // Set queue closed, no new enqueue allowed.
116         this.closed.set(true);
117         // flush out backlog in queue
118         failAll(new EOFException("Connection has been disconnected"));
119     }
120 
121     public void enqueue(Frame frame, Callback callback)
122     {
123         Objects.requireNonNull(frame);
124         LOG.debug("enqueue({}, {})",frame,callback);
125         synchronized (this)
126         {
127             if (closed.get())
128             {
129                 // Closed for more frames.
130                 LOG.debug("Write is closed: {} {}",frame,callback);
131                 if (callback != null)
132                 {
133                     callback.failed(new IOException("Write is closed"));
134                 }
135                 return;
136             }
137 
138             if (failure != null)
139             {
140                 // no changes when failed
141                 LOG.debug("Write is in failure: {} {}",frame,callback);
142                 notifySafeFailure(callback,failure);
143                 return;
144             }
145 
146             FrameEntry entry = new FrameEntry(frame,callback);
147 
148             switch (frame.getType())
149             {
150                 case PING:
151                     queue.addFirst(entry);
152                     break;
153                 case CLOSE:
154                     closed.set(true);
155                     // drop the rest of the queue?
156                     queue.addLast(entry);
157                     break;
158                 default:
159                     queue.addLast(entry);
160             }
161         }
162     }
163 
164     public void failAll(Throwable t)
165     {
166         synchronized (this)
167         {
168             boolean notified = false;
169 
170             // fail active (if set)
171             if (active != null)
172             {
173                 active.notifyFailure(t);
174                 notified = true;
175             }
176 
177             failure = t;
178 
179             // fail others
180             for (FrameEntry fe : queue)
181             {
182                 fe.notifyFailure(t);
183                 notified = true;
184             }
185 
186             queue.clear();
187 
188             if (notified)
189             {
190                 // notify flush callback
191                 flushCallback.failed(t);
192             }
193         }
194     }
195 
196     /**
197      * Callback failure.
198      * <p>
199      * Conditions: for Endpoint.write() failure.
200      * 
201      * @param cause
202      *            the cause of the failure
203      */
204     @Override
205     public void failed(Throwable cause)
206     {
207         failAll(cause);
208     }
209 
210     public int getBufferSize()
211     {
212         return bufferSize;
213     }
214 
215     /**
216      * Get the next ByteBuffer to write.
217      * 
218      * @return the next ByteBuffer (or null if nothing to write)
219      */
220     public ByteBuffer getByteBuffer()
221     {
222         synchronized (this)
223         {
224             if (active == null)
225             {
226                 if (queue.isEmpty())
227                 {
228                     // nothing in queue
229                     return null;
230                 }
231                 // get current topmost entry
232                 active = queue.pop();
233             }
234 
235             if (active == null)
236             {
237                 // no active frame available, even in queue.
238                 return null;
239             }
240 
241             buffer = active.getByteBuffer();
242         }
243         return buffer;
244     }
245 
246     /**
247      * Used to test for the final frame possible to be enqueued, the CLOSE frame.
248      * 
249      * @return true if close frame has been enqueued already.
250      */
251     public boolean isClosed()
252     {
253         synchronized (this)
254         {
255             return closed.get();
256         }
257     }
258 
259     private void notifySafeFailure(Callback callback, Throwable t)
260     {
261         if (callback == null)
262         {
263             return;
264         }
265         try
266         {
267             callback.failed(t);
268         }
269         catch (Throwable e)
270         {
271             LOG.warn("Uncaught exception",e);
272         }
273     }
274 
275     /**
276      * Set the buffer size used for generating ByteBuffers from the frames.
277      * <p>
278      * Value usually obtained from {@link AbstractConnection#getInputBufferSize()}
279      * 
280      * @param bufferSize
281      *            the buffer size to use
282      */
283     public void setBufferSize(int bufferSize)
284     {
285         this.bufferSize = bufferSize;
286     }
287 
288     /**
289      * Write of ByteBuffer succeeded.
290      */
291     @Override
292     public void succeeded()
293     {
294         Callback successCallback = null;
295 
296         synchronized (this)
297         {
298             // Release the active byte buffer first
299             generator.getBufferPool().release(buffer);
300 
301             if (active == null)
302             {
303                 return;
304             }
305 
306             if (active.frame.remaining() <= 0)
307             {
308                 // All done with active FrameEntry
309                 successCallback = active.callback;
310                 // Forget active
311                 active = null;
312             }
313 
314             // notify flush callback
315             flushCallback.succeeded();
316         }
317 
318         // Notify success (outside of synchronize lock)
319         if (successCallback != null)
320         {
321             try
322             {
323                 // notify of success
324                 successCallback.succeeded();
325             }
326             catch (Throwable t)
327             {
328                 LOG.warn("Callback failure",t);
329             }
330         }
331     }
332 
333     @Override
334     public String toString()
335     {
336         StringBuilder b = new StringBuilder();
337         b.append("WriteBytesProvider[");
338         b.append("flushCallback=").append(flushCallback);
339         if (failure != null)
340         {
341             b.append(",failure=").append(failure.getClass().getName());
342             b.append(":").append(failure.getMessage());
343         }
344         else
345         {
346             b.append(",active=").append(active);
347             b.append(",queue.size=").append(queue.size());
348         }
349         b.append(']');
350         return b.toString();
351     }
352 }