1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.nio.ByteBuffer;
24 import java.util.concurrent.ExecutionException;
25
26 import org.eclipse.jetty.io.ByteBufferPool;
27 import org.eclipse.jetty.util.BufferUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.WriteCallback;
31 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32 import org.eclipse.jetty.websocket.common.WebSocketSession;
33 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
34 import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
35
36
37
38
39 public class MessageOutputStream extends OutputStream
40 {
41 private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
42 private final OutgoingFrames outgoing;
43 private final ByteBufferPool bufferPool;
44 private long frameCount = 0;
45 private BinaryFrame frame;
46 private ByteBuffer buffer;
47 private FutureWriteCallback blocker;
48 private WriteCallback callback;
49 private boolean closed = false;
50
51 public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
52 {
53 this.outgoing = outgoing;
54 this.bufferPool = bufferPool;
55 this.buffer = bufferPool.acquire(bufferSize,true);
56 BufferUtil.flipToFill(buffer);
57 this.frame = new BinaryFrame();
58 }
59
60 public MessageOutputStream(WebSocketSession session)
61 {
62 this(session.getOutgoingHandler(),session.getPolicy().getMaxBinaryMessageBufferSize(),session.getBufferPool());
63 }
64
65 private void assertNotClosed() throws IOException
66 {
67 if (closed)
68 {
69 IOException e = new IOException("Stream is closed");
70 notifyFailure(e);
71 throw e;
72 }
73 }
74
75 @Override
76 public synchronized void close() throws IOException
77 {
78 assertNotClosed();
79 LOG.debug("close()");
80
81
82 flush(true);
83
84
85 LOG.debug("Sent Frame Count: {}",frameCount);
86 closed = true;
87 try
88 {
89 if (callback != null)
90 {
91 callback.writeSuccess();
92 }
93 super.close();
94 bufferPool.release(buffer);
95 LOG.debug("closed");
96 }
97 catch (IOException e)
98 {
99 notifyFailure(e);
100 throw e;
101 }
102 }
103
104 @Override
105 public synchronized void flush() throws IOException
106 {
107 LOG.debug("flush()");
108 assertNotClosed();
109
110
111 flush(false);
112 try
113 {
114 super.flush();
115 LOG.debug("flushed");
116 }
117 catch (IOException e)
118 {
119 notifyFailure(e);
120 throw e;
121 }
122 }
123
124
125
126
127
128
129
130
131 private synchronized void flush(boolean fin) throws IOException
132 {
133 BufferUtil.flipToFlush(buffer,0);
134 LOG.debug("flush({}): {}",fin,BufferUtil.toDetailString(buffer));
135 frame.setPayload(buffer);
136 frame.setFin(fin);
137
138 try
139 {
140 blocker = new FutureWriteCallback();
141 outgoing.outgoingFrame(frame,blocker);
142 try
143 {
144
145 blocker.get();
146
147 frameCount++;
148 frame.setIsContinuation();
149 }
150 catch (ExecutionException e)
151 {
152 Throwable cause = e.getCause();
153 if (cause != null)
154 {
155 if (cause instanceof IOException)
156 {
157 throw (IOException)cause;
158 }
159 else
160 {
161 throw new IOException(cause);
162 }
163 }
164 throw new IOException("Failed to flush",e);
165 }
166 catch (InterruptedException e)
167 {
168 throw new IOException("Failed to flush",e);
169 }
170 }
171 catch (IOException e)
172 {
173 notifyFailure(e);
174 throw e;
175 }
176 }
177
178 private void notifyFailure(IOException e)
179 {
180 if (callback != null)
181 {
182 callback.writeFailed(e);
183 }
184 }
185
186 public void setCallback(WriteCallback callback)
187 {
188 this.callback = callback;
189 }
190
191 @Override
192 public synchronized void write(byte[] b) throws IOException
193 {
194 try
195 {
196 this.write(b,0,b.length);
197 }
198 catch (IOException e)
199 {
200 notifyFailure(e);
201 throw e;
202 }
203 }
204
205 @Override
206 public synchronized void write(byte[] b, int off, int len) throws IOException
207 {
208 LOG.debug("write(byte[{}], {}, {})",b.length,off,len);
209 int left = len;
210 int offset = off;
211 while (left > 0)
212 {
213 if (LOG.isDebugEnabled())
214 {
215 LOG.debug("buffer: {}",BufferUtil.toDetailString(buffer));
216 }
217 int space = buffer.remaining();
218 assert (space > 0);
219 int size = Math.min(space,left);
220 buffer.put(b,offset,size);
221 assert (size > 0);
222 left -= size;
223 if (left > 0)
224 {
225 flush(false);
226 }
227 offset += size;
228 }
229 }
230
231 @Override
232 public synchronized void write(int b) throws IOException
233 {
234 assertNotClosed();
235
236
237 buffer.put((byte)b);
238 if (buffer.remaining() <= 0)
239 {
240 flush(false);
241 }
242 }
243 }