View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.events;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.eclipse.jetty.util.BufferUtil;
25  import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
26  import org.eclipse.jetty.util.log.Log;
27  import org.eclipse.jetty.util.log.Logger;
28  import org.eclipse.jetty.websocket.api.CloseException;
29  import org.eclipse.jetty.websocket.api.StatusCode;
30  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
31  import org.eclipse.jetty.websocket.api.extensions.Frame;
32  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
33  import org.eclipse.jetty.websocket.common.CloseInfo;
34  import org.eclipse.jetty.websocket.common.OpCode;
35  import org.eclipse.jetty.websocket.common.WebSocketSession;
36  import org.eclipse.jetty.websocket.common.frames.CloseFrame;
37  import org.eclipse.jetty.websocket.common.message.MessageAppender;
38  
39  /**
40   * EventDriver is the main interface between the User's WebSocket POJO and the internal jetty implementation of WebSocket.
41   */
42  public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
43  {
44      private static final Logger LOG = Log.getLogger(AbstractEventDriver.class);
45      protected final Logger TARGET_LOG;
46      protected final WebSocketPolicy policy;
47      protected final Object websocket;
48      protected WebSocketSession session;
49      protected MessageAppender activeMessage;
50  
51      public AbstractEventDriver(WebSocketPolicy policy, Object websocket)
52      {
53          this.policy = policy;
54          this.websocket = websocket;
55          this.TARGET_LOG = Log.getLogger(websocket.getClass());
56      }
57  
58      protected void appendMessage(ByteBuffer buffer, boolean fin) throws IOException
59      {
60          activeMessage.appendFrame(buffer,fin);
61  
62          if (fin)
63          {
64              activeMessage.messageComplete();
65              activeMessage = null;
66          }
67      }
68  
69      protected void dispatch(Runnable runnable)
70      {
71          session.dispatch(runnable);
72      }
73  
74      @Override
75      public WebSocketPolicy getPolicy()
76      {
77          return policy;
78      }
79  
80      @Override
81      public WebSocketSession getSession()
82      {
83          return session;
84      }
85  
86      @Override
87      public final void incomingError(Throwable e)
88      {
89          if (LOG.isDebugEnabled())
90          {
91              LOG.debug("incomingError(" + e.getClass().getName() + ")",e);
92          }
93  
94          onError(e);
95      }
96  
97      @Override
98      public void incomingFrame(Frame frame)
99      {
100         if (LOG.isDebugEnabled())
101         {
102             LOG.debug("incomingFrame({})",frame);
103         }
104 
105         try
106         {
107             onFrame(frame);
108 
109             byte opcode = frame.getOpCode();
110             switch (opcode)
111             {
112                 case OpCode.CLOSE:
113                 {
114                     boolean validate = true;
115                     CloseFrame closeframe = (CloseFrame)frame;
116                     CloseInfo close = new CloseInfo(closeframe,validate);
117 
118                     // process handshake
119                     session.getConnection().getIOState().onCloseRemote(close);
120 
121                     return;
122                 }
123                 case OpCode.PING:
124                 {
125                     if (LOG.isDebugEnabled())
126                     {
127                         LOG.debug("PING: {}",BufferUtil.toDetailString(frame.getPayload()));
128                     }
129                     ByteBuffer pongBuf;
130                     if (frame.hasPayload())
131                     {
132                         pongBuf = ByteBuffer.allocate(frame.getPayload().remaining());
133                         BufferUtil.put(frame.getPayload().slice(),pongBuf);
134                         BufferUtil.flipToFlush(pongBuf,0);
135                     }
136                     else
137                     {
138                         pongBuf = ByteBuffer.allocate(0);
139                     }
140                     onPing(frame.getPayload());
141                     session.getRemote().sendPong(pongBuf);
142                     break;
143                 }
144                 case OpCode.PONG:
145                 {
146                     if (LOG.isDebugEnabled())
147                     {
148                         LOG.debug("PONG: {}",BufferUtil.toDetailString(frame.getPayload()));
149                     }
150                     onPong(frame.getPayload());
151                     break;
152                 }
153                 case OpCode.BINARY:
154                 {
155                     onBinaryFrame(frame.getPayload(),frame.isFin());
156                     return;
157                 }
158                 case OpCode.TEXT:
159                 {
160                     onTextFrame(frame.getPayload(),frame.isFin());
161                     return;
162                 }
163                 case OpCode.CONTINUATION:
164                 {
165                     onContinuationFrame(frame.getPayload(),frame.isFin());
166                     return;
167                 }
168                 default:
169                 {
170                     if (LOG.isDebugEnabled())
171                         LOG.debug("Unhandled OpCode: {}",opcode);
172                 }
173             }
174         }
175         catch (NotUtf8Exception e)
176         {
177             terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
178         }
179         catch (CloseException e)
180         {
181             terminateConnection(e.getStatusCode(),e.getMessage());
182         }
183         catch (Throwable t)
184         {
185             unhandled(t);
186         }
187     }
188 
189     @Override
190     public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException
191     {
192         if (activeMessage == null)
193         {
194             throw new IOException("Out of order Continuation frame encountered");
195         }
196 
197         appendMessage(buffer,fin);
198     }
199 
200     @Override
201     public void onPong(ByteBuffer buffer)
202     {
203         /* TODO: provide annotation in future */
204     }
205     
206     @Override
207     public void onPing(ByteBuffer buffer)
208     {
209         /* TODO: provide annotation in future */
210     }
211 
212     @Override
213     public void openSession(WebSocketSession session)
214     {
215         if (LOG.isDebugEnabled())
216             LOG.debug("openSession({})",session);
217         this.session = session;
218         try
219         {
220             this.onConnect();
221         }
222         catch (Throwable t)
223         {
224             unhandled(t);
225             throw t;
226         }
227     }
228 
229     protected void terminateConnection(int statusCode, String rawreason)
230     {
231         if (LOG.isDebugEnabled())
232             LOG.debug("terminateConnection({},{})",statusCode,rawreason);
233         session.close(statusCode,CloseFrame.truncate(rawreason));
234     }
235 
236     private void unhandled(Throwable t)
237     {
238         TARGET_LOG.warn("Unhandled Error (closing connection)",t);
239         onError(t);
240 
241         // Unhandled Error, close the connection.
242         switch (policy.getBehavior())
243         {
244             case SERVER:
245                 terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
246                 break;
247             case CLIENT:
248                 terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
249                 break;
250         }
251     }
252 }