View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.jsr356.endpoints;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.Reader;
24  import java.nio.ByteBuffer;
25  import java.util.Map;
26  import javax.websocket.CloseReason;
27  import javax.websocket.DecodeException;
28  
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33  import org.eclipse.jetty.websocket.api.extensions.Frame;
34  import org.eclipse.jetty.websocket.common.events.EventDriver;
35  import org.eclipse.jetty.websocket.common.message.MessageInputStream;
36  import org.eclipse.jetty.websocket.common.message.MessageReader;
37  import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
38  import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
39  import org.eclipse.jetty.websocket.jsr356.JsrSession;
40  import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
41  import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialOnMessage;
42  import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
43  
44  /**
45   * Base implementation for JSR-356 Annotated event drivers.
46   */
47  public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver
48  {
49      private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
50      private final JsrEvents<?, ?> events;
51  
52      public JsrAnnotatedEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance, JsrEvents<?, ?> events)
53      {
54          super(policy,endpointInstance);
55          this.events = events;
56      }
57  
58      @Override
59      public void init(JsrSession jsrsession)
60      {
61          this.events.init(jsrsession);
62      }
63  
64      /**
65       * Entry point for all incoming binary frames.
66       */
67      @Override
68      public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
69      {
70          if (LOG.isDebugEnabled())
71          {
72              LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
73              LOG.debug("events.onBinary={}",events.hasBinary());
74              LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
75          }
76          boolean handled = false;
77  
78          if (events.hasBinary())
79          {
80              handled = true;
81              if (activeMessage == null)
82              {
83                  if (events.isBinaryPartialSupported())
84                  {
85                      // Partial Message Support (does not use messageAppender)
86                      LOG.debug("Partial Binary Message: fin={}",fin);
87                      activeMessage = new BinaryPartialOnMessage(this);
88                  }
89                  else
90                  {
91                      // Whole Message Support
92                      LOG.debug("Whole Binary Message");
93                      activeMessage = new SimpleBinaryMessage(this);
94                  }
95              }
96          }
97  
98          if (events.hasBinaryStream())
99          {
100             handled = true;
101             // Streaming Message Support
102             if (activeMessage == null)
103             {
104                 LOG.debug("Binary Message InputStream");
105                 final MessageInputStream stream = new MessageInputStream();
106                 activeMessage = stream;
107 
108                 // Always dispatch streaming read to another thread.
109                 dispatch(new Runnable()
110                 {
111                     @Override
112                     public void run()
113                     {
114                         try
115                         {
116                             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
117                         }
118                         catch (DecodeException | IOException e)
119                         {
120                             onFatalError(e);
121                         }
122                     }
123                 });
124             }
125         }
126 
127         LOG.debug("handled = {}",handled);
128 
129         // Process any active MessageAppender
130         if (handled && (activeMessage != null))
131         {
132             appendMessage(buffer,fin);
133         }
134     }
135 
136     /**
137      * Entry point for binary frames destined for {@link Whole}
138      */
139     @Override
140     public void onBinaryMessage(byte[] data)
141     {
142         if (data == null)
143         {
144             return;
145         }
146 
147         ByteBuffer buf = ByteBuffer.wrap(data);
148 
149         if (LOG.isDebugEnabled())
150         {
151             LOG.debug("onBinaryMessage({})",BufferUtil.toDetailString(buf));
152         }
153 
154         try
155         {
156             // FIN is always true here
157             events.callBinary(jsrsession.getAsyncRemote(),websocket,buf,true);
158         }
159         catch (DecodeException e)
160         {
161             onFatalError(e);
162         }
163     }
164 
165     @Override
166     protected void onClose(CloseReason closereason)
167     {
168         events.callClose(websocket,closereason);
169     }
170 
171     @Override
172     public void onConnect()
173     {
174         events.callOpen(websocket,config);
175     }
176 
177     @Override
178     public void onError(Throwable cause)
179     {
180         events.callError(websocket,cause);
181     }
182 
183     private void onFatalError(Throwable t)
184     {
185         onError(t);
186     }
187 
188     @Override
189     public void onFrame(Frame frame)
190     {
191         /* Ignored in JSR-356 */
192     }
193 
194     @Override
195     public void onInputStream(InputStream stream)
196     {
197         try
198         {
199             events.callBinaryStream(jsrsession.getAsyncRemote(),websocket,stream);
200         }
201         catch (DecodeException | IOException e)
202         {
203             onFatalError(e);
204         }
205     }
206 
207     public void onPartialBinaryMessage(ByteBuffer buffer, boolean fin)
208     {
209         try
210         {
211             events.callBinary(jsrsession.getAsyncRemote(),websocket,buffer,fin);
212         }
213         catch (DecodeException e)
214         {
215             onFatalError(e);
216         }
217     }
218 
219     public void onPartialTextMessage(String message, boolean fin)
220     {
221         try
222         {
223             events.callText(jsrsession.getAsyncRemote(),websocket,message,fin);
224         }
225         catch (DecodeException e)
226         {
227             onFatalError(e);
228         }
229     }
230 
231     @Override
232     public void onPing(ByteBuffer buffer)
233     {
234         try
235         {
236             events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
237         }
238         catch (DecodeException | IOException e)
239         {
240             onFatalError(e);
241         }
242     }
243     
244     @Override
245     public void onPong(ByteBuffer buffer)
246     {
247         try
248         {
249             events.callPong(jsrsession.getAsyncRemote(),websocket,buffer);
250         }
251         catch (DecodeException | IOException e)
252         {
253             onFatalError(e);
254         }
255     }
256 
257     @Override
258     public void onReader(Reader reader)
259     {
260         try
261         {
262             events.callTextStream(jsrsession.getAsyncRemote(),websocket,reader);
263         }
264         catch (DecodeException | IOException e)
265         {
266             onFatalError(e);
267         }
268     }
269 
270     /**
271      * Entry point for all incoming text frames.
272      */
273     @Override
274     public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
275     {
276         if (LOG.isDebugEnabled())
277         {
278             LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
279             LOG.debug("events.hasText={}",events.hasText());
280             LOG.debug("events.hasTextStream={}",events.hasTextStream());
281         }
282 
283         boolean handled = false;
284 
285         if (events.hasText())
286         {
287             handled = true;
288             if (activeMessage == null)
289             {
290                 if (events.isTextPartialSupported())
291                 {
292                     // Partial Message Support
293                     LOG.debug("Partial Text Message: fin={}",fin);
294                     activeMessage = new TextPartialOnMessage(this);
295                 }
296                 else
297                 {
298                     // Whole Message Support
299                     LOG.debug("Whole Text Message");
300                     activeMessage = new SimpleTextMessage(this);
301                 }
302             }
303         }
304 
305         if (events.hasTextStream())
306         {
307             handled = true;
308             // Streaming Message Support
309             if (activeMessage == null)
310             {
311                 LOG.debug("Text Message Writer");
312 
313                 final MessageReader stream = new MessageReader(new MessageInputStream());
314                 activeMessage = stream;
315 
316                 // Always dispatch streaming read to another thread.
317                 dispatch(new Runnable()
318                 {
319                     @Override
320                     public void run()
321                     {
322                         try
323                         {
324                             events.callTextStream(jsrsession.getAsyncRemote(),websocket,stream);
325                         }
326                         catch (DecodeException | IOException e)
327                         {
328                             onFatalError(e);
329                         }
330                     }
331                 });
332             }
333         }
334 
335         LOG.debug("handled = {}",handled);
336 
337         // Process any active MessageAppender
338         if (handled && (activeMessage != null))
339         {
340             appendMessage(buffer,fin);
341         }
342     }
343 
344     /**
345      * Entry point for whole text messages
346      */
347     @Override
348     public void onTextMessage(String message)
349     {
350         LOG.debug("onText({})",message);
351 
352         try
353         {
354             // FIN is always true here
355             events.callText(jsrsession.getAsyncRemote(),websocket,message,true);
356         }
357         catch (DecodeException e)
358         {
359             onFatalError(e);
360         }
361     }
362 
363     @Override
364     public void setPathParameters(Map<String, String> pathParameters)
365     {
366         events.setPathParameters(pathParameters);
367     }
368 
369     @Override
370     public String toString()
371     {
372         return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
373     }
374 }