View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.events;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.eclipse.jetty.util.BufferUtil;
25  import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
26  import org.eclipse.jetty.util.log.Log;
27  import org.eclipse.jetty.util.log.Logger;
28  import org.eclipse.jetty.websocket.api.CloseException;
29  import org.eclipse.jetty.websocket.api.StatusCode;
30  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
31  import org.eclipse.jetty.websocket.api.extensions.Frame;
32  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
33  import org.eclipse.jetty.websocket.common.CloseInfo;
34  import org.eclipse.jetty.websocket.common.OpCode;
35  import org.eclipse.jetty.websocket.common.WebSocketSession;
36  import org.eclipse.jetty.websocket.common.frames.CloseFrame;
37  import org.eclipse.jetty.websocket.common.message.MessageAppender;
38  
39  /**
40   * EventDriver is the main interface between the User's WebSocket POJO and the internal jetty implementation of WebSocket.
41   */
42  public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
43  {
44      private static final Logger LOG = Log.getLogger(AbstractEventDriver.class);
45      protected final WebSocketPolicy policy;
46      protected final Object websocket;
47      protected WebSocketSession session;
48      protected MessageAppender activeMessage;
49  
50      public AbstractEventDriver(WebSocketPolicy policy, Object websocket)
51      {
52          this.policy = policy;
53          this.websocket = websocket;
54      }
55  
56      protected void appendMessage(ByteBuffer buffer, boolean fin) throws IOException
57      {
58          activeMessage.appendMessage(buffer,fin);
59  
60          if (fin)
61          {
62              activeMessage.messageComplete();
63              activeMessage = null;
64          }
65      }
66  
67      protected void dispatch(Runnable runnable)
68      {
69          session.dispatch(runnable);
70      }
71  
72      @Override
73      public WebSocketPolicy getPolicy()
74      {
75          return policy;
76      }
77  
78      @Override
79      public WebSocketSession getSession()
80      {
81          return session;
82      }
83  
84      @Override
85      public final void incomingError(Throwable e)
86      {
87          if (LOG.isDebugEnabled())
88          {
89              LOG.debug("incoming(WebSocketException)",e);
90          }
91  
92          if (e instanceof CloseException)
93          {
94              CloseException close = (CloseException)e;
95              terminateConnection(close.getStatusCode(),close.getMessage());
96          }
97  
98          onError(e);
99      }
100 
101     @Override
102     public void incomingFrame(Frame frame)
103     {
104         if (LOG.isDebugEnabled())
105         {
106             LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame);
107         }
108 
109         try
110         {
111             onFrame(frame);
112 
113             byte opcode = frame.getOpCode();
114             switch (opcode)
115             {
116                 case OpCode.CLOSE:
117                 {
118                     boolean validate = true;
119                     CloseFrame closeframe = (CloseFrame)frame;
120                     CloseInfo close = new CloseInfo(closeframe,validate);
121 
122                     // notify user websocket pojo
123                     onClose(close);
124 
125                     // process handshake
126                     session.getConnection().getIOState().onCloseRemote(close);
127 
128                     return;
129                 }
130                 case OpCode.PING:
131                 {
132                     ByteBuffer pongBuf;
133                     if (frame.hasPayload())
134                     {
135                         pongBuf = ByteBuffer.allocate(frame.getPayload().remaining());
136                         BufferUtil.put(frame.getPayload(),pongBuf);
137                         BufferUtil.flipToFlush(pongBuf,0);
138                     }
139                     else
140                     {
141                         pongBuf = ByteBuffer.allocate(0);
142                     }
143                     onPing(frame.getPayload());
144                     session.getRemote().sendPong(pongBuf);
145                     break;
146                 }
147                 case OpCode.PONG:
148                 {
149                     onPong(frame.getPayload());
150                     break;
151                 }
152                 case OpCode.BINARY:
153                 {
154                     onBinaryFrame(frame.getPayload(),frame.isFin());
155                     return;
156                 }
157                 case OpCode.TEXT:
158                 {
159                     onTextFrame(frame.getPayload(),frame.isFin());
160                     return;
161                 }
162                 case OpCode.CONTINUATION:
163                 {
164                     onContinuationFrame(frame.getPayload(),frame.isFin());
165                     return;
166                 }
167                 default:
168                 {
169                     LOG.debug("Unhandled OpCode: {}",opcode);
170                 }
171             }
172         }
173         catch (NotUtf8Exception e)
174         {
175             terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
176         }
177         catch (CloseException e)
178         {
179             terminateConnection(e.getStatusCode(),e.getMessage());
180         }
181         catch (Throwable t)
182         {
183             unhandled(t);
184         }
185     }
186 
187     @Override
188     public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException
189     {
190         if (activeMessage == null)
191         {
192             throw new IOException("Out of order Continuation frame encountered");
193         }
194 
195         appendMessage(buffer,fin);
196     }
197 
198     @Override
199     public void onPong(ByteBuffer buffer)
200     {
201         /* TODO: provide annotation in future */
202     }
203     
204     @Override
205     public void onPing(ByteBuffer buffer)
206     {
207         /* TODO: provide annotation in future */
208     }
209 
210     @Override
211     public void openSession(WebSocketSession session)
212     {
213         LOG.debug("openSession({})",session);
214         this.session = session;
215         try
216         {
217             this.onConnect();
218         }
219         catch (Throwable t)
220         {
221             unhandled(t);
222         }
223     }
224 
225     protected void terminateConnection(int statusCode, String rawreason)
226     {
227         LOG.debug("terminateConnection({},{})",statusCode,rawreason);
228         session.close(statusCode,CloseFrame.truncate(rawreason));
229     }
230 
231     private void unhandled(Throwable t)
232     {
233         LOG.warn("Unhandled Error (closing connection)",t);
234         onError(t);
235 
236         // Unhandled Error, close the connection.
237         switch (policy.getBehavior())
238         {
239             case SERVER:
240                 terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
241                 break;
242             case CLIENT:
243                 terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
244                 break;
245         }
246     }
247 }