1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.nio.ByteBuffer;
24
25 import org.eclipse.jetty.io.ByteBufferPool;
26 import org.eclipse.jetty.util.BufferUtil;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29 import org.eclipse.jetty.websocket.api.BatchMode;
30 import org.eclipse.jetty.websocket.api.WriteCallback;
31 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32 import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33 import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
34 import org.eclipse.jetty.websocket.common.WebSocketSession;
35 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
36
37
38
39
40 public class MessageOutputStream extends OutputStream
41 {
42 private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
43
44 private final OutgoingFrames outgoing;
45 private final ByteBufferPool bufferPool;
46 private final BlockingWriteCallback blocker;
47 private long frameCount;
48 private BinaryFrame frame;
49 private ByteBuffer buffer;
50 private WriteCallback callback;
51 private boolean closed;
52
53 public MessageOutputStream(WebSocketSession session)
54 {
55 this(session.getOutgoingHandler(), session.getPolicy().getMaxBinaryMessageBufferSize(), session.getBufferPool());
56 }
57
58 public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
59 {
60 this.outgoing = outgoing;
61 this.bufferPool = bufferPool;
62 this.blocker = new BlockingWriteCallback();
63 this.buffer = bufferPool.acquire(bufferSize, true);
64 BufferUtil.flipToFill(buffer);
65 this.frame = new BinaryFrame();
66 }
67
68 @Override
69 public void write(byte[] bytes, int off, int len) throws IOException
70 {
71 try
72 {
73 send(bytes, off, len);
74 }
75 catch (Throwable x)
76 {
77
78 notifyFailure(x);
79 throw x;
80 }
81 }
82
83 @Override
84 public void write(int b) throws IOException
85 {
86 try
87 {
88 send(new byte[]{(byte)b}, 0, 1);
89 }
90 catch (Throwable x)
91 {
92
93 notifyFailure(x);
94 throw x;
95 }
96 }
97
98 @Override
99 public void flush() throws IOException
100 {
101 try
102 {
103 flush(false);
104 }
105 catch (Throwable x)
106 {
107
108 notifyFailure(x);
109 throw x;
110 }
111 }
112
113 @Override
114 public void close() throws IOException
115 {
116 try
117 {
118 flush(true);
119 bufferPool.release(buffer);
120 if (LOG.isDebugEnabled())
121 LOG.debug("Stream closed, {} frames sent", frameCount);
122
123 notifySuccess();
124 }
125 catch (Throwable x)
126 {
127
128 notifyFailure(x);
129 throw x;
130 }
131 }
132
133 private void flush(boolean fin) throws IOException
134 {
135 synchronized (this)
136 {
137 if (closed)
138 throw new IOException("Stream is closed");
139
140 closed = fin;
141
142 BufferUtil.flipToFlush(buffer, 0);
143 frame.setPayload(buffer);
144 frame.setFin(fin);
145
146 try(WriteBlocker b=blocker.acquireWriteBlocker())
147 {
148 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
149 b.block();
150 }
151
152 ++frameCount;
153
154 frame.setIsContinuation();
155
156 BufferUtil.flipToFill(buffer);
157 }
158 }
159
160 private void send(byte[] bytes, int offset, int length) throws IOException
161 {
162 synchronized (this)
163 {
164 if (closed)
165 throw new IOException("Stream is closed");
166
167 while (length > 0)
168 {
169
170
171 int space = buffer.remaining();
172 int size = Math.min(space, length);
173 buffer.put(bytes, offset, size);
174 offset += size;
175 length -= size;
176 if (length > 0)
177 {
178
179
180 flush(false);
181 }
182 }
183 }
184 }
185
186 public void setCallback(WriteCallback callback)
187 {
188 synchronized (this)
189 {
190 this.callback = callback;
191 }
192 }
193
194 private void notifySuccess()
195 {
196 WriteCallback callback;
197 synchronized (this)
198 {
199 callback = this.callback;
200 }
201 if (callback != null)
202 {
203 callback.writeSuccess();
204 }
205 }
206
207 private void notifyFailure(Throwable failure)
208 {
209 WriteCallback callback;
210 synchronized (this)
211 {
212 callback = this.callback;
213 }
214 if (callback != null)
215 {
216 callback.writeFailed(failure);
217 }
218 }
219 }