View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.jsr356.endpoints;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.Reader;
24  import java.nio.ByteBuffer;
25  import java.util.Map;
26  
27  import javax.websocket.CloseReason;
28  import javax.websocket.DecodeException;
29  import javax.websocket.MessageHandler.Whole;
30  
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
35  import org.eclipse.jetty.websocket.api.extensions.Frame;
36  import org.eclipse.jetty.websocket.common.events.EventDriver;
37  import org.eclipse.jetty.websocket.common.message.MessageInputStream;
38  import org.eclipse.jetty.websocket.common.message.MessageReader;
39  import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
40  import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
41  import org.eclipse.jetty.websocket.jsr356.JsrSession;
42  import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
43  import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
44  import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
45  
46  /**
47   * Base implementation for JSR-356 Annotated event drivers.
48   */
49  public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver
50  {
51      private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
52      private final JsrEvents<?, ?> events;
53  
54      public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
55      {
56          super(policy,endpointInstance);
57          this.events = events;
58      }
59  
60      @Override
61      public void init(JsrSession jsrsession)
62      {
63          this.events.init(jsrsession);
64      }
65  
66      /**
67       * Entry point for all incoming binary frames.
68       */
69      @Override
70      public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
71      {
72          if (LOG.isDebugEnabled())
73          {
74              LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
75              LOG.debug("events.onBinary={}",events.hasBinary());
76              LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
77          }
78          boolean handled = false;
79  
80          if (events.hasBinary())
81          {
82              handled = true;
83              if (activeMessage == null)
84              {
85                  if (events.isBinaryPartialSupported())
86                  {
87                      // Partial Message Support (does not use messageAppender)
88                      LOG.debug("Partial Binary Message: fin={}",fin);
89                      activeMessage = new BinaryPartialOnMessage(this);
90                  }
91                  else
92                  {
93                      // Whole Message Support
94                      LOG.debug("Whole Binary Message");
95                      activeMessage = new SimpleBinaryMessage(this);
96                  }
97              }
98          }
99  
100         if (events.hasBinaryStream())
101         {
102             handled = true;
103             // Streaming Message Support
104             if (activeMessage == null)
105             {
106                 LOG.debug("Binary Message InputStream");
107                 final MessageInputStream stream = new MessageInputStream(session.getConnection());
108                 activeMessage = stream;
109 
110                 // Always dispatch streaming read to another thread.
111                 dispatch(new Runnable()
112                 {
113                     @Override
114                     public void run()
115                     {
116                         try
117                         {
118                             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
119                         }
120                         catch (DecodeException | IOException e)
121                         {
122                             onFatalError(e);
123                         }
124                     }
125                 });
126             }
127         }
128 
129         LOG.debug("handled = {}",handled);
130 
131         // Process any active MessageAppender
132         if (handled && (activeMessage != null))
133         {
134             appendMessage(buffer,fin);
135         }
136     }
137 
138     /**
139      * Entry point for binary frames destined for {@link Whole}
140      */
141     @Override
142     public void onBinaryMessage(byte[] data)
143     {
144         if (data == null)
145         {
146             return;
147         }
148 
149         ByteBuffer buf = ByteBuffer.wrap(data);
150 
151         if (LOG.isDebugEnabled())
152         {
153             LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
154         }
155 
156         try
157         {
158             // FIN is always true here
159             events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
160         }
161         catch (DecodeException e)
162         {
163             onFatalError(e);
164         }
165     }
166 
167     @Override
168     protected void onClose(CloseReason closereason)
169     {
170         events.callClose(websocket,closereason);
171     }
172 
173     @Override
174     public void onConnect()
175     {
176         events.callOpen(websocket,config);
177     }
178 
179     @Override
180     public void onError(Throwable cause)
181     {
182         events.callError(websocket,cause);
183     }
184 
185     private void onFatalError(Throwable t)
186     {
187         onError(t);
188     }
189 
190     @Override
191     public void onFrame(Frame frame)
192     {
193         /* Ignored in JSR-356 */
194     }
195 
196     @Override
197     public void onInputStream(InputStream stream)
198     {
199         try
200         {
201             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
202         }
203         catch (DecodeException | IOException e)
204         {
205             onFatalError(e);
206         }
207     }
208 
209     public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
210     {
211         try
212         {
213             events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
214         }
215         catch (DecodeException e)
216         {
217             onFatalError(e);
218         }
219     }
220 
221     public void onPartialTextMessage(String message, boolean fin)
222     {
223         try
224         {
225             events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
226         }
227         catch (DecodeException e)
228         {
229             onFatalError(e);
230         }
231     }
232 
233     @Override
234     public void onPong(ByteBuffer buffer)
235     {
236         try
237         {
238             events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
239         }
240         catch (DecodeException | IOException e)
241         {
242             onFatalError(e);
243         }
244     }
245 
246     @Override
247     public void onReader(Reader reader)
248     {
249         try
250         {
251             events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
252         }
253         catch (DecodeException | IOException e)
254         {
255             onFatalError(e);
256         }
257     }
258 
259     /**
260      * Entry point for all incoming text frames.
261      */
262     @Override
263     public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
264     {
265         if (LOG.isDebugEnabled())
266         {
267             LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
268             LOG.debug("events.hasText={}",events.hasText());
269             LOG.debug("events.hasTextStream={}",events.hasTextStream());
270         }
271 
272         boolean handled = false;
273 
274         if (events.hasText())
275         {
276             handled = true;
277             if (activeMessage == null)
278             {
279                 if (events.isTextPartialSupported())
280                 {
281                     // Partial Message Support
282                     LOG.debug("Partial Text Message: fin={}",fin);
283                     activeMessage = new TextPartialOnMessage(this);
284                 }
285                 else
286                 {
287                     // Whole Message Support
288                     LOG.debug("Whole Text Message");
289                     activeMessage = new SimpleTextMessage(this);
290                 }
291             }
292         }
293 
294         if (events.hasTextStream())
295         {
296             handled = true;
297             // Streaming Message Support
298             if (activeMessage == null)
299             {
300                 LOG.debug("Text Message Writer");
301 
302                 final MessageReader stream = new MessageReader(new MessageInputStream(session.getConnection()));
303                 activeMessage = stream;
304 
305                 // Always dispatch streaming read to another thread.
306                 dispatch(new Runnable()
307                 {
308                     @Override
309                     public void run()
310                     {
311                         try
312                         {
313                             events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
314                         }
315                         catch (DecodeException | IOException e)
316                         {
317                             onFatalError(e);
318                         }
319                     }
320                 });
321             }
322         }
323 
324         LOG.debug("handled = {}",handled);
325 
326         // Process any active MessageAppender
327         if (handled && (activeMessage != null))
328         {
329             appendMessage(buffer,fin);
330         }
331     }
332 
333     /**
334      * Entry point for whole text messages
335      */
336     @Override
337     public void onTextMessage(String message)
338     {
339         LOG.debug("onText({})",message);
340 
341         try
342         {
343             // FIN is always true here
344             events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
345         }
346         catch (DecodeException e)
347         {
348             onFatalError(e);
349         }
350     }
351 
352     @Override
353     public void setPathParameters(Map<String, String> pathParameters)
354     {
355         events.setPathParameters(pathParameters);
356     }
357 
358     @Override
359     public String toString()
360     {
361         return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
362     }
363 }