View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.jsr356.endpoints;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.Reader;
24  import java.nio.ByteBuffer;
25  import java.util.Map;
26  
27  import javax.websocket.CloseReason;
28  import javax.websocket.Endpoint;
29  import javax.websocket.MessageHandler;
30  import javax.websocket.MessageHandler.Whole;
31  import javax.websocket.PongMessage;
32  
33  import org.eclipse.jetty.util.BufferUtil;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
37  import org.eclipse.jetty.websocket.api.extensions.Frame;
38  import org.eclipse.jetty.websocket.common.message.MessageInputStream;
39  import org.eclipse.jetty.websocket.common.message.MessageReader;
40  import org.eclipse.jetty.websocket.jsr356.JsrPongMessage;
41  import org.eclipse.jetty.websocket.jsr356.JsrSession;
42  import org.eclipse.jetty.websocket.jsr356.MessageHandlerWrapper;
43  import org.eclipse.jetty.websocket.jsr356.MessageType;
44  import org.eclipse.jetty.websocket.jsr356.messages.BinaryPartialMessage;
45  import org.eclipse.jetty.websocket.jsr356.messages.BinaryWholeMessage;
46  import org.eclipse.jetty.websocket.jsr356.messages.TextPartialMessage;
47  import org.eclipse.jetty.websocket.jsr356.messages.TextWholeMessage;
48  
49  /**
50   * EventDriver for websocket that extend from {@link javax.websocket.Endpoint}
51   */
52  public class JsrEndpointEventDriver extends AbstractJsrEventDriver
53  {
54      private static final Logger LOG = Log.getLogger(JsrEndpointEventDriver.class);
55  
56      private final Endpoint endpoint;
57      private Map<String, String> pathParameters;
58  
59      public JsrEndpointEventDriver(WebSocketPolicy policy, EndpointInstance endpointInstance)
60      {
61          super(policy,endpointInstance);
62          this.endpoint = (Endpoint)endpointInstance.getEndpoint();
63      }
64  
65      @Override
66      public void init(JsrSession jsrsession)
67      {
68          jsrsession.setPathParameters(pathParameters);
69      }
70  
71      @Override
72      public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
73      {
74          if (activeMessage == null)
75          {
76              final MessageHandlerWrapper wrapper = jsrsession.getMessageHandlerWrapper(MessageType.BINARY);
77              if (wrapper == null)
78              {
79                  if (LOG.isDebugEnabled())
80                  {
81                      LOG.debug("No BINARY MessageHandler declared");
82                  }
83                  return;
84              }
85              if (wrapper.wantsPartialMessages())
86              {
87                  activeMessage = new BinaryPartialMessage(wrapper);
88              }
89              else if (wrapper.wantsStreams())
90              {
91                  final MessageInputStream stream = new MessageInputStream();
92                  activeMessage = stream;
93                  dispatch(new Runnable()
94                  {
95                      @SuppressWarnings("unchecked")
96                      @Override
97                      public void run()
98                      {
99                          MessageHandler.Whole<InputStream> handler = (Whole<InputStream>)wrapper.getHandler();
100                         handler.onMessage(stream);
101                     }
102                 });
103             }
104             else
105             {
106                 activeMessage = new BinaryWholeMessage(this,wrapper);
107             }
108         }
109 
110         activeMessage.appendFrame(buffer,fin);
111 
112         if (fin)
113         {
114             activeMessage.messageComplete();
115             activeMessage = null;
116         }
117     }
118 
119     @Override
120     public void onBinaryMessage(byte[] data)
121     {
122         /* Ignored, handled by BinaryWholeMessage */
123     }
124 
125     @Override
126     protected void onClose(CloseReason closereason)
127     {
128         endpoint.onClose(this.jsrsession,closereason);
129     }
130 
131     @Override
132     public void onConnect()
133     {
134         if (LOG.isDebugEnabled())
135         {
136             LOG.debug("onConnect({}, {})",jsrsession,config);
137         }
138 
139         // Let unhandled exceptions flow out
140         endpoint.onOpen(jsrsession,config);
141     }
142 
143     @Override
144     public void onError(Throwable cause)
145     {
146         try
147         {
148             endpoint.onError(jsrsession,cause);
149         }
150         catch (Throwable t)
151         {
152             LOG.warn("Unable to report to onError due to exception",t);
153         }
154     }
155 
156     @Override
157     public void onFrame(Frame frame)
158     {
159         /* Ignored, not supported by JSR-356 */
160     }
161 
162     @Override
163     public void onInputStream(InputStream stream)
164     {
165         /* Ignored, handled by BinaryStreamMessage */
166     }
167 
168     @Override
169     public void onReader(Reader reader)
170     {
171         /* Ignored, handled by TextStreamMessage */
172     }
173 
174     @Override
175     public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
176     {
177         if (activeMessage == null)
178         {
179             final MessageHandlerWrapper wrapper = jsrsession.getMessageHandlerWrapper(MessageType.TEXT);
180             if (wrapper == null)
181             {
182                 if (LOG.isDebugEnabled())
183                 {
184                     LOG.debug("No TEXT MessageHandler declared");
185                 }
186                 return;
187             }
188             if (wrapper.wantsPartialMessages())
189             {
190                 activeMessage = new TextPartialMessage(wrapper);
191             }
192             else if (wrapper.wantsStreams())
193             {
194                 final MessageReader stream = new MessageReader(new MessageInputStream());
195                 activeMessage = stream;
196 
197                 dispatch(new Runnable()
198                 {
199                     @SuppressWarnings("unchecked")
200                     @Override
201                     public void run()
202                     {
203                         MessageHandler.Whole<Reader> handler = (Whole<Reader>)wrapper.getHandler();
204                         handler.onMessage(stream);
205                     }
206                 });
207             }
208             else
209             {
210                 activeMessage = new TextWholeMessage(this,wrapper);
211             }
212         }
213 
214         activeMessage.appendFrame(buffer,fin);
215 
216         if (fin)
217         {
218             activeMessage.messageComplete();
219             activeMessage = null;
220         }
221     }
222 
223     @Override
224     public void onTextMessage(String message)
225     {
226         /* Ignored, handled by TextWholeMessage */
227     }
228 
229     @Override
230     public void onPing(ByteBuffer buffer)
231     {
232         onPongMessage(buffer);
233     }
234 
235     @Override
236     public void onPong(ByteBuffer buffer)
237     {
238         onPongMessage(buffer);
239     }
240 
241     private void onPongMessage(ByteBuffer buffer)
242     {
243         final MessageHandlerWrapper wrapper = jsrsession.getMessageHandlerWrapper(MessageType.PONG);
244         if (wrapper == null)
245         {
246             if (LOG.isDebugEnabled())
247             {
248                 LOG.debug("No PONG MessageHandler declared");
249             }
250             return;
251         }
252 
253         ByteBuffer pongBuf = null;
254 
255         if (BufferUtil.isEmpty(buffer))
256         {
257             pongBuf = BufferUtil.EMPTY_BUFFER;
258         }
259         else
260         {
261             pongBuf = ByteBuffer.allocate(buffer.remaining());
262             BufferUtil.put(buffer,pongBuf);
263             BufferUtil.flipToFlush(pongBuf,0);
264         }
265 
266         @SuppressWarnings("unchecked")
267         Whole<PongMessage> pongHandler = (Whole<PongMessage>)wrapper.getHandler();
268         pongHandler.onMessage(new JsrPongMessage(pongBuf));
269     }
270 
271     @Override
272     public void setPathParameters(Map<String, String> pathParameters)
273     {
274         this.pathParameters = pathParameters;
275     }
276 
277     @Override
278     public String toString()
279     {
280         return String.format("%s[%s]",JsrEndpointEventDriver.class.getSimpleName(),endpoint.getClass().getName());
281     }
282 }