1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.jsr356.endpoints;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.Reader;
24 import java.nio.ByteBuffer;
25 import java.util.Map;
26 import javax.websocket.CloseReason;
27 import javax.websocket.DecodeException;
28
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33 import org.eclipse.jetty.websocket.api.extensions.Frame;
34 import org.eclipse.jetty.websocket.common.message.MessageInputStream;
35 import org.eclipse.jetty.websocket.common.message.MessageReader;
36 import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
37 import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
38 import org.eclipse.jetty.websocket.jsr356.JsrSession;
39 import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
40 import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
41 import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
42
43
44
45
46 public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
47 {
48 private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
49 private final JsrEvents<?, ?> events;
50
51 public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
52 {
53 super(policy,endpointInstance);
54 this.events = events;
55 }
56
57 @Override
58 public void init(JsrSession jsrsession)
59 {
60 this.events.init(jsrsession);
61 }
62
63
64
65
66 @Override
67 public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
68 {
69 if (LOG.isDebugEnabled())
70 {
71 LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
72 LOG.debug("events.onBinary={}",events.hasBinary());
73 LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
74 }
75 boolean handled = false;
76
77 if (events.hasBinary())
78 {
79 handled = true;
80 if (activeMessage == null)
81 {
82 if (events.isBinaryPartialSupported())
83 {
84
85 if (LOG.isDebugEnabled())
86 {
87 LOG.debug("Partial Binary Message: fin={}",fin);
88 }
89 activeMessage = new BinaryPartialOnMessage(this);
90 }
91 else
92 {
93
94 if (LOG.isDebugEnabled())
95 {
96 LOG.debug("Whole Binary Message");
97 }
98 activeMessage = new SimpleBinaryMessage(this);
99 }
100 }
101 }
102
103 if (events.hasBinaryStream())
104 {
105 handled = true;
106
107 if (activeMessage == null)
108 {
109 if (LOG.isDebugEnabled())
110 {
111 LOG.debug("Binary Message InputStream");
112 }
113 final MessageInputStream stream = new MessageInputStream();
114 activeMessage = stream;
115
116
117 dispatch(new Runnable()
118 {
119 @Override
120 public void run()
121 {
122 try
123 {
124 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
125 }
126 catch (DecodeException | IOException e)
127 {
128 onFatalError(e);
129 }
130 }
131 });
132 }
133 }
134
135 if (LOG.isDebugEnabled())
136 {
137 LOG.debug("handled = {}",handled);
138 }
139
140
141 if (handled && (activeMessage != null))
142 {
143 appendMessage(buffer,fin);
144 }
145 }
146
147
148
149
150 @Override
151 public void onBinaryMessage(byte[] data)
152 {
153 if (data == null)
154 {
155 return;
156 }
157
158 ByteBuffer buf = ByteBuffer.wrap(data);
159
160 if (LOG.isDebugEnabled())
161 {
162 LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
163 }
164
165 try
166 {
167
168 events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
169 }
170 catch (DecodeException e)
171 {
172 onFatalError(e);
173 }
174 }
175
176 @Override
177 protected void onClose(CloseReason closereason)
178 {
179 events.callClose(websocket,closereason);
180 }
181
182 @Override
183 public void onConnect()
184 {
185 events.callOpen(websocket,config);
186 }
187
188 @Override
189 public void onError(Throwable cause)
190 {
191 events.callError(websocket,cause);
192 }
193
194 private void onFatalError(Throwable t)
195 {
196 onError(t);
197 }
198
199 @Override
200 public void onFrame(Frame frame)
201 {
202
203 }
204
205 @Override
206 public void onInputStream(InputStream stream)
207 {
208 try
209 {
210 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
211 }
212 catch (DecodeException | IOException e)
213 {
214 onFatalError(e);
215 }
216 }
217
218 public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
219 {
220 try
221 {
222 events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
223 }
224 catch (DecodeException e)
225 {
226 onFatalError(e);
227 }
228 }
229
230 public void onPartialTextMessage(String message, boolean fin)
231 {
232 try
233 {
234 events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
235 }
236 catch (DecodeException e)
237 {
238 onFatalError(e);
239 }
240 }
241
242 @Override
243 public void onPing(ByteBuffer buffer)
244 {
245 try
246 {
247 events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
248 }
249 catch (DecodeException | IOException e)
250 {
251 onFatalError(e);
252 }
253 }
254
255 @Override
256 public void onPong(ByteBuffer buffer)
257 {
258 try
259 {
260 events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
261 }
262 catch (DecodeException | IOException e)
263 {
264 onFatalError(e);
265 }
266 }
267
268 @Override
269 public void onReader(Reader reader)
270 {
271 try
272 {
273 events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
274 }
275 catch (DecodeException | IOException e)
276 {
277 onFatalError(e);
278 }
279 }
280
281
282
283
284 @Override
285 public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
286 {
287 if (LOG.isDebugEnabled())
288 {
289 LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
290 LOG.debug("events.hasText={}",events.hasText());
291 LOG.debug("events.hasTextStream={}",events.hasTextStream());
292 }
293
294 boolean handled = false;
295
296 if (events.hasText())
297 {
298 handled = true;
299 if (activeMessage == null)
300 {
301 if (events.isTextPartialSupported())
302 {
303
304 if (LOG.isDebugEnabled())
305 {
306 LOG.debug("Partial Text Message: fin={}",fin);
307 }
308 activeMessage = new TextPartialOnMessage(this);
309 }
310 else
311 {
312
313 if (LOG.isDebugEnabled())
314 {
315 LOG.debug("Whole Text Message");
316 }
317 activeMessage = new SimpleTextMessage(this);
318 }
319 }
320 }
321
322 if (events.hasTextStream())
323 {
324 handled = true;
325
326 if (activeMessage == null)
327 {
328 if (LOG.isDebugEnabled())
329 {
330 LOG.debug("Text Message Writer");
331 }
332
333 final MessageReader stream = new MessageReader(new MessageInputStream());
334 activeMessage = stream;
335
336
337 dispatch(new Runnable()
338 {
339 @Override
340 public void run()
341 {
342 try
343 {
344 events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
345 }
346 catch (DecodeException | IOException e)
347 {
348 onFatalError(e);
349 }
350 }
351 });
352 }
353 }
354
355 if (LOG.isDebugEnabled())
356 {
357 LOG.debug("handled = {}", handled);
358 }
359
360
361 if (handled && (activeMessage != null))
362 {
363 appendMessage(buffer,fin);
364 }
365 }
366
367
368
369
370 @Override
371 public void onTextMessage(String message)
372 {
373 if (LOG.isDebugEnabled())
374 {
375 LOG.debug("onText({})",message);
376 }
377
378 try
379 {
380
381 events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
382 }
383 catch (DecodeException e)
384 {
385 onFatalError(e);
386 }
387 }
388
389 @Override
390 public void setPathParameters(Map<String, String> pathParameters)
391 {
392 events.setPathParameters(pathParameters);
393 }
394
395 @Override
396 public String toString()
397 {
398 return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
399 }
400 }