1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.events;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.eclipse.jetty.util.BufferUtil;
25 import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
26 import org.eclipse.jetty.util.log.Log;
27 import org.eclipse.jetty.util.log.Logger;
28 import org.eclipse.jetty.websocket.api.CloseException;
29 import org.eclipse.jetty.websocket.api.StatusCode;
30 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
31 import org.eclipse.jetty.websocket.api.extensions.Frame;
32 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
33 import org.eclipse.jetty.websocket.common.CloseInfo;
34 import org.eclipse.jetty.websocket.common.OpCode;
35 import org.eclipse.jetty.websocket.common.WebSocketSession;
36 import org.eclipse.jetty.websocket.common.frames.CloseFrame;
37 import org.eclipse.jetty.websocket.common.message.MessageAppender;
38
39
40
41
42 public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
43 {
44 private static final Logger LOG = Log.getLogger(AbstractEventDriver.class);
45 protected final Logger TARGET_LOG;
46 protected final WebSocketPolicy policy;
47 protected final Object websocket;
48 protected WebSocketSession session;
49 protected MessageAppender activeMessage;
50
51 public AbstractEventDriver(WebSocketPolicy policy, Object websocket)
52 {
53 this.policy = policy;
54 this.websocket = websocket;
55 this.TARGET_LOG = Log.getLogger(websocket.getClass());
56 }
57
58 protected void appendMessage(ByteBuffer buffer, boolean fin) throws IOException
59 {
60 activeMessage.appendFrame(buffer,fin);
61
62 if (fin)
63 {
64 activeMessage.messageComplete();
65 activeMessage = null;
66 }
67 }
68
69 protected void dispatch(Runnable runnable)
70 {
71 session.dispatch(runnable);
72 }
73
74 @Override
75 public WebSocketPolicy getPolicy()
76 {
77 return policy;
78 }
79
80 @Override
81 public WebSocketSession getSession()
82 {
83 return session;
84 }
85
86 @Override
87 public final void incomingError(Throwable e)
88 {
89 if (LOG.isDebugEnabled())
90 {
91 LOG.debug("incomingError(" + e.getClass().getName() + ")",e);
92 }
93
94 onError(e);
95 }
96
97 @Override
98 public void incomingFrame(Frame frame)
99 {
100 if (LOG.isDebugEnabled())
101 {
102 LOG.debug("incomingFrame({})",frame);
103 }
104
105 try
106 {
107 onFrame(frame);
108
109 byte opcode = frame.getOpCode();
110 switch (opcode)
111 {
112 case OpCode.CLOSE:
113 {
114 boolean validate = true;
115 CloseFrame closeframe = (CloseFrame)frame;
116 CloseInfo close = new CloseInfo(closeframe,validate);
117
118
119 session.getConnection().getIOState().onCloseRemote(close);
120
121 return;
122 }
123 case OpCode.PING:
124 {
125 if (LOG.isDebugEnabled())
126 {
127 LOG.debug("PING: {}",BufferUtil.toDetailString(frame.getPayload()));
128 }
129 ByteBuffer pongBuf;
130 if (frame.hasPayload())
131 {
132 pongBuf = ByteBuffer.allocate(frame.getPayload().remaining());
133 BufferUtil.put(frame.getPayload().slice(),pongBuf);
134 BufferUtil.flipToFlush(pongBuf,0);
135 }
136 else
137 {
138 pongBuf = ByteBuffer.allocate(0);
139 }
140 onPing(frame.getPayload());
141 session.getRemote().sendPong(pongBuf);
142 break;
143 }
144 case OpCode.PONG:
145 {
146 if (LOG.isDebugEnabled())
147 {
148 LOG.debug("PONG: {}",BufferUtil.toDetailString(frame.getPayload()));
149 }
150 onPong(frame.getPayload());
151 break;
152 }
153 case OpCode.BINARY:
154 {
155 onBinaryFrame(frame.getPayload(),frame.isFin());
156 return;
157 }
158 case OpCode.TEXT:
159 {
160 onTextFrame(frame.getPayload(),frame.isFin());
161 return;
162 }
163 case OpCode.CONTINUATION:
164 {
165 onContinuationFrame(frame.getPayload(),frame.isFin());
166 return;
167 }
168 default:
169 {
170 if (LOG.isDebugEnabled())
171 LOG.debug("Unhandled OpCode: {}",opcode);
172 }
173 }
174 }
175 catch (NotUtf8Exception e)
176 {
177 terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
178 }
179 catch (CloseException e)
180 {
181 terminateConnection(e.getStatusCode(),e.getMessage());
182 }
183 catch (Throwable t)
184 {
185 unhandled(t);
186 }
187 }
188
189 @Override
190 public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException
191 {
192 if (activeMessage == null)
193 {
194 throw new IOException("Out of order Continuation frame encountered");
195 }
196
197 appendMessage(buffer,fin);
198 }
199
200 @Override
201 public void onPong(ByteBuffer buffer)
202 {
203
204 }
205
206 @Override
207 public void onPing(ByteBuffer buffer)
208 {
209
210 }
211
212 @Override
213 public void openSession(WebSocketSession session)
214 {
215 if (LOG.isDebugEnabled())
216 LOG.debug("openSession({})",session);
217 this.session = session;
218 try
219 {
220 this.onConnect();
221 }
222 catch (Throwable t)
223 {
224 unhandled(t);
225 throw t;
226 }
227 }
228
229 protected void terminateConnection(int statusCode, String rawreason)
230 {
231 if (LOG.isDebugEnabled())
232 LOG.debug("terminateConnection({},{})",statusCode,rawreason);
233 session.close(statusCode,CloseFrame.truncate(rawreason));
234 }
235
236 private void unhandled(Throwable t)
237 {
238 TARGET_LOG.warn("Unhandled Error (closing connection)",t);
239 onError(t);
240
241
242 switch (policy.getBehavior())
243 {
244 case SERVER:
245 terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
246 break;
247 case CLIENT:
248 terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
249 break;
250 }
251 }
252 }