1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.events;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.eclipse.jetty.util.BufferUtil;
25 import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
26 import org.eclipse.jetty.util.component.AbstractLifeCycle;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29 import org.eclipse.jetty.websocket.api.BatchMode;
30 import org.eclipse.jetty.websocket.api.CloseException;
31 import org.eclipse.jetty.websocket.api.StatusCode;
32 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33 import org.eclipse.jetty.websocket.api.extensions.Frame;
34 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
35 import org.eclipse.jetty.websocket.common.CloseInfo;
36 import org.eclipse.jetty.websocket.common.OpCode;
37 import org.eclipse.jetty.websocket.common.WebSocketSession;
38 import org.eclipse.jetty.websocket.common.frames.CloseFrame;
39 import org.eclipse.jetty.websocket.common.message.MessageAppender;
40
41
42
43
44 public abstract class AbstractEventDriver extends AbstractLifeCycle implements IncomingFrames, EventDriver
45 {
46 private static final Logger LOG = Log.getLogger(AbstractEventDriver.class);
47 protected final Logger TARGET_LOG;
48 protected WebSocketPolicy policy;
49 protected final Object websocket;
50 protected WebSocketSession session;
51 protected MessageAppender activeMessage;
52
53 public AbstractEventDriver(WebSocketPolicy policy, Object websocket)
54 {
55 this.policy = policy;
56 this.websocket = websocket;
57 this.TARGET_LOG = Log.getLogger(websocket.getClass());
58 }
59
60 protected void appendMessage(ByteBuffer buffer, boolean fin) throws IOException
61 {
62 activeMessage.appendFrame(buffer,fin);
63
64 if (fin)
65 {
66 activeMessage.messageComplete();
67 activeMessage = null;
68 }
69 }
70
71 protected void dispatch(Runnable runnable)
72 {
73 session.dispatch(runnable);
74 }
75
76 @Override
77 public WebSocketPolicy getPolicy()
78 {
79 return policy;
80 }
81
82 @Override
83 public WebSocketSession getSession()
84 {
85 return session;
86 }
87
88 @Override
89 public final void incomingError(Throwable e)
90 {
91 if (LOG.isDebugEnabled())
92 {
93 LOG.debug("incomingError(" + e.getClass().getName() + ")",e);
94 }
95
96 onError(e);
97 }
98
99 @Override
100 public void incomingFrame(Frame frame)
101 {
102 if (LOG.isDebugEnabled())
103 {
104 LOG.debug("incomingFrame({})",frame);
105 }
106
107 try
108 {
109 onFrame(frame);
110
111 byte opcode = frame.getOpCode();
112 switch (opcode)
113 {
114 case OpCode.CLOSE:
115 {
116 boolean validate = true;
117 CloseFrame closeframe = (CloseFrame)frame;
118 CloseInfo close = new CloseInfo(closeframe,validate);
119
120
121 session.getConnection().getIOState().onCloseRemote(close);
122
123 return;
124 }
125 case OpCode.PING:
126 {
127 if (LOG.isDebugEnabled())
128 {
129 LOG.debug("PING: {}",BufferUtil.toDetailString(frame.getPayload()));
130 }
131 ByteBuffer pongBuf;
132 if (frame.hasPayload())
133 {
134 pongBuf = ByteBuffer.allocate(frame.getPayload().remaining());
135 BufferUtil.put(frame.getPayload().slice(),pongBuf);
136 BufferUtil.flipToFlush(pongBuf,0);
137 }
138 else
139 {
140 pongBuf = ByteBuffer.allocate(0);
141 }
142 onPing(frame.getPayload());
143 session.getRemote().sendPong(pongBuf);
144 break;
145 }
146 case OpCode.PONG:
147 {
148 if (LOG.isDebugEnabled())
149 {
150 LOG.debug("PONG: {}",BufferUtil.toDetailString(frame.getPayload()));
151 }
152 onPong(frame.getPayload());
153 break;
154 }
155 case OpCode.BINARY:
156 {
157 onBinaryFrame(frame.getPayload(),frame.isFin());
158 return;
159 }
160 case OpCode.TEXT:
161 {
162 onTextFrame(frame.getPayload(),frame.isFin());
163 return;
164 }
165 case OpCode.CONTINUATION:
166 {
167 onContinuationFrame(frame.getPayload(),frame.isFin());
168 return;
169 }
170 default:
171 {
172 if (LOG.isDebugEnabled())
173 LOG.debug("Unhandled OpCode: {}",opcode);
174 }
175 }
176 }
177 catch (NotUtf8Exception e)
178 {
179 terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
180 }
181 catch (CloseException e)
182 {
183 terminateConnection(e.getStatusCode(),e.getMessage());
184 }
185 catch (Throwable t)
186 {
187 unhandled(t);
188 }
189 }
190
191 @Override
192 public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException
193 {
194 if (activeMessage == null)
195 {
196 throw new IOException("Out of order Continuation frame encountered");
197 }
198
199 appendMessage(buffer,fin);
200 }
201
202 @Override
203 public void onPong(ByteBuffer buffer)
204 {
205
206 }
207
208 @Override
209 public void onPing(ByteBuffer buffer)
210 {
211
212 }
213
214 @Override
215 public BatchMode getBatchMode()
216 {
217 return null;
218 }
219
220 @Override
221 public void openSession(WebSocketSession session)
222 {
223 if (LOG.isDebugEnabled())
224 LOG.debug("openSession({})",session);
225 this.session = session;
226 this.session.getContainerScope().getObjectFactory().decorate(this.websocket);
227 try
228 {
229 this.onConnect();
230 }
231 catch (Throwable t)
232 {
233 unhandled(t);
234 throw t;
235 }
236 }
237
238 @Override
239 protected void doStop() throws Exception
240 {
241 session = null;
242 }
243
244 protected void terminateConnection(int statusCode, String rawreason)
245 {
246 if (LOG.isDebugEnabled())
247 LOG.debug("terminateConnection({},{})",statusCode,rawreason);
248 session.close(statusCode,CloseFrame.truncate(rawreason));
249 }
250
251 private void unhandled(Throwable t)
252 {
253 TARGET_LOG.warn("Unhandled Error (closing connection)",t);
254 onError(t);
255
256 if (t instanceof CloseException)
257 {
258 terminateConnection(((CloseException)t).getStatusCode(),t.getClass().getSimpleName());
259 return;
260 }
261
262
263 switch (policy.getBehavior())
264 {
265 case SERVER:
266 terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
267 break;
268 case CLIENT:
269 terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
270 break;
271 }
272 }
273 }