View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.jsr356.endpoints;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.Reader;
24  import java.nio.ByteBuffer;
25  import java.util.Map;
26  import javax.websocket.CloseReason;
27  import javax.websocket.DecodeException;
28  
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33  import org.eclipse.jetty.websocket.api.extensions.Frame;
34  import org.eclipse.jetty.websocket.common.message.MessageInputStream;
35  import org.eclipse.jetty.websocket.common.message.MessageReader;
36  import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
37  import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
38  import org.eclipse.jetty.websocket.jsr356.JsrSession;
39  import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
40  import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
41  import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
42  
43  /**
44   * Base implementation for JSR-356 Annotated event drivers.
45   */
46  public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
47  {
48      private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
49      private final JsrEvents<?, ?> events;
50  
51      public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
52      {
53          super(policy,endpointInstance);
54          this.events = events;
55      }
56  
57      @Override
58      public void init(JsrSession jsrsession)
59      {
60          this.events.init(jsrsession);
61      }
62  
63      /**
64       * Entry point for all incoming binary frames.
65       */
66      @Override
67      public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
68      {
69          if (LOG.isDebugEnabled())
70          {
71              LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
72              LOG.debug("events.onBinary={}",events.hasBinary());
73              LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
74          }
75          boolean handled = false;
76  
77          if (events.hasBinary())
78          {
79              handled = true;
80              if (activeMessage == null)
81              {
82                  if (events.isBinaryPartialSupported())
83                  {
84                      // Partial Message Support (does not use messageAppender)
85                      if (LOG.isDebugEnabled())
86                      {
87                          LOG.debug("Partial Binary Message: fin={}",fin);
88                      }
89                      activeMessage = new BinaryPartialOnMessage(this);
90                  }
91                  else
92                  {
93                      // Whole Message Support
94                      if (LOG.isDebugEnabled())
95                      {
96                          LOG.debug("Whole Binary Message");
97                      }
98                      activeMessage = new SimpleBinaryMessage(this);
99                  }
100             }
101         }
102 
103         if (events.hasBinaryStream())
104         {
105             handled = true;
106             // Streaming Message Support
107             if (activeMessage == null)
108             {
109                 if (LOG.isDebugEnabled())
110                 {
111                     LOG.debug("Binary Message InputStream");
112                 }
113                 final MessageInputStream stream = new MessageInputStream();
114                 activeMessage = stream;
115 
116                 // Always dispatch streaming read to another thread.
117                 dispatch(new Runnable()
118                 {
119                     @Override
120                     public void run()
121                     {
122                         try
123                         {
124                             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
125                         }
126                         catch (DecodeException | IOException e)
127                         {
128                             onFatalError(e);
129                         }
130                     }
131                 });
132             }
133         }
134 
135         if (LOG.isDebugEnabled())
136         {
137             LOG.debug("handled = {}",handled);
138         }
139 
140         // Process any active MessageAppender
141         if (handled && (activeMessage != null))
142         {
143             appendMessage(buffer,fin);
144         }
145     }
146 
147     /**
148      * Entry point for binary frames destined for {@link Whole}
149      */
150     @Override
151     public void onBinaryMessage(byte[] data)
152     {
153         if (data == null)
154         {
155             return;
156         }
157 
158         ByteBuffer buf = ByteBuffer.wrap(data);
159 
160         if (LOG.isDebugEnabled())
161         {
162             LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
163         }
164 
165         try
166         {
167             // FIN is always true here
168             events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
169         }
170         catch (DecodeException e)
171         {
172             onFatalError(e);
173         }
174     }
175 
176     @Override
177     protected void onClose(CloseReason closereason)
178     {
179         events.callClose(websocket,closereason);
180     }
181 
182     @Override
183     public void onConnect()
184     {
185         events.callOpen(websocket,config);
186     }
187 
188     @Override
189     public void onError(Throwable cause)
190     {
191         events.callError(websocket,cause);
192     }
193 
194     private void onFatalError(Throwable t)
195     {
196         onError(t);
197     }
198 
199     @Override
200     public void onFrame(Frame frame)
201     {
202         /* Ignored in JSR-356 */
203     }
204 
205     @Override
206     public void onInputStream(InputStream stream)
207     {
208         try
209         {
210             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
211         }
212         catch (DecodeException | IOException e)
213         {
214             onFatalError(e);
215         }
216     }
217 
218     public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
219     {
220         try
221         {
222             events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
223         }
224         catch (DecodeException e)
225         {
226             onFatalError(e);
227         }
228     }
229 
230     public void onPartialTextMessage(String message, boolean fin)
231     {
232         try
233         {
234             events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
235         }
236         catch (DecodeException e)
237         {
238             onFatalError(e);
239         }
240     }
241 
242     @Override
243     public void onPing(ByteBuffer buffer)
244     {
245         try
246         {
247             events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
248         }
249         catch (DecodeException | IOException e)
250         {
251             onFatalError(e);
252         }
253     }
254     
255     @Override
256     public void onPong(ByteBuffer buffer)
257     {
258         try
259         {
260             events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
261         }
262         catch (DecodeException | IOException e)
263         {
264             onFatalError(e);
265         }
266     }
267 
268     @Override
269     public void onReader(Reader reader)
270     {
271         try
272         {
273             events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
274         }
275         catch (DecodeException | IOException e)
276         {
277             onFatalError(e);
278         }
279     }
280 
281     /**
282      * Entry point for all incoming text frames.
283      */
284     @Override
285     public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
286     {
287         if (LOG.isDebugEnabled())
288         {
289             LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
290             LOG.debug("events.hasText={}",events.hasText());
291             LOG.debug("events.hasTextStream={}",events.hasTextStream());
292         }
293 
294         boolean handled = false;
295 
296         if (events.hasText())
297         {
298             handled = true;
299             if (activeMessage == null)
300             {
301                 if (events.isTextPartialSupported())
302                 {
303                     // Partial Message Support
304                     if (LOG.isDebugEnabled())
305                     {
306                         LOG.debug("Partial Text Message: fin={}",fin);
307                     }
308                     activeMessage = new TextPartialOnMessage(this);
309                 }
310                 else
311                 {
312                     // Whole Message Support
313                     if (LOG.isDebugEnabled())
314                     {
315                         LOG.debug("Whole Text Message");
316                     }
317                     activeMessage = new SimpleTextMessage(this);
318                 }
319             }
320         }
321 
322         if (events.hasTextStream())
323         {
324             handled = true;
325             // Streaming Message Support
326             if (activeMessage == null)
327             {
328                 if (LOG.isDebugEnabled())
329                 {
330                     LOG.debug("Text Message Writer");
331                 }
332 
333                 final MessageReader stream = new MessageReader(new MessageInputStream());
334                 activeMessage = stream;
335 
336                 // Always dispatch streaming read to another thread.
337                 dispatch(new Runnable()
338                 {
339                     @Override
340                     public void run()
341                     {
342                         try
343                         {
344                             events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
345                         }
346                         catch (DecodeException | IOException e)
347                         {
348                             onFatalError(e);
349                         }
350                     }
351                 });
352             }
353         }
354 
355         if (LOG.isDebugEnabled())
356         {
357             LOG.debug("handled = {}", handled);
358         }
359 
360         // Process any active MessageAppender
361         if (handled && (activeMessage != null))
362         {
363             appendMessage(buffer,fin);
364         }
365     }
366 
367     /**
368      * Entry point for whole text messages
369      */
370     @Override
371     public void onTextMessage(String message)
372     {
373         if (LOG.isDebugEnabled())
374         {
375             LOG.debug("onText({})",message);
376         }
377 
378         try
379         {
380             // FIN is always true here
381             events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
382         }
383         catch (DecodeException e)
384         {
385             onFatalError(e);
386         }
387     }
388 
389     @Override
390     public void setPathParameters(Map<String, String> pathParameters)
391     {
392         events.setPathParameters(pathParameters);
393     }
394 
395     @Override
396     public String toString()
397     {
398         return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
399     }
400 }