View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  import java.util.concurrent.ExecutionException;
25  
26  import org.eclipse.jetty.io.ByteBufferPool;
27  import org.eclipse.jetty.util.BufferUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.WriteCallback;
31  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32  import org.eclipse.jetty.websocket.common.WebSocketSession;
33  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
34  import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
35  
36  /**
37   * Support for writing a single WebSocket BINARY message via a {@link OutputStream}
38   */
39  public class MessageOutputStream extends OutputStream
40  {
41      private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
42      private final OutgoingFrames outgoing;
43      private final ByteBufferPool bufferPool;
44      private long frameCount = 0;
45      private BinaryFrame frame;
46      private ByteBuffer buffer;
47      private FutureWriteCallback blocker;
48      private WriteCallback callback;
49      private boolean closed = false;
50  
51      public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
52      {
53          this.outgoing = outgoing;
54          this.bufferPool = bufferPool;
55          this.buffer = bufferPool.acquire(bufferSize,true);
56          BufferUtil.flipToFill(buffer);
57          this.frame = new BinaryFrame();
58      }
59  
60      public MessageOutputStream(WebSocketSession session)
61      {
62          this(session.getOutgoingHandler(),session.getPolicy().getMaxBinaryMessageBufferSize(),session.getBufferPool());
63      }
64  
65      private void assertNotClosed() throws IOException
66      {
67          if (closed)
68          {
69              IOException e = new IOException("Stream is closed");
70              notifyFailure(e);
71              throw e;
72          }
73      }
74  
75      @Override
76      public synchronized void close() throws IOException
77      {
78          assertNotClosed();
79          LOG.debug("close()");
80  
81          // finish sending whatever in the buffer with FIN=true
82          flush(true);
83  
84          // close stream
85          LOG.debug("Sent Frame Count: {}",frameCount);
86          closed = true;
87          try
88          {
89              if (callback != null)
90              {
91                  callback.writeSuccess();
92              }
93              super.close();
94              bufferPool.release(buffer);
95              LOG.debug("closed");
96          }
97          catch (IOException e)
98          {
99              notifyFailure(e);
100             throw e;
101         }
102     }
103 
104     @Override
105     public synchronized void flush() throws IOException
106     {
107         LOG.debug("flush()");
108         assertNotClosed();
109 
110         // flush whatever is in the buffer with FIN=false
111         flush(false);
112         try
113         {
114             super.flush();
115             LOG.debug("flushed");
116         }
117         catch (IOException e)
118         {
119             notifyFailure(e);
120             throw e;
121         }
122     }
123 
124     /**
125      * Flush whatever is in the buffer.
126      * 
127      * @param fin
128      *            fin flag
129      * @throws IOException
130      */
131     private synchronized void flush(boolean fin) throws IOException
132     {
133         BufferUtil.flipToFlush(buffer,0);
134         LOG.debug("flush({}): {}",fin,BufferUtil.toDetailString(buffer));
135         frame.setPayload(buffer);
136         frame.setFin(fin);
137 
138         try
139         {
140             blocker = new FutureWriteCallback();
141             outgoing.outgoingFrame(frame,blocker);
142             try
143             {
144                 // block on write
145                 blocker.get();
146                 // block success
147                 frameCount++;
148                 frame.setIsContinuation();
149             }
150             catch (ExecutionException e)
151             {
152                 Throwable cause = e.getCause();
153                 if (cause != null)
154                 {
155                     if (cause instanceof IOException)
156                     {
157                         throw (IOException)cause;
158                     }
159                     else
160                     {
161                         throw new IOException(cause);
162                     }
163                 }
164                 throw new IOException("Failed to flush",e);
165             }
166             catch (InterruptedException e)
167             {
168                 throw new IOException("Failed to flush",e);
169             }
170         }
171         catch (IOException e)
172         {
173             notifyFailure(e);
174             throw e;
175         }
176     }
177 
178     private void notifyFailure(IOException e)
179     {
180         if (callback != null)
181         {
182             callback.writeFailed(e);
183         }
184     }
185 
186     public void setCallback(WriteCallback callback)
187     {
188         this.callback = callback;
189     }
190 
191     @Override
192     public synchronized void write(byte[] b) throws IOException
193     {
194         try
195         {
196             this.write(b,0,b.length);
197         }
198         catch (IOException e)
199         {
200             notifyFailure(e);
201             throw e;
202         }
203     }
204 
205     @Override
206     public synchronized void write(byte[] b, int off, int len) throws IOException
207     {
208         LOG.debug("write(byte[{}], {}, {})",b.length,off,len);
209         int left = len; // bytes left to write
210         int offset = off; // offset within provided array
211         while (left > 0)
212         {
213             if (LOG.isDebugEnabled())
214             {
215                 LOG.debug("buffer: {}",BufferUtil.toDetailString(buffer));
216             }
217             int space = buffer.remaining();
218             assert (space > 0);
219             int size = Math.min(space,left);
220             buffer.put(b,offset,size);
221             assert (size > 0);
222             left -= size; // decrement bytes left
223             if (left > 0)
224             {
225                 flush(false);
226             }
227             offset += size; // increment offset
228         }
229     }
230 
231     @Override
232     public synchronized void write(int b) throws IOException
233     {
234         assertNotClosed();
235 
236         // buffer up to limit, flush once buffer reached.
237         buffer.put((byte)b);
238         if (buffer.remaining() <= 0)
239         {
240             flush(false);
241         }
242     }
243 }