1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.Writer;
23 import java.nio.ByteBuffer;
24
25 import org.eclipse.jetty.io.ByteBufferPool;
26 import org.eclipse.jetty.util.BufferUtil;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29 import org.eclipse.jetty.websocket.api.BatchMode;
30 import org.eclipse.jetty.websocket.api.WriteCallback;
31 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32 import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33 import org.eclipse.jetty.websocket.common.WebSocketSession;
34 import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
35 import org.eclipse.jetty.websocket.common.frames.TextFrame;
36
37
38
39
40
41
42 public class MessageWriter extends Writer
43 {
44 private static final Logger LOG = Log.getLogger(MessageWriter.class);
45
46 private final OutgoingFrames outgoing;
47 private final ByteBufferPool bufferPool;
48 private final BlockingWriteCallback blocker;
49 private long frameCount;
50 private TextFrame frame;
51 private ByteBuffer buffer;
52 private Utf8CharBuffer utf;
53 private WriteCallback callback;
54 private boolean closed;
55
56 public MessageWriter(WebSocketSession session)
57 {
58 this(session.getOutgoingHandler(), session.getPolicy().getMaxTextMessageBufferSize(), session.getBufferPool());
59 }
60
61 public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
62 {
63 this.outgoing = outgoing;
64 this.bufferPool = bufferPool;
65 this.blocker = new BlockingWriteCallback();
66 this.buffer = bufferPool.acquire(bufferSize, true);
67 BufferUtil.flipToFill(buffer);
68 this.frame = new TextFrame();
69 this.utf = Utf8CharBuffer.wrap(buffer);
70 }
71
72 @Override
73 public void write(char[] chars, int off, int len) throws IOException
74 {
75 try
76 {
77 send(chars, off, len);
78 }
79 catch (Throwable x)
80 {
81
82 notifyFailure(x);
83 throw x;
84 }
85 }
86
87 @Override
88 public void write(int c) throws IOException
89 {
90 try
91 {
92 send(new char[]{(char)c}, 0, 1);
93 }
94 catch (Throwable x)
95 {
96
97 notifyFailure(x);
98 throw x;
99 }
100 }
101
102 @Override
103 public void flush() throws IOException
104 {
105 try
106 {
107 flush(false);
108 }
109 catch (Throwable x)
110 {
111
112 notifyFailure(x);
113 throw x;
114 }
115 }
116
117 @Override
118 public void close() throws IOException
119 {
120 try
121 {
122 flush(true);
123 bufferPool.release(buffer);
124 LOG.debug("Stream closed, {} frames sent", frameCount);
125
126 notifySuccess();
127 }
128 catch (Throwable x)
129 {
130
131 notifyFailure(x);
132 throw x;
133 }
134 }
135
136 private void flush(boolean fin) throws IOException
137 {
138 synchronized (this)
139 {
140 if (closed)
141 throw new IOException("Stream is closed");
142
143 closed = fin;
144
145 ByteBuffer data = utf.getByteBuffer();
146 LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
147 frame.setPayload(data);
148 frame.setFin(fin);
149
150 try (WriteBlocker b = blocker.acquireWriteBlocker())
151 {
152 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
153 b.block();
154 }
155
156 ++frameCount;
157
158 frame.setIsContinuation();
159
160 utf.clear();
161 }
162 }
163
164 private void send(char[] chars, int offset, int length) throws IOException
165 {
166 synchronized (this)
167 {
168 if (closed)
169 throw new IOException("Stream is closed");
170
171 while (length > 0)
172 {
173
174
175 int space = utf.remaining();
176 int size = Math.min(space, length);
177 utf.append(chars, offset, size);
178 offset += size;
179 length -= size;
180 if (length > 0)
181 {
182
183
184 flush(false);
185 }
186 }
187 }
188 }
189
190 public void setCallback(WriteCallback callback)
191 {
192 synchronized (this)
193 {
194 this.callback = callback;
195 }
196 }
197
198 private void notifySuccess()
199 {
200 WriteCallback callback;
201 synchronized (this)
202 {
203 callback = this.callback;
204 }
205 if (callback != null)
206 {
207 callback.writeSuccess();
208 }
209 }
210
211 private void notifyFailure(Throwable failure)
212 {
213 WriteCallback callback;
214 synchronized (this)
215 {
216 callback = this.callback;
217 }
218 if (callback != null)
219 {
220 callback.writeFailed(failure);
221 }
222 }
223 }