1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.jsr356.endpoints;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.Reader;
24 import java.nio.ByteBuffer;
25 import java.util.Map;
26 import javax.websocket.CloseReason;
27 import javax.websocket.DecodeException;
28
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33 import org.eclipse.jetty.websocket.api.extensions.Frame;
34 import org.eclipse.jetty.websocket.common.events.EventDriver;
35 import org.eclipse.jetty.websocket.common.message.MessageInputStream;
36 import org.eclipse.jetty.websocket.common.message.MessageReader;
37 import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
38 import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
39 import org.eclipse.jetty.websocket.jsr356.JsrSession;
40 import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
41 import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
42 import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
43
44
45
46
47 public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver
48 {
49 private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
50 private final JsrEvents<?, ?> events;
51
52 public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
53 {
54 super(policy,endpointInstance);
55 this.events = events;
56 }
57
58 @Override
59 public void init(JsrSession jsrsession)
60 {
61 this.events.init(jsrsession);
62 }
63
64
65
66
67 @Override
68 public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
69 {
70 if (LOG.isDebugEnabled())
71 {
72 LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
73 LOG.debug("events.onBinary={}",events.hasBinary());
74 LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
75 }
76 boolean handled = false;
77
78 if (events.hasBinary())
79 {
80 handled = true;
81 if (activeMessage == null)
82 {
83 if (events.isBinaryPartialSupported())
84 {
85
86 LOG.debug("Partial Binary Message: fin={}",fin);
87 activeMessage = new BinaryPartialOnMessage(this);
88 }
89 else
90 {
91
92 LOG.debug("Whole Binary Message");
93 activeMessage = new SimpleBinaryMessage(this);
94 }
95 }
96 }
97
98 if (events.hasBinaryStream())
99 {
100 handled = true;
101
102 if (activeMessage == null)
103 {
104 LOG.debug("Binary Message InputStream");
105 final MessageInputStream stream = new MessageInputStream();
106 activeMessage = stream;
107
108
109 dispatch(new Runnable()
110 {
111 @Override
112 public void run()
113 {
114 try
115 {
116 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
117 }
118 catch (DecodeException | IOException e)
119 {
120 onFatalError(e);
121 }
122 }
123 });
124 }
125 }
126
127 LOG.debug("handled = {}",handled);
128
129
130 if (handled && (activeMessage != null))
131 {
132 appendMessage(buffer,fin);
133 }
134 }
135
136
137
138
139 @Override
140 public void onBinaryMessage(byte[] data)
141 {
142 if (data == null)
143 {
144 return;
145 }
146
147 ByteBuffer buf = ByteBuffer.wrap(data);
148
149 if (LOG.isDebugEnabled())
150 {
151 LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
152 }
153
154 try
155 {
156
157 events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
158 }
159 catch (DecodeException e)
160 {
161 onFatalError(e);
162 }
163 }
164
165 @Override
166 protected void onClose(CloseReason closereason)
167 {
168 events.callClose(websocket,closereason);
169 }
170
171 @Override
172 public void onConnect()
173 {
174 events.callOpen(websocket,config);
175 }
176
177 @Override
178 public void onError(Throwable cause)
179 {
180 events.callError(websocket,cause);
181 }
182
183 private void onFatalError(Throwable t)
184 {
185 onError(t);
186 }
187
188 @Override
189 public void onFrame(Frame frame)
190 {
191
192 }
193
194 @Override
195 public void onInputStream(InputStream stream)
196 {
197 try
198 {
199 events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
200 }
201 catch (DecodeException | IOException e)
202 {
203 onFatalError(e);
204 }
205 }
206
207 public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
208 {
209 try
210 {
211 events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
212 }
213 catch (DecodeException e)
214 {
215 onFatalError(e);
216 }
217 }
218
219 public void onPartialTextMessage(String message, boolean fin)
220 {
221 try
222 {
223 events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
224 }
225 catch (DecodeException e)
226 {
227 onFatalError(e);
228 }
229 }
230
231 @Override
232 public void onPing(ByteBuffer buffer)
233 {
234 try
235 {
236 events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
237 }
238 catch (DecodeException | IOException e)
239 {
240 onFatalError(e);
241 }
242 }
243
244 @Override
245 public void onPong(ByteBuffer buffer)
246 {
247 try
248 {
249 events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
250 }
251 catch (DecodeException | IOException e)
252 {
253 onFatalError(e);
254 }
255 }
256
257 @Override
258 public void onReader(Reader reader)
259 {
260 try
261 {
262 events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
263 }
264 catch (DecodeException | IOException e)
265 {
266 onFatalError(e);
267 }
268 }
269
270
271
272
273 @Override
274 public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
275 {
276 if (LOG.isDebugEnabled())
277 {
278 LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
279 LOG.debug("events.hasText={}",events.hasText());
280 LOG.debug("events.hasTextStream={}",events.hasTextStream());
281 }
282
283 boolean handled = false;
284
285 if (events.hasText())
286 {
287 handled = true;
288 if (activeMessage == null)
289 {
290 if (events.isTextPartialSupported())
291 {
292
293 LOG.debug("Partial Text Message: fin={}",fin);
294 activeMessage = new TextPartialOnMessage(this);
295 }
296 else
297 {
298
299 LOG.debug("Whole Text Message");
300 activeMessage = new SimpleTextMessage(this);
301 }
302 }
303 }
304
305 if (events.hasTextStream())
306 {
307 handled = true;
308
309 if (activeMessage == null)
310 {
311 LOG.debug("Text Message Writer");
312
313 final MessageReader stream = new MessageReader(new MessageInputStream());
314 activeMessage = stream;
315
316
317 dispatch(new Runnable()
318 {
319 @Override
320 public void run()
321 {
322 try
323 {
324 events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
325 }
326 catch (DecodeException | IOException e)
327 {
328 onFatalError(e);
329 }
330 }
331 });
332 }
333 }
334
335 LOG.debug("handled = {}",handled);
336
337
338 if (handled && (activeMessage != null))
339 {
340 appendMessage(buffer,fin);
341 }
342 }
343
344
345
346
347 @Override
348 public void onTextMessage(String message)
349 {
350 LOG.debug("onText({})",message);
351
352 try
353 {
354
355 events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
356 }
357 catch (DecodeException e)
358 {
359 onFatalError(e);
360 }
361 }
362
363 @Override
364 public void setPathParameters(Map<String, String> pathParameters)
365 {
366 events.setPathParameters(pathParameters);
367 }
368
369 @Override
370 public String toString()
371 {
372 return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
373 }
374 }