1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.events;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.eclipse.jetty.util.BufferUtil;
25 import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
26 import org.eclipse.jetty.util.log.Log;
27 import org.eclipse.jetty.util.log.Logger;
28 import org.eclipse.jetty.websocket.api.CloseException;
29 import org.eclipse.jetty.websocket.api.StatusCode;
30 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
31 import org.eclipse.jetty.websocket.api.extensions.Frame;
32 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
33 import org.eclipse.jetty.websocket.common.CloseInfo;
34 import org.eclipse.jetty.websocket.common.OpCode;
35 import org.eclipse.jetty.websocket.common.WebSocketSession;
36 import org.eclipse.jetty.websocket.common.frames.CloseFrame;
37 import org.eclipse.jetty.websocket.common.message.MessageAppender;
38
39
40
41
42 public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
43 {
44 private static final Logger LOG = Log.getLogger(AbstractEventDriver.class);
45 protected final WebSocketPolicy policy;
46 protected final Object websocket;
47 protected WebSocketSession session;
48 protected MessageAppender activeMessage;
49
50 public AbstractEventDriver(WebSocketPolicy policy, Object websocket)
51 {
52 this.policy = policy;
53 this.websocket = websocket;
54 }
55
56 protected void appendMessage(ByteBuffer buffer, boolean fin) throws IOException
57 {
58 activeMessage.appendMessage(buffer,fin);
59
60 if (fin)
61 {
62 activeMessage.messageComplete();
63 activeMessage = null;
64 }
65 }
66
67 protected void dispatch(Runnable runnable)
68 {
69 session.dispatch(runnable);
70 }
71
72 @Override
73 public WebSocketPolicy getPolicy()
74 {
75 return policy;
76 }
77
78 @Override
79 public WebSocketSession getSession()
80 {
81 return session;
82 }
83
84 @Override
85 public final void incomingError(Throwable e)
86 {
87 if (LOG.isDebugEnabled())
88 {
89 LOG.debug("incoming(WebSocketException)",e);
90 }
91
92 if (e instanceof CloseException)
93 {
94 CloseException close = (CloseException)e;
95 terminateConnection(close.getStatusCode(),close.getMessage());
96 }
97
98 onError(e);
99 }
100
101 @Override
102 public void incomingFrame(Frame frame)
103 {
104 if (LOG.isDebugEnabled())
105 {
106 LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame);
107 }
108
109 try
110 {
111 onFrame(frame);
112
113 byte opcode = frame.getOpCode();
114 switch (opcode)
115 {
116 case OpCode.CLOSE:
117 {
118 boolean validate = true;
119 CloseFrame closeframe = (CloseFrame)frame;
120 CloseInfo close = new CloseInfo(closeframe,validate);
121
122
123 onClose(close);
124
125
126 session.getConnection().getIOState().onCloseRemote(close);
127
128 return;
129 }
130 case OpCode.PING:
131 {
132 ByteBuffer pongBuf;
133 if (frame.hasPayload())
134 {
135 pongBuf = ByteBuffer.allocate(frame.getPayload().remaining());
136 BufferUtil.put(frame.getPayload(),pongBuf);
137 BufferUtil.flipToFlush(pongBuf,0);
138 }
139 else
140 {
141 pongBuf = ByteBuffer.allocate(0);
142 }
143 onPing(frame.getPayload());
144 session.getRemote().sendPong(pongBuf);
145 break;
146 }
147 case OpCode.PONG:
148 {
149 onPong(frame.getPayload());
150 break;
151 }
152 case OpCode.BINARY:
153 {
154 onBinaryFrame(frame.getPayload(),frame.isFin());
155 return;
156 }
157 case OpCode.TEXT:
158 {
159 onTextFrame(frame.getPayload(),frame.isFin());
160 return;
161 }
162 case OpCode.CONTINUATION:
163 {
164 onContinuationFrame(frame.getPayload(),frame.isFin());
165 return;
166 }
167 default:
168 {
169 LOG.debug("Unhandled OpCode: {}",opcode);
170 }
171 }
172 }
173 catch (NotUtf8Exception e)
174 {
175 terminateConnection(StatusCode.BAD_PAYLOAD,e.getMessage());
176 }
177 catch (CloseException e)
178 {
179 terminateConnection(e.getStatusCode(),e.getMessage());
180 }
181 catch (Throwable t)
182 {
183 unhandled(t);
184 }
185 }
186
187 @Override
188 public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException
189 {
190 if (activeMessage == null)
191 {
192 throw new IOException("Out of order Continuation frame encountered");
193 }
194
195 appendMessage(buffer,fin);
196 }
197
198 @Override
199 public void onPong(ByteBuffer buffer)
200 {
201
202 }
203
204 @Override
205 public void onPing(ByteBuffer buffer)
206 {
207
208 }
209
210 @Override
211 public void openSession(WebSocketSession session)
212 {
213 LOG.debug("openSession({})",session);
214 this.session = session;
215 try
216 {
217 this.onConnect();
218 }
219 catch (Throwable t)
220 {
221 unhandled(t);
222 }
223 }
224
225 protected void terminateConnection(int statusCode, String rawreason)
226 {
227 LOG.debug("terminateConnection({},{})",statusCode,rawreason);
228 session.close(statusCode,CloseFrame.truncate(rawreason));
229 }
230
231 private void unhandled(Throwable t)
232 {
233 LOG.warn("Unhandled Error (closing connection)",t);
234 onError(t);
235
236
237 switch (policy.getBehavior())
238 {
239 case SERVER:
240 terminateConnection(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
241 break;
242 case CLIENT:
243 terminateConnection(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
244 break;
245 }
246 }
247 }