View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.LinkedList;
24  import java.util.Objects;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.eclipse.jetty.io.AbstractConnection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.Callback;
31  import org.eclipse.jetty.util.log.Log;
32  import org.eclipse.jetty.util.log.Logger;
33  import org.eclipse.jetty.websocket.api.extensions.Frame;
34  import org.eclipse.jetty.websocket.common.Generator;
35  
36  /**
37   * Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
38   */
39  public class WriteBytesProvider implements Callback
40  {
41      private class FrameEntry
42      {
43          protected final Frame frame;
44          protected final Callback callback;
45  
46          public FrameEntry(Frame frame, Callback callback)
47          {
48              this.frame = frame;
49              this.callback = callback;
50          }
51  
52          public ByteBuffer getByteBuffer()
53          {
54              ByteBuffer buffer = generator.generate(bufferSize,frame);
55              if (LOG.isDebugEnabled())
56              {
57                  LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
58              }
59              return buffer;
60          }
61      }
62  
63      private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
64  
65      /** The websocket generator */
66      private final Generator generator;
67      /** Flush callback, for notifying when a flush should be performed */
68      private final Callback flushCallback;
69      /** Backlog of frames */
70      private LinkedList<FrameEntry> queue;
71      /** the buffer input size */
72      private int bufferSize = 2048;
73      /** Currently active frame */
74      private FrameEntry active;
75      /** Failure state for the entire WriteBytesProvider */
76      private Throwable failure;
77      /** The last requested buffer */
78      private ByteBuffer buffer;
79      /** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
80      private AtomicBoolean closed;
81  
82      /**
83       * Create a WriteBytesProvider with specified Generator and "flush" Callback.
84       * 
85       * @param generator
86       *            the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s
87       * @param flushCallback
88       *            the flush callback to call, on a write event, after the write event has been processed by this {@link WriteBytesProvider}.
89       *            <p>
90       *            Used to trigger another flush of the next set of bytes.
91       */
92      public WriteBytesProvider(Generator generator, Callback flushCallback)
93      {
94          this.generator = Objects.requireNonNull(generator);
95          this.flushCallback = Objects.requireNonNull(flushCallback);
96          this.queue = new LinkedList<>();
97          this.closed = new AtomicBoolean(false);
98      }
99  
100     public void enqueue(Frame frame, Callback callback)
101     {
102         Objects.requireNonNull(frame);
103         LOG.debug("enqueue({}, {})",frame,callback);
104         synchronized (this)
105         {
106             if (closed.get())
107             {
108                 // Closed for more frames.
109                 LOG.debug("Write is closed: {}",frame,callback);
110                 if (callback != null)
111                 {
112                     callback.failed(new IOException("Write is closed"));
113                 }
114                 return;
115             }
116 
117             if (isFailed())
118             {
119                 // no changes when failed
120                 notifyFailure(callback);
121                 return;
122             }
123 
124             FrameEntry entry = new FrameEntry(frame,callback);
125 
126             switch (frame.getType())
127             {
128                 case PING:
129                     queue.addFirst(entry);
130                     break;
131                 case CLOSE:
132                     closed.set(true);
133                     // drop the rest of the queue?
134                     queue.addLast(entry);
135                     break;
136                 default:
137                     queue.addLast(entry);
138             }
139         }
140     }
141 
142     public void failAll(Throwable t)
143     {
144         synchronized (this)
145         {
146             if (isFailed())
147             {
148                 // already failed.
149                 return;
150             }
151 
152             failure = t;
153 
154             for (FrameEntry fe : queue)
155             {
156                 notifyFailure(fe.callback);
157             }
158 
159             queue.clear();
160 
161             // notify flush callback
162             flushCallback.failed(failure);
163         }
164     }
165 
166     /**
167      * Write of ByteBuffer failed.
168      * 
169      * @param cause
170      *            the cause of the failure
171      */
172     @Override
173     public void failed(Throwable cause)
174     {
175         failAll(cause);
176     }
177 
178     public int getBufferSize()
179     {
180         return bufferSize;
181     }
182 
183     /**
184      * Get the next ByteBuffer to write.
185      * 
186      * @return the next ByteBuffer (or null if nothing to write)
187      */
188     public ByteBuffer getByteBuffer()
189     {
190         synchronized (this)
191         {
192             if (active == null)
193             {
194                 if (queue.isEmpty())
195                 {
196                     // nothing in queue
197                     return null;
198                 }
199                 // get current topmost entry
200                 active = queue.pop();
201             }
202 
203             if (active == null)
204             {
205                 // no active frame available, even in queue.
206                 return null;
207             }
208 
209             buffer = active.getByteBuffer();
210         }
211         return buffer;
212     }
213 
214     public Throwable getFailure()
215     {
216         return failure;
217     }
218 
219     /**
220      * Used to test for the final frame possible to be enqueued, the CLOSE frame.
221      * 
222      * @return true if close frame has been enqueued already.
223      */
224     public boolean isClosed()
225     {
226         synchronized (this)
227         {
228             return closed.get();
229         }
230     }
231 
232     public boolean isFailed()
233     {
234         return (failure != null);
235     }
236 
237     /**
238      * Notify specific callback of failure.
239      * 
240      * @param callback
241      *            the callback to notify
242      */
243     private void notifyFailure(Callback callback)
244     {
245         if (callback == null)
246         {
247             return;
248         }
249         callback.failed(failure);
250     }
251 
252     /**
253      * Set the buffer size used for generating ByteBuffers from the frames.
254      * <p>
255      * Value usually obtained from {@link AbstractConnection#getInputBufferSize()}
256      * 
257      * @param bufferSize
258      *            the buffer size to use
259      */
260     public void setBufferSize(int bufferSize)
261     {
262         this.bufferSize = bufferSize;
263     }
264 
265     /**
266      * Write of ByteBuffer succeeded.
267      */
268     @Override
269     public void succeeded()
270     {
271         synchronized (this)
272         {
273             // Release the active byte buffer first
274             generator.getBufferPool().release(buffer);
275 
276             if (active == null)
277             {
278                 return;
279             }
280 
281             if (active.frame.remaining() <= 0)
282             {
283                 // All done with active FrameEntry
284                 if (active.callback != null)
285                 {
286                     try
287                     {
288                         // TODO: should probably have callback invoked in new thread as part of scheduler
289                         // notify of success
290                         active.callback.succeeded();
291                     }
292                     catch (Throwable t)
293                     {
294                         LOG.warn("Callback failure",t);
295                     }
296                 }
297 
298                 // null it out
299                 active = null;
300             }
301 
302             // notify flush callback
303             flushCallback.succeeded();
304         }
305     }
306 
307     @Override
308     public String toString()
309     {
310         StringBuilder b = new StringBuilder();
311         b.append("WriteBytesProvider[");
312         b.append("flushCallback=").append(flushCallback);
313         if (isFailed())
314         {
315             b.append(",FAILURE=").append(failure.getClass().getName());
316             b.append(",").append(failure.getMessage());
317         }
318         else
319         {
320             b.append(",active=").append(active);
321             b.append(",queue.size=").append(queue.size());
322         }
323         b.append(']');
324         return b.toString();
325     }
326 }