View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.eclipse.jetty.io.ByteBufferPool;
26  import org.eclipse.jetty.util.BufferUtil;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.BatchMode;
30  import org.eclipse.jetty.websocket.api.WriteCallback;
31  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32  import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33  import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
34  import org.eclipse.jetty.websocket.common.WebSocketSession;
35  import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
36  
37  /**
38   * Support for writing a single WebSocket BINARY message via a {@link OutputStream}
39   */
40  public class MessageOutputStream extends OutputStream
41  {
42      private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
43  
44      private final OutgoingFrames outgoing;
45      private final ByteBufferPool bufferPool;
46      private final BlockingWriteCallback blocker;
47      private long frameCount;
48      private BinaryFrame frame;
49      private ByteBuffer buffer;
50      private WriteCallback callback;
51      private boolean closed;
52  
53      public MessageOutputStream(WebSocketSession session)
54      {
55          this(session.getOutgoingHandler(), session.getPolicy().getMaxBinaryMessageBufferSize(), session.getBufferPool());
56      }
57  
58      public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
59      {
60          this.outgoing = outgoing;
61          this.bufferPool = bufferPool;
62          this.blocker = new BlockingWriteCallback();
63          this.buffer = bufferPool.acquire(bufferSize, true);
64          BufferUtil.flipToFill(buffer);
65          this.frame = new BinaryFrame();
66      }
67  
68      @Override
69      public void write(byte[] bytes, int off, int len) throws IOException
70      {
71          try
72          {
73              send(bytes, off, len);
74          }
75          catch (Throwable x)
76          {
77              // Notify without holding locks.
78              notifyFailure(x);
79              throw x;
80          }
81      }
82  
83      @Override
84      public void write(int b) throws IOException
85      {
86          try
87          {
88              send(new byte[]{(byte)b}, 0, 1);
89          }
90          catch (Throwable x)
91          {
92              // Notify without holding locks.
93              notifyFailure(x);
94              throw x;
95          }
96      }
97  
98      @Override
99      public void flush() throws IOException
100     {
101         try
102         {
103             flush(false);
104         }
105         catch (Throwable x)
106         {
107             // Notify without holding locks.
108             notifyFailure(x);
109             throw x;
110         }
111     }
112 
113     @Override
114     public void close() throws IOException
115     {
116         try
117         {
118             flush(true);
119             bufferPool.release(buffer);
120             if (LOG.isDebugEnabled())
121                 LOG.debug("Stream closed, {} frames sent", frameCount);
122             // Notify without holding locks.
123             notifySuccess();
124         }
125         catch (Throwable x)
126         {
127             // Notify without holding locks.
128             notifyFailure(x);
129             throw x;
130         }
131     }
132 
133     private void flush(boolean fin) throws IOException
134     {
135         synchronized (this)
136         {
137             if (closed)
138                 throw new IOException("Stream is closed");
139 
140             closed = fin;
141 
142             BufferUtil.flipToFlush(buffer, 0);
143             frame.setPayload(buffer);
144             frame.setFin(fin);
145 
146             try(WriteBlocker b=blocker.acquireWriteBlocker())
147             {
148                 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
149                 b.block();
150             }
151 
152             ++frameCount;
153             // Any flush after the first will be a CONTINUATION frame.
154             frame.setIsContinuation();
155 
156             BufferUtil.flipToFill(buffer);
157         }
158     }
159 
160     private void send(byte[] bytes, int offset, int length) throws IOException
161     {
162         synchronized (this)
163         {
164             if (closed)
165                 throw new IOException("Stream is closed");
166 
167             while (length > 0)
168             {
169                 // There may be no space available, we want
170                 // to handle correctly when space == 0.
171                 int space = buffer.remaining();
172                 int size = Math.min(space, length);
173                 buffer.put(bytes, offset, size);
174                 offset += size;
175                 length -= size;
176                 if (length > 0)
177                 {
178                     // If we could not write everything, it means
179                     // that the buffer was full, so flush it.
180                     flush(false);
181                 }
182             }
183         }
184     }
185 
186     public void setCallback(WriteCallback callback)
187     {
188         synchronized (this)
189         {
190             this.callback = callback;
191         }
192     }
193 
194     private void notifySuccess()
195     {
196         WriteCallback callback;
197         synchronized (this)
198         {
199             callback = this.callback;
200         }
201         if (callback != null)
202         {
203             callback.writeSuccess();
204         }
205     }
206 
207     private void notifyFailure(Throwable failure)
208     {
209         WriteCallback callback;
210         synchronized (this)
211         {
212             callback = this.callback;
213         }
214         if (callback != null)
215         {
216             callback.writeFailed(failure);
217         }
218     }
219 }