1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.jsr356.endpoints;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.Reader;
24 import java.nio.ByteBuffer;
25 import java.util.Map;
26
27 import javax.websocket.CloseReason;
28 import javax.websocket.DecodeException;
29 import javax.websocket.MessageHandler.Whole;
30
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
35 import org.eclipse.jetty.websocket.api.extensions.Frame;
36 import org.eclipse.jetty.websocket.common.message.MessageInputStream;
37 import org.eclipse.jetty.websocket.common.message.MessageReader;
38 import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
39 import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
40 import org.eclipse.jetty.websocket.jsr356.JsrSession;
41 import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
42 import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
43 import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
44
45
46
47
48 public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
49 {
50 private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
51 private final JsrEvents<?, ?> events;
52
53 public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
54 {
55 super(policy,endpointInstance);
56 this.events = events;
57 }
58
59 @Override
60 public void init(JsrSession jsrsession)
61 {
62 this.events.init(jsrsession);
63 }
64
65
66
67
68 @Override
69 public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
70 {
71 if (LOG.isDebugEnabled())
72 {
73 LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
74 LOG.debug("events.onBinary={}",events.hasBinary());
75 LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
76 }
77 boolean handled = false;
78
79 if (events.hasBinary())
80 {
81 handled = true;
82 if (activeMessage == null)
83 {
84 if (events.isBinaryPartialSupported())
85 {
86
87 if (LOG.isDebugEnabled())
88 {
89 LOG.debug("Partial Binary Message: fin={}",fin);
90 }
91 activeMessage = new BinaryPartialOnMessage(this);
92 }
93 else
94 {
95
96 if (LOG.isDebugEnabled())
97 {
98 LOG.debug("Whole Binary Message");
99 }
100 activeMessage = new SimpleBinaryMessage(this);
101 }
102 }
103 }
104
105 if (events.hasBinaryStream())
106 {
107 handled = true;
108
109 if (activeMessage == null)
110 {
111 if (LOG.isDebugEnabled())
112 {
113 LOG.debug("Binary Message InputStream");
114 }
115 final MessageInputStream stream = new MessageInputStream();
116 activeMessage = stream;
117
118
119 dispatch(new Runnable()
120 {
121 @Override
122 public void run()
123 {
124 try
125 {
126 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
127 }
128 catch (Throwable e)
129 {
130 onFatalError(e);
131 }
132 }
133 });
134 }
135 }
136
137 if (LOG.isDebugEnabled())
138 {
139 LOG.debug("handled = {}",handled);
140 }
141
142
143 if (handled && (activeMessage != null))
144 {
145 appendMessage(buffer,fin);
146 }
147 }
148
149
150
151
152 @Override
153 public void onBinaryMessage(byte[] data)
154 {
155 if (data == null)
156 {
157 return;
158 }
159
160 ByteBuffer buf = ByteBuffer.wrap(data);
161
162 if (LOG.isDebugEnabled())
163 {
164 LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
165 }
166
167 try
168 {
169
170 events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
171 }
172 catch (Throwable e)
173 {
174 onFatalError(e);
175 }
176 }
177
178 @Override
179 protected void onClose(CloseReason closereason)
180 {
181 events.callClose(websocket,closereason);
182 }
183
184 @Override
185 public void onConnect()
186 {
187 events.callOpen(websocket,config);
188 }
189
190 @Override
191 public void onError(Throwable cause)
192 {
193 try
194 {
195 events.callError(websocket,cause);
196 }
197 catch (Throwable e)
198 {
199 LOG.warn("Unable to call onError with cause", cause);
200 LOG.warn("Call to onError resulted in exception", e);
201 }
202 }
203
204 private void onFatalError(Throwable t)
205 {
206 onError(t);
207 }
208
209 @Override
210 public void onFrame(Frame frame)
211 {
212
213 }
214
215 @Override
216 public void onInputStream(InputStream stream) throws IOException
217 {
218 try
219 {
220 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
221 }
222 catch (DecodeException e)
223 {
224 throw new RuntimeException("Unable decode input stream", e);
225 }
226 }
227
228 public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
229 {
230 try
231 {
232 events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
233 }
234 catch (DecodeException e)
235 {
236 throw new RuntimeException("Unable decode partial binary message", e);
237 }
238 }
239
240 public void onPartialTextMessage(String message, boolean fin)
241 {
242 try
243 {
244 events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
245 }
246 catch (DecodeException e)
247 {
248 throw new RuntimeException("Unable decode partial text message", e);
249 }
250 }
251
252 @Override
253 public void onPing(ByteBuffer buffer)
254 {
255
256 events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
257 }
258
259 @Override
260 public void onPong(ByteBuffer buffer)
261 {
262 events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
263 }
264
265 @Override
266 public void onReader(Reader reader) throws IOException
267 {
268 try
269 {
270 events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
271 }
272 catch (DecodeException e)
273 {
274 throw new RuntimeException("Unable decode reader", e);
275 }
276 }
277
278
279
280
281 @Override
282 public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
283 {
284 if (LOG.isDebugEnabled())
285 {
286 LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
287 LOG.debug("events.hasText={}",events.hasText());
288 LOG.debug("events.hasTextStream={}",events.hasTextStream());
289 }
290
291 boolean handled = false;
292
293 if (events.hasText())
294 {
295 handled = true;
296 if (activeMessage == null)
297 {
298 if (events.isTextPartialSupported())
299 {
300
301 if (LOG.isDebugEnabled())
302 {
303 LOG.debug("Partial Text Message: fin={}",fin);
304 }
305 activeMessage = new TextPartialOnMessage(this);
306 }
307 else
308 {
309
310 if (LOG.isDebugEnabled())
311 {
312 LOG.debug("Whole Text Message");
313 }
314 activeMessage = new SimpleTextMessage(this);
315 }
316 }
317 }
318
319 if (events.hasTextStream())
320 {
321 handled = true;
322
323 if (activeMessage == null)
324 {
325 if (LOG.isDebugEnabled())
326 {
327 LOG.debug("Text Message Writer");
328 }
329
330 final MessageReader stream = new MessageReader(new MessageInputStream());
331 activeMessage = stream;
332
333
334 dispatch(new Runnable()
335 {
336 @Override
337 public void run()
338 {
339 try
340 {
341 events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
342 }
343 catch (Throwable e)
344 {
345 onFatalError(e);
346 }
347 }
348 });
349 }
350 }
351
352 if (LOG.isDebugEnabled())
353 {
354 LOG.debug("handled = {}", handled);
355 }
356
357
358 if (handled && (activeMessage != null))
359 {
360 appendMessage(buffer,fin);
361 }
362 }
363
364
365
366
367 @Override
368 public void onTextMessage(String message)
369 {
370 if (LOG.isDebugEnabled())
371 {
372 LOG.debug("onText({})",message);
373 }
374
375 try
376 {
377
378 events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
379 }
380 catch (Throwable e)
381 {
382 onFatalError(e);
383 }
384 }
385
386 @Override
387 public void setPathParameters(Map<String, String> pathParameters)
388 {
389 events.setPathParameters(pathParameters);
390 }
391
392 @Override
393 public String toString()
394 {
395 return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
396 }
397 }