1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.nio.ByteBuffer;
24
25 import org.eclipse.jetty.io.ByteBufferPool;
26 import org.eclipse.jetty.util.BufferUtil;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29 import org.eclipse.jetty.websocket.api.BatchMode;
30 import org.eclipse.jetty.websocket.api.WriteCallback;
31 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32 import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33 import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
34 import org.eclipse.jetty.websocket.common.WebSocketSession;
35 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
36
37
38
39
40 public class MessageOutputStream extends OutputStream
41 {
42 private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
43
44 private final OutgoingFrames outgoing;
45 private final ByteBufferPool bufferPool;
46 private final BlockingWriteCallback blocker;
47 private long frameCount;
48 private BinaryFrame frame;
49 private ByteBuffer buffer;
50 private WriteCallback callback;
51 private boolean closed;
52
53 public MessageOutputStream(WebSocketSession session)
54 {
55 this(session.getOutgoingHandler(), session.getPolicy().getMaxBinaryMessageBufferSize(), session.getBufferPool());
56 }
57
58 public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
59 {
60 this.outgoing = outgoing;
61 this.bufferPool = bufferPool;
62 this.blocker = new BlockingWriteCallback();
63 this.buffer = bufferPool.acquire(bufferSize, true);
64 BufferUtil.flipToFill(buffer);
65 this.frame = new BinaryFrame();
66 }
67
68 @Override
69 public void write(byte[] bytes, int off, int len) throws IOException
70 {
71 try
72 {
73 send(bytes, off, len);
74 }
75 catch (Throwable x)
76 {
77
78 notifyFailure(x);
79 throw x;
80 }
81 }
82
83 @Override
84 public void write(int b) throws IOException
85 {
86 try
87 {
88 send(new byte[]{(byte)b}, 0, 1);
89 }
90 catch (Throwable x)
91 {
92
93 notifyFailure(x);
94 throw x;
95 }
96 }
97
98 @Override
99 public void flush() throws IOException
100 {
101 try
102 {
103 flush(false);
104 }
105 catch (Throwable x)
106 {
107
108 notifyFailure(x);
109 throw x;
110 }
111 }
112
113 @Override
114 public void close() throws IOException
115 {
116 try
117 {
118 flush(true);
119 bufferPool.release(buffer);
120 if (LOG.isDebugEnabled())
121 LOG.debug("Stream closed, {} frames sent", frameCount);
122
123 notifySuccess();
124 }
125 catch (Throwable x)
126 {
127
128 notifyFailure(x);
129 throw x;
130 }
131 }
132
133 private void flush(boolean fin) throws IOException
134 {
135 synchronized (this)
136 {
137 if (closed)
138 throw new IOException("Stream is closed");
139
140 closed = fin;
141
142 BufferUtil.flipToFlush(buffer, 0);
143 if (LOG.isDebugEnabled())
144 LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
145 frame.setPayload(buffer);
146 frame.setFin(fin);
147
148 try(WriteBlocker b=blocker.acquireWriteBlocker())
149 {
150 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
151 b.block();
152 }
153
154 ++frameCount;
155
156 frame.setIsContinuation();
157
158 BufferUtil.flipToFill(buffer);
159 }
160 }
161
162 private void send(byte[] bytes, int offset, int length) throws IOException
163 {
164 synchronized (this)
165 {
166 if (closed)
167 throw new IOException("Stream is closed");
168
169 while (length > 0)
170 {
171
172
173 int space = buffer.remaining();
174 int size = Math.min(space, length);
175 buffer.put(bytes, offset, size);
176 offset += size;
177 length -= size;
178 if (length > 0)
179 {
180
181
182 flush(false);
183 }
184 }
185 }
186 }
187
188 public void setCallback(WriteCallback callback)
189 {
190 synchronized (this)
191 {
192 this.callback = callback;
193 }
194 }
195
196 private void notifySuccess()
197 {
198 WriteCallback callback;
199 synchronized (this)
200 {
201 callback = this.callback;
202 }
203 if (callback != null)
204 {
205 callback.writeSuccess();
206 }
207 }
208
209 private void notifyFailure(Throwable failure)
210 {
211 WriteCallback callback;
212 synchronized (this)
213 {
214 callback = this.callback;
215 }
216 if (callback != null)
217 {
218 callback.writeFailed(failure);
219 }
220 }
221 }