View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.eclipse.jetty.io.ByteBufferPool;
26  import org.eclipse.jetty.util.BufferUtil;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.BatchMode;
30  import org.eclipse.jetty.websocket.api.WriteCallback;
31  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32  import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33  import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
34  import org.eclipse.jetty.websocket.common.WebSocketSession;
35  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
36  
37  /**
38   * Support for writing a single WebSocket BINARY message via a {@link OutputStream}
39   */
40  public class MessageOutputStream extends OutputStream
41  {
42      private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
43  
44      private final OutgoingFrames outgoing;
45      private final ByteBufferPool bufferPool;
46      private final BlockingWriteCallback blocker;
47      private long frameCount;
48      private BinaryFrame frame;
49      private ByteBuffer buffer;
50      private WriteCallback callback;
51      private boolean closed;
52  
53      public MessageOutputStream(WebSocketSession session)
54      {
55          this(session.getOutgoingHandler(), session.getPolicy().getMaxBinaryMessageBufferSize(), session.getBufferPool());
56      }
57  
58      public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
59      {
60          this.outgoing = outgoing;
61          this.bufferPool = bufferPool;
62          this.blocker = new BlockingWriteCallback();
63          this.buffer = bufferPool.acquire(bufferSize, true);
64          BufferUtil.flipToFill(buffer);
65          this.frame = new BinaryFrame();
66      }
67  
68      @Override
69      public void write(byte[] bytes, int off, int len) throws IOException
70      {
71          try
72          {
73              send(bytes, off, len);
74          }
75          catch (Throwable x)
76          {
77              // Notify without holding locks.
78              notifyFailure(x);
79              throw x;
80          }
81      }
82  
83      @Override
84      public void write(int b) throws IOException
85      {
86          try
87          {
88              send(new byte[]{(byte)b}, 0, 1);
89          }
90          catch (Throwable x)
91          {
92              // Notify without holding locks.
93              notifyFailure(x);
94              throw x;
95          }
96      }
97  
98      @Override
99      public void flush() throws IOException
100     {
101         try
102         {
103             flush(false);
104         }
105         catch (Throwable x)
106         {
107             // Notify without holding locks.
108             notifyFailure(x);
109             throw x;
110         }
111     }
112 
113     @Override
114     public void close() throws IOException
115     {
116         try
117         {
118             flush(true);
119             bufferPool.release(buffer);
120             if (LOG.isDebugEnabled())
121                 LOG.debug("Stream closed, {} frames sent", frameCount);
122             // Notify without holding locks.
123             notifySuccess();
124         }
125         catch (Throwable x)
126         {
127             // Notify without holding locks.
128             notifyFailure(x);
129             throw x;
130         }
131     }
132 
133     private void flush(boolean fin) throws IOException
134     {
135         synchronized (this)
136         {
137             if (closed)
138                 throw new IOException("Stream is closed");
139 
140             closed = fin;
141 
142             BufferUtil.flipToFlush(buffer, 0);
143             if (LOG.isDebugEnabled())
144                 LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
145             frame.setPayload(buffer);
146             frame.setFin(fin);
147 
148             try(WriteBlocker b=blocker.acquireWriteBlocker())
149             {
150                 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
151                 b.block();
152             }
153 
154             ++frameCount;
155             // Any flush after the first will be a CONTINUATION frame.
156             frame.setIsContinuation();
157 
158             BufferUtil.flipToFill(buffer);
159         }
160     }
161 
162     private void send(byte[] bytes, int offset, int length) throws IOException
163     {
164         synchronized (this)
165         {
166             if (closed)
167                 throw new IOException("Stream is closed");
168 
169             while (length > 0)
170             {
171                 // There may be no space available, we want
172                 // to handle correctly when space == 0.
173                 int space = buffer.remaining();
174                 int size = Math.min(space, length);
175                 buffer.put(bytes, offset, size);
176                 offset += size;
177                 length -= size;
178                 if (length > 0)
179                 {
180                     // If we could not write everything, it means
181                     // that the buffer was full, so flush it.
182                     flush(false);
183                 }
184             }
185         }
186     }
187 
188     public void setCallback(WriteCallback callback)
189     {
190         synchronized (this)
191         {
192             this.callback = callback;
193         }
194     }
195 
196     private void notifySuccess()
197     {
198         WriteCallback callback;
199         synchronized (this)
200         {
201             callback = this.callback;
202         }
203         if (callback != null)
204         {
205             callback.writeSuccess();
206         }
207     }
208 
209     private void notifyFailure(Throwable failure)
210     {
211         WriteCallback callback;
212         synchronized (this)
213         {
214             callback = this.callback;
215         }
216         if (callback != null)
217         {
218             callback.writeFailed(failure);
219         }
220     }
221 }