View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.events;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.eclipse.jetty.util.BufferUtil;
25  import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
26  import org.eclipse.jetty.util.component.AbstractLifeCycle;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.BatchMode;
30  import org.eclipse.jetty.websocket.api.CloseException;
31  import org.eclipse.jetty.websocket.api.StatusCode;
32  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33  import org.eclipse.jetty.websocket.api.extensions.Frame;
34  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
35  import org.eclipse.jetty.websocket.common.CloseInfo;
36  import org.eclipse.jetty.websocket.common.OpCode;
37  import org.eclipse.jetty.websocket.common.WebSocketSession;
38  import org.eclipse.jetty.websocket.common.frames.CloseFrame;
39  import org.eclipse.jetty.websocket.common.message.MessageAppender;
40  
41  /**
42   * EventDriver is the main interface between the User's WebSocket POJO and the internal jetty implementation of WebSocket.
43   */
44  public abstract class AbstractEventDriver extends AbstractLifeCycle implements IncomingFrames, EventDriver
45  {
46      private static final Logger LOG = Log.getLogger(AbstractEventDriver.class);
47      protected final Logger TARGET_LOG;
48      protected WebSocketPolicy policy;
49      protected final Object websocket;
50      protected WebSocketSession session;
51      protected MessageAppender activeMessage;
52  
53      public AbstractEventDriver(WebSocketPolicy policy, Object websocket)
54      {
55          this.policy = policy;
56          this.websocket = websocket;
57          this.TARGET_LOG = Log.getLogger(websocket.getClass());
58      }
59  
60      protected void appendMessage(ByteBuffer buffer, boolean fin) throws IOException
61      {
62          activeMessage.appendFrame(buffer,fin);
63  
64          if (fin)
65          {
66              activeMessage.messageComplete();
67              activeMessage = null;
68          }
69      }
70  
71      protected void dispatch(Runnable runnable)
72      {
73          session.dispatch(runnable);
74      }
75  
76      @Override
77      public WebSocketPolicy getPolicy()
78      {
79          return policy;
80      }
81  
82      @Override
83      public WebSocketSession getSession()
84      {
85          return session;
86      }
87  
88      @Override
89      public final void incomingError(Throwable e)
90      {
91          if (LOG.isDebugEnabled())
92          {
93              LOG.debug("incomingError(" + e.getClass().getName() + ")",e);
94          }
95  
96          onError(e);
97      }
98  
99      @Override
100     public void incomingFrame(Frame frame)
101     {
102         if (LOG.isDebugEnabled())
103         {
104             LOG.debug("incomingFrame({})",frame);
105         }
106 
107         try
108         {
109             onFrame(frame);
110 
111             byte opcode = frame.getOpCode();
112             switch (opcode)
113             {
114                 case OpCode.CLOSE:
115                 {
116                     boolean validate = true;
117                     CloseFrame closeframe = (CloseFrame)frame;
118                     CloseInfo close = new CloseInfo(closeframe,validate);
119 
120                     // process handshake
121                     session.getConnection().getIOState().onCloseRemote(close);
122 
123                     return;
124                 }
125                 case OpCode.PING:
126                 {
127                     if (LOG.isDebugEnabled())
128                     {
129                         LOG.debug("PING: {}",BufferUtil.toDetailString(frame.getPayload()));
130                     }
131                     ByteBuffer pongBuf;
132                     if (frame.hasPayload())
133                     {
134                         pongBuf = ByteBuffer.allocate(frame.getPayload().remaining());
135                         BufferUtil.put(frame.getPayload().slice(),pongBuf);
136                         BufferUtil.flipToFlush(pongBuf,0);
137                     }
138                     else
139                     {
140                         pongBuf = ByteBuffer.allocate(0);
141                     }
142                     onPing(frame.getPayload());
143                     session.getRemote().sendPong(pongBuf);
144                     break;
145                 }
146                 case OpCode.PONG:
147                 {
148                     if (LOG.isDebugEnabled())
149                     {
150                         LOG.debug("PONG: {}",BufferUtil.toDetailString(frame.getPayload()));
151                     }
152                     onPong(frame.getPayload());
153                     break;
154                 }
155                 case OpCode.BINARY:
156                 {
157                     onBinaryFrame(frame.getPayload(),frame.isFin());
158                     return;
159                 }
160                 case OpCode.TEXT:
161                 {
162                     onTextFrame(frame.getPayload(),frame.isFin());
163                     return;
164                 }
165                 case OpCode.CONTINUATION:
166                 {
167                     onContinuationFrame(frame.getPayload(),frame.isFin());
168                     return;
169                 }
170                 default:
171                 {
172                     if (LOG.isDebugEnabled())
173                         LOG.debug("Unhandled OpCode: {}",opcode);
174                 }
175             }
176         }
177         catch (NotUtf8Exception e)
178         {
179             terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
180         }
181         catch (CloseException e)
182         {
183             terminateConnection(e.getStatusCode(),e.getMessage());
184         }
185         catch (Throwable t)
186         {
187             unhandled(t);
188         }
189     }
190 
191     @Override
192     public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException
193     {
194         if (activeMessage == null)
195         {
196             throw new IOException("Out of order Continuation frame encountered");
197         }
198 
199         appendMessage(buffer,fin);
200     }
201 
202     @Override
203     public void onPong(ByteBuffer buffer)
204     {
205         /* TODO: provide annotation in future */
206     }
207     
208     @Override
209     public void onPing(ByteBuffer buffer)
210     {
211         /* TODO: provide annotation in future */
212     }
213     
214     @Override
215     public BatchMode getBatchMode()
216     {
217         return null;
218     }
219 
220     @Override
221     public void openSession(WebSocketSession session)
222     {
223         if (LOG.isDebugEnabled())
224             LOG.debug("openSession({})",session);
225         this.session = session;
226         this.session.getContainerScope().getObjectFactory().decorate(this.websocket);
227         try
228         {
229             this.onConnect();
230         }
231         catch (Throwable t)
232         {
233             unhandled(t);
234             throw t;
235         }
236     }
237     
238     @Override
239     protected void doStop() throws Exception
240     {
241         session = null;
242     }
243 
244     protected void terminateConnection(int statusCode, String rawreason)
245     {
246         if (LOG.isDebugEnabled())
247             LOG.debug("terminateConnection({},{})",statusCode,rawreason);
248         session.close(statusCode,CloseFrame.truncate(rawreason));
249     }
250 
251     private void unhandled(Throwable t)
252     {
253         TARGET_LOG.warn("Unhandled Error (closing connection)",t);
254         onError(t);
255         
256         if (t instanceof CloseException)
257         {
258             terminateConnection(((CloseException)t).getStatusCode(),t.getClass().getSimpleName());
259             return;
260         }
261 
262         // Unhandled Error, close the connection.
263         switch (policy.getBehavior())
264         {
265             case SERVER:
266                 terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
267                 break;
268             case CLIENT:
269                 terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
270                 break;
271         }
272     }
273 }