1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.jsr356.endpoints;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.Reader;
24 import java.nio.ByteBuffer;
25 import java.util.Map;
26
27 import javax.websocket.CloseReason;
28 import javax.websocket.DecodeException;
29 import javax.websocket.MessageHandler.Whole;
30
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
35 import org.eclipse.jetty.websocket.api.extensions.Frame;
36 import org.eclipse.jetty.websocket.common.events.EventDriver;
37 import org.eclipse.jetty.websocket.common.message.MessageInputStream;
38 import org.eclipse.jetty.websocket.common.message.MessageReader;
39 import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
40 import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
41 import org.eclipse.jetty.websocket.jsr356.JsrSession;
42 import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
43 import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
44 import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
45
46
47
48
49 public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver
50 {
51 private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
52 private final JsrEvents<?, ?> events;
53
54 public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
55 {
56 super(policy,endpointInstance);
57 this.events = events;
58 }
59
60 @Override
61 public void init(JsrSession jsrsession)
62 {
63 this.events.init(jsrsession);
64 }
65
66
67
68
69 @Override
70 public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
71 {
72 if (LOG.isDebugEnabled())
73 {
74 LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
75 LOG.debug("events.onBinary={}",events.hasBinary());
76 LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
77 }
78 boolean handled = false;
79
80 if (events.hasBinary())
81 {
82 handled = true;
83 if (activeMessage == null)
84 {
85 if (events.isBinaryPartialSupported())
86 {
87
88 LOG.debug("Partial Binary Message: fin={}",fin);
89 activeMessage = new BinaryPartialOnMessage(this);
90 }
91 else
92 {
93
94 LOG.debug("Whole Binary Message");
95 activeMessage = new SimpleBinaryMessage(this);
96 }
97 }
98 }
99
100 if (events.hasBinaryStream())
101 {
102 handled = true;
103
104 if (activeMessage == null)
105 {
106 LOG.debug("Binary Message InputStream");
107 final MessageInputStream stream = new MessageInputStream(session.getConnection());
108 activeMessage = stream;
109
110
111 dispatch(new Runnable()
112 {
113 @Override
114 public void run()
115 {
116 try
117 {
118 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
119 }
120 catch (DecodeException | IOException e)
121 {
122 onFatalError(e);
123 }
124 }
125 });
126 }
127 }
128
129 LOG.debug("handled = {}",handled);
130
131
132 if (handled && (activeMessage != null))
133 {
134 appendMessage(buffer,fin);
135 }
136 }
137
138
139
140
141 @Override
142 public void onBinaryMessage(byte[] data)
143 {
144 if (data == null)
145 {
146 return;
147 }
148
149 ByteBuffer buf = ByteBuffer.wrap(data);
150
151 if (LOG.isDebugEnabled())
152 {
153 LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
154 }
155
156 try
157 {
158
159 events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
160 }
161 catch (DecodeException e)
162 {
163 onFatalError(e);
164 }
165 }
166
167 @Override
168 protected void onClose(CloseReason closereason)
169 {
170 events.callClose(websocket,closereason);
171 }
172
173 @Override
174 public void onConnect()
175 {
176 events.callOpen(websocket,config);
177 }
178
179 @Override
180 public void onError(Throwable cause)
181 {
182 events.callError(websocket,cause);
183 }
184
185 private void onFatalError(Throwable t)
186 {
187 onError(t);
188 }
189
190 @Override
191 public void onFrame(Frame frame)
192 {
193
194 }
195
196 @Override
197 public void onInputStream(InputStream stream)
198 {
199 try
200 {
201 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
202 }
203 catch (DecodeException | IOException e)
204 {
205 onFatalError(e);
206 }
207 }
208
209 public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
210 {
211 try
212 {
213 events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
214 }
215 catch (DecodeException e)
216 {
217 onFatalError(e);
218 }
219 }
220
221 public void onPartialTextMessage(String message, boolean fin)
222 {
223 try
224 {
225 events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
226 }
227 catch (DecodeException e)
228 {
229 onFatalError(e);
230 }
231 }
232
233 @Override
234 public void onPong(ByteBuffer buffer)
235 {
236 try
237 {
238 events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
239 }
240 catch (DecodeException | IOException e)
241 {
242 onFatalError(e);
243 }
244 }
245
246 @Override
247 public void onReader(Reader reader)
248 {
249 try
250 {
251 events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
252 }
253 catch (DecodeException | IOException e)
254 {
255 onFatalError(e);
256 }
257 }
258
259
260
261
262 @Override
263 public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
264 {
265 if (LOG.isDebugEnabled())
266 {
267 LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
268 LOG.debug("events.hasText={}",events.hasText());
269 LOG.debug("events.hasTextStream={}",events.hasTextStream());
270 }
271
272 boolean handled = false;
273
274 if (events.hasText())
275 {
276 handled = true;
277 if (activeMessage == null)
278 {
279 if (events.isTextPartialSupported())
280 {
281
282 LOG.debug("Partial Text Message: fin={}",fin);
283 activeMessage = new TextPartialOnMessage(this);
284 }
285 else
286 {
287
288 LOG.debug("Whole Text Message");
289 activeMessage = new SimpleTextMessage(this);
290 }
291 }
292 }
293
294 if (events.hasTextStream())
295 {
296 handled = true;
297
298 if (activeMessage == null)
299 {
300 LOG.debug("Text Message Writer");
301
302 final MessageReader stream = new MessageReader(new MessageInputStream(session.getConnection()));
303 activeMessage = stream;
304
305
306 dispatch(new Runnable()
307 {
308 @Override
309 public void run()
310 {
311 try
312 {
313 events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
314 }
315 catch (DecodeException | IOException e)
316 {
317 onFatalError(e);
318 }
319 }
320 });
321 }
322 }
323
324 LOG.debug("handled = {}",handled);
325
326
327 if (handled && (activeMessage != null))
328 {
329 appendMessage(buffer,fin);
330 }
331 }
332
333
334
335
336 @Override
337 public void onTextMessage(String message)
338 {
339 LOG.debug("onText({})",message);
340
341 try
342 {
343
344 events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
345 }
346 catch (DecodeException e)
347 {
348 onFatalError(e);
349 }
350 }
351
352 @Override
353 public void setPathParameters(Map<String, String> pathParameters)
354 {
355 events.setPathParameters(pathParameters);
356 }
357
358 @Override
359 public String toString()
360 {
361 return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
362 }
363 }