View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.jsr356.endpoints;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.Reader;
24  import java.nio.ByteBuffer;
25  import java.util.Map;
26  
27  import javax.websocket.CloseReason;
28  import javax.websocket.DecodeException;
29  import javax.websocket.MessageHandler.Whole;
30  
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
35  import org.eclipse.jetty.websocket.api.extensions.Frame;
36  import org.eclipse.jetty.websocket.common.message.MessageInputStream;
37  import org.eclipse.jetty.websocket.common.message.MessageReader;
38  import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
39  import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
40  import org.eclipse.jetty.websocket.jsr356.JsrSession;
41  import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
42  import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
43  import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
44  
45  /**
46   * Base implementation for JSR-356 Annotated event drivers.
47   */
48  public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
49  {
50      private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
51      private final JsrEvents<?, ?> events;
52  
53      public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
54      {
55          super(policy,endpointInstance);
56          this.events = events;
57      }
58  
59      @Override
60      public void init(JsrSession jsrsession)
61      {
62          this.events.init(jsrsession);
63      }
64  
65      /**
66       * Entry point for all incoming binary frames.
67       */
68      @Override
69      public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
70      {
71          if (LOG.isDebugEnabled())
72          {
73              LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
74              LOG.debug("events.onBinary={}",events.hasBinary());
75              LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
76          }
77          boolean handled = false;
78  
79          if (events.hasBinary())
80          {
81              handled = true;
82              if (activeMessage == null)
83              {
84                  if (events.isBinaryPartialSupported())
85                  {
86                      // Partial Message Support (does not use messageAppender)
87                      if (LOG.isDebugEnabled())
88                      {
89                          LOG.debug("Partial Binary Message: fin={}",fin);
90                      }
91                      activeMessage = new BinaryPartialOnMessage(this);
92                  }
93                  else
94                  {
95                      // Whole Message Support
96                      if (LOG.isDebugEnabled())
97                      {
98                          LOG.debug("Whole Binary Message");
99                      }
100                     activeMessage = new SimpleBinaryMessage(this);
101                 }
102             }
103         }
104 
105         if (events.hasBinaryStream())
106         {
107             handled = true;
108             // Streaming Message Support
109             if (activeMessage == null)
110             {
111                 if (LOG.isDebugEnabled())
112                 {
113                     LOG.debug("Binary Message InputStream");
114                 }
115                 final MessageInputStream stream = new MessageInputStream();
116                 activeMessage = stream;
117 
118                 // Always dispatch streaming read to another thread.
119                 dispatch(new Runnable()
120                 {
121                     @Override
122                     public void run()
123                     {
124                         try
125                         {
126                             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
127                         }
128                         catch (Throwable e)
129                         {
130                             onFatalError(e);
131                         }
132                     }
133                 });
134             }
135         }
136 
137         if (LOG.isDebugEnabled())
138         {
139             LOG.debug("handled = {}",handled);
140         }
141 
142         // Process any active MessageAppender
143         if (handled && (activeMessage != null))
144         {
145             appendMessage(buffer,fin);
146         }
147     }
148 
149     /**
150      * Entry point for binary frames destined for {@link Whole}
151      */
152     @Override
153     public void onBinaryMessage(byte[] data)
154     {
155         if (data == null)
156         {
157             return;
158         }
159 
160         ByteBuffer buf = ByteBuffer.wrap(data);
161 
162         if (LOG.isDebugEnabled())
163         {
164             LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
165         }
166 
167         try
168         {
169             // FIN is always true here
170             events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
171         }
172         catch (Throwable e)
173         {
174             onFatalError(e);
175         }
176     }
177 
178     @Override
179     protected void onClose(CloseReason closereason)
180     {
181         events.callClose(websocket,closereason);
182     }
183 
184     @Override
185     public void onConnect()
186     {
187         events.callOpen(websocket,config);
188     }
189 
190     @Override
191     public void onError(Throwable cause)
192     {
193         try
194         {
195             events.callError(websocket,cause);
196         }
197         catch (Throwable e)
198         {
199             LOG.warn("Unable to call onError with cause", cause);
200             LOG.warn("Call to onError resulted in exception", e);
201         }
202     }
203 
204     private void onFatalError(Throwable t)
205     {
206         onError(t);
207     }
208 
209     @Override
210     public void onFrame(Frame frame)
211     {
212         /* Ignored in JSR-356 */
213     }
214 
215     @Override
216     public void onInputStream(InputStream stream) throws IOException
217     {
218         try
219         {
220             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
221         }
222         catch (DecodeException e)
223         {
224             throw new RuntimeException("Unable decode input stream", e);
225         }
226     }
227 
228     public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
229     {
230         try
231         {
232             events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
233         }
234         catch (DecodeException e)
235         {
236             throw new RuntimeException("Unable decode partial binary message", e);
237         }
238     }
239 
240     public void onPartialTextMessage(String message, boolean fin)
241     {
242         try
243         {
244             events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
245         }
246         catch (DecodeException e)
247         {
248             throw new RuntimeException("Unable decode partial text message", e);
249         }
250     }
251 
252     @Override
253     public void onPing(ByteBuffer buffer)
254     {
255         // Call pong, as there is no "onPing" method in the JSR
256         events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
257     }
258     
259     @Override
260     public void onPong(ByteBuffer buffer)
261     {
262         events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
263     }
264 
265     @Override
266     public void onReader(Reader reader) throws IOException
267     {
268         try
269         {
270             events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
271         }
272         catch (DecodeException e)
273         {
274             throw new RuntimeException("Unable decode reader", e);
275         }
276     }
277 
278     /**
279      * Entry point for all incoming text frames.
280      */
281     @Override
282     public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
283     {
284         if (LOG.isDebugEnabled())
285         {
286             LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
287             LOG.debug("events.hasText={}",events.hasText());
288             LOG.debug("events.hasTextStream={}",events.hasTextStream());
289         }
290 
291         boolean handled = false;
292 
293         if (events.hasText())
294         {
295             handled = true;
296             if (activeMessage == null)
297             {
298                 if (events.isTextPartialSupported())
299                 {
300                     // Partial Message Support
301                     if (LOG.isDebugEnabled())
302                     {
303                         LOG.debug("Partial Text Message: fin={}",fin);
304                     }
305                     activeMessage = new TextPartialOnMessage(this);
306                 }
307                 else
308                 {
309                     // Whole Message Support
310                     if (LOG.isDebugEnabled())
311                     {
312                         LOG.debug("Whole Text Message");
313                     }
314                     activeMessage = new SimpleTextMessage(this);
315                 }
316             }
317         }
318 
319         if (events.hasTextStream())
320         {
321             handled = true;
322             // Streaming Message Support
323             if (activeMessage == null)
324             {
325                 if (LOG.isDebugEnabled())
326                 {
327                     LOG.debug("Text Message Writer");
328                 }
329 
330                 final MessageReader stream = new MessageReader(new MessageInputStream());
331                 activeMessage = stream;
332 
333                 // Always dispatch streaming read to another thread.
334                 dispatch(new Runnable()
335                 {
336                     @Override
337                     public void run()
338                     {
339                         try
340                         {
341                             events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
342                         }
343                         catch (Throwable e)
344                         {
345                             onFatalError(e);
346                         }
347                     }
348                 });
349             }
350         }
351 
352         if (LOG.isDebugEnabled())
353         {
354             LOG.debug("handled = {}", handled);
355         }
356 
357         // Process any active MessageAppender
358         if (handled && (activeMessage != null))
359         {
360             appendMessage(buffer,fin);
361         }
362     }
363 
364     /**
365      * Entry point for whole text messages
366      */
367     @Override
368     public void onTextMessage(String message)
369     {
370         if (LOG.isDebugEnabled())
371         {
372             LOG.debug("onText({})",message);
373         }
374 
375         try
376         {
377             // FIN is always true here
378             events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
379         }
380         catch (Throwable e)
381         {
382             onFatalError(e);
383         }
384     }
385 
386     @Override
387     public void setPathParameters(Map<String, String> pathParameters)
388     {
389         events.setPathParameters(pathParameters);
390     }
391 
392     @Override
393     public String toString()
394     {
395         return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
396     }
397 }