1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.Writer;
23 import java.nio.ByteBuffer;
24
25 import org.eclipse.jetty.io.ByteBufferPool;
26 import org.eclipse.jetty.util.BufferUtil;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29 import org.eclipse.jetty.websocket.api.BatchMode;
30 import org.eclipse.jetty.websocket.api.WriteCallback;
31 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32 import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33 import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
34 import org.eclipse.jetty.websocket.common.WebSocketSession;
35 import org.eclipse.jetty.websocket.common.frames.TextFrame;
36
37
38
39
40
41
42 public class MessageWriter extends Writer
43 {
44 private static final Logger LOG = Log.getLogger(MessageWriter.class);
45
46 private final OutgoingFrames outgoing;
47 private final ByteBufferPool bufferPool;
48 private final BlockingWriteCallback blocker;
49 private long frameCount;
50 private TextFrame frame;
51 private ByteBuffer buffer;
52 private Utf8CharBuffer utf;
53 private WriteCallback callback;
54 private boolean closed;
55
56 public MessageWriter(WebSocketSession session)
57 {
58 this(session.getOutgoingHandler(), session.getPolicy().getMaxTextMessageBufferSize(), session.getBufferPool());
59 }
60
61 public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
62 {
63 this.outgoing = outgoing;
64 this.bufferPool = bufferPool;
65 this.blocker = new BlockingWriteCallback();
66 this.buffer = bufferPool.acquire(bufferSize, true);
67 BufferUtil.flipToFill(buffer);
68 this.frame = new TextFrame();
69 this.utf = Utf8CharBuffer.wrap(buffer);
70 }
71
72 @Override
73 public void write(char[] chars, int off, int len) throws IOException
74 {
75 try
76 {
77 send(chars, off, len);
78 }
79 catch (Throwable x)
80 {
81
82 notifyFailure(x);
83 throw x;
84 }
85 }
86
87 @Override
88 public void write(int c) throws IOException
89 {
90 try
91 {
92 send(new char[]{(char)c}, 0, 1);
93 }
94 catch (Throwable x)
95 {
96
97 notifyFailure(x);
98 throw x;
99 }
100 }
101
102 @Override
103 public void flush() throws IOException
104 {
105 try
106 {
107 flush(false);
108 }
109 catch (Throwable x)
110 {
111
112 notifyFailure(x);
113 throw x;
114 }
115 }
116
117 @Override
118 public void close() throws IOException
119 {
120 try
121 {
122 flush(true);
123 bufferPool.release(buffer);
124 if (LOG.isDebugEnabled())
125 LOG.debug("Stream closed, {} frames sent", frameCount);
126
127 notifySuccess();
128 }
129 catch (Throwable x)
130 {
131
132 notifyFailure(x);
133 throw x;
134 }
135 }
136
137 private void flush(boolean fin) throws IOException
138 {
139 synchronized (this)
140 {
141 if (closed)
142 throw new IOException("Stream is closed");
143
144 closed = fin;
145
146 ByteBuffer data = utf.getByteBuffer();
147 if (LOG.isDebugEnabled())
148 LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
149 frame.setPayload(data);
150 frame.setFin(fin);
151
152 try (WriteBlocker b = blocker.acquireWriteBlocker())
153 {
154 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
155 b.block();
156 }
157
158 ++frameCount;
159
160 frame.setIsContinuation();
161
162 utf.clear();
163 }
164 }
165
166 private void send(char[] chars, int offset, int length) throws IOException
167 {
168 synchronized (this)
169 {
170 if (closed)
171 throw new IOException("Stream is closed");
172
173 while (length > 0)
174 {
175
176
177 int space = utf.remaining();
178 int size = Math.min(space, length);
179 utf.append(chars, offset, size);
180 offset += size;
181 length -= size;
182 if (length > 0)
183 {
184
185
186 flush(false);
187 }
188 }
189 }
190 }
191
192 public void setCallback(WriteCallback callback)
193 {
194 synchronized (this)
195 {
196 this.callback = callback;
197 }
198 }
199
200 private void notifySuccess()
201 {
202 WriteCallback callback;
203 synchronized (this)
204 {
205 callback = this.callback;
206 }
207 if (callback != null)
208 {
209 callback.writeSuccess();
210 }
211 }
212
213 private void notifyFailure(Throwable failure)
214 {
215 WriteCallback callback;
216 synchronized (this)
217 {
218 callback = this.callback;
219 }
220 if (callback != null)
221 {
222 callback.writeFailed(failure);
223 }
224 }
225 }