1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.message;
20
21 import java.io.IOException;
22 import java.io.Writer;
23 import java.nio.ByteBuffer;
24 import java.util.concurrent.ExecutionException;
25
26 import org.eclipse.jetty.io.ByteBufferPool;
27 import org.eclipse.jetty.util.BufferUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.WriteCallback;
31 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32 import org.eclipse.jetty.websocket.common.WebSocketSession;
33 import org.eclipse.jetty.websocket.common.frames.TextFrame;
34 import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
35
36
37
38
39
40
41 public class MessageWriter extends Writer
42 {
43 private static final Logger LOG = Log.getLogger(MessageWriter.class);
44 private final OutgoingFrames outgoing;
45 private final ByteBufferPool bufferPool;
46 private long frameCount = 0;
47 private TextFrame frame;
48 private ByteBuffer buffer;
49 private Utf8CharBuffer utf;
50 private FutureWriteCallback blocker;
51 private WriteCallback callback;
52 private boolean closed = false;
53
54 public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
55 {
56 this.outgoing = outgoing;
57 this.bufferPool = bufferPool;
58 this.buffer = bufferPool.acquire(bufferSize,true);
59 BufferUtil.flipToFill(buffer);
60 this.utf = Utf8CharBuffer.wrap(buffer);
61 this.frame = new TextFrame();
62 }
63
64 public MessageWriter(WebSocketSession session)
65 {
66 this(session.getOutgoingHandler(),session.getPolicy().getMaxTextMessageBufferSize(),session.getBufferPool());
67 }
68
69 private void assertNotClosed() throws IOException
70 {
71 if (closed)
72 {
73 IOException e = new IOException("Stream is closed");
74 notifyFailure(e);
75 throw e;
76 }
77 }
78
79 @Override
80 public synchronized void close() throws IOException
81 {
82 assertNotClosed();
83
84
85 flush(true);
86
87
88 closed = true;
89 if (callback != null)
90 {
91 callback.writeSuccess();
92 }
93 bufferPool.release(buffer);
94 LOG.debug("closed (frame count={})",frameCount);
95 }
96
97 @Override
98 public void flush() throws IOException
99 {
100 assertNotClosed();
101
102
103 flush(false);
104 }
105
106
107
108
109
110
111
112
113 private synchronized void flush(boolean fin) throws IOException
114 {
115 ByteBuffer data = utf.getByteBuffer();
116 frame.setPayload(data);
117 frame.setFin(fin);
118
119 try
120 {
121 blocker = new FutureWriteCallback();
122 outgoing.outgoingFrame(frame,blocker);
123 try
124 {
125
126 blocker.get();
127
128
129 utf.clear();
130 frameCount++;
131 frame.setIsContinuation();
132 }
133 catch (ExecutionException e)
134 {
135 Throwable cause = e.getCause();
136 if (cause != null)
137 {
138 if (cause instanceof IOException)
139 {
140 throw (IOException)cause;
141 }
142 else
143 {
144 throw new IOException(cause);
145 }
146 }
147 throw new IOException("Failed to flush",e);
148 }
149 catch (InterruptedException e)
150 {
151 throw new IOException("Failed to flush",e);
152 }
153 }
154 catch (IOException e)
155 {
156 notifyFailure(e);
157 throw e;
158 }
159 }
160
161 private void notifyFailure(IOException e)
162 {
163 if (callback != null)
164 {
165 callback.writeFailed(e);
166 }
167 }
168
169 public void setCallback(WriteCallback callback)
170 {
171 this.callback = callback;
172 }
173
174 @Override
175 public void write(char[] cbuf) throws IOException
176 {
177 try
178 {
179 this.write(cbuf,0,cbuf.length);
180 }
181 catch (IOException e)
182 {
183 notifyFailure(e);
184 throw e;
185 }
186 }
187
188 @Override
189 public void write(char[] cbuf, int off, int len) throws IOException
190 {
191 assertNotClosed();
192 int left = len;
193 int offset = off;
194 while (left > 0)
195 {
196 int space = utf.remaining();
197 int size = Math.min(space,left);
198 assert (space > 0);
199 assert (size > 0);
200 utf.append(cbuf,offset,size);
201 left -= size;
202 if (left > 0)
203 {
204 flush(false);
205 }
206 offset += size;
207 }
208 }
209
210 @Override
211 public void write(int c) throws IOException
212 {
213 assertNotClosed();
214
215
216 utf.append(c);
217 if (utf.remaining() <= 0)
218 {
219 flush(false);
220 }
221 }
222 }