1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.URI;
24 import java.nio.ByteBuffer;
25 import java.nio.charset.StandardCharsets;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.Executor;
29
30 import org.eclipse.jetty.io.AbstractConnection;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.util.BufferUtil;
35 import org.eclipse.jetty.util.FutureCallback;
36 import org.eclipse.jetty.util.QuotedStringTokenizer;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39 import org.eclipse.jetty.websocket.api.UpgradeException;
40 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
41 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
42 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
43 import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
44 import org.eclipse.jetty.websocket.common.AcceptHash;
45 import org.eclipse.jetty.websocket.common.SessionFactory;
46 import org.eclipse.jetty.websocket.common.WebSocketSession;
47 import org.eclipse.jetty.websocket.common.events.EventDriver;
48 import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
49 import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
50 import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
51
52
53
54
55
56
57
58
59 public class UpgradeConnection extends AbstractConnection implements Connection.UpgradeFrom
60 {
61 public class SendUpgradeRequest extends FutureCallback implements Runnable
62 {
63 private final Logger LOG = Log.getLogger(UpgradeConnection.SendUpgradeRequest.class);
64
65 @Override
66 public void run()
67 {
68 URI uri = connectPromise.getRequest().getRequestURI();
69 request.setRequestURI(uri);
70
71 UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
72 if (handshakeListener != null)
73 {
74 handshakeListener.onHandshakeRequest(request);
75 }
76
77 String rawRequest = request.generate();
78
79 ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StandardCharsets.UTF_8);
80 getEndPoint().write(this,buf);
81 }
82
83 @Override
84 public void succeeded()
85 {
86 if (LOG.isDebugEnabled())
87 {
88 LOG.debug("Upgrade Request Write Success");
89 }
90
91 super.succeeded();
92 state = State.RESPONSE;
93
94 fillInterested();
95 }
96
97 @Override
98 public void failed(Throwable cause)
99 {
100 if (LOG.isDebugEnabled())
101 {
102 LOG.debug("Upgrade Request Write Failure",cause);
103 }
104 super.failed(cause);
105 state = State.FAILURE;
106
107 connectPromise.failed(cause);
108 }
109 }
110
111
112 private static final int SWITCHING_PROTOCOLS = 101;
113
114 private enum State
115 {
116 REQUEST,
117 RESPONSE,
118 FAILURE,
119 UPGRADE
120 }
121
122 private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
123 private final ByteBufferPool bufferPool;
124 private final ConnectPromise connectPromise;
125 private final HttpResponseHeaderParser parser;
126 private State state = State.REQUEST;
127 private ClientUpgradeRequest request;
128 private ClientUpgradeResponse response;
129
130 public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
131 {
132 super(endp,executor);
133 this.connectPromise = connectPromise;
134 this.bufferPool = connectPromise.getClient().getBufferPool();
135 this.request = connectPromise.getRequest();
136
137
138 this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
139 }
140
141 public void disconnect(boolean onlyOutput)
142 {
143 EndPoint endPoint = getEndPoint();
144
145
146 if (LOG.isDebugEnabled())
147 {
148 LOG.debug("Shutting down output {}",endPoint);
149 }
150
151 endPoint.shutdownOutput();
152 if (!onlyOutput)
153 {
154 if (LOG.isDebugEnabled())
155 {
156 LOG.debug("Closing {}",endPoint);
157 }
158 endPoint.close();
159 }
160 }
161
162 private void failUpgrade(Throwable cause)
163 {
164 close();
165 connectPromise.failed(cause);
166 }
167
168 private void notifyConnect(ClientUpgradeResponse response)
169 {
170 connectPromise.setResponse(response);
171
172 UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
173 if (handshakeListener != null)
174 {
175 handshakeListener.onHandshakeResponse(response);
176 }
177 }
178
179 @Override
180 public ByteBuffer onUpgradeFrom()
181 {
182 return connectPromise.getResponse().getRemainingBuffer();
183 }
184
185 @Override
186 public void onFillable()
187 {
188 if (LOG.isDebugEnabled())
189 {
190 LOG.debug("onFillable");
191 }
192 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
193 BufferUtil.clear(buffer);
194 try
195 {
196 read(buffer);
197 }
198 finally
199 {
200 bufferPool.release(buffer);
201 }
202
203 if (state == State.RESPONSE)
204 {
205
206 fillInterested();
207 }
208 else if (state == State.UPGRADE)
209 {
210
211 upgradeConnection(response);
212 }
213 }
214
215 @Override
216 public void onOpen()
217 {
218 super.onOpen();
219 getExecutor().execute(new SendUpgradeRequest());
220 }
221
222 @Override
223 public void onClose()
224 {
225 if (LOG.isDebugEnabled())
226 {
227 LOG.debug("Closed connection {}",this);
228 }
229 super.onClose();
230 }
231
232 @Override
233 protected boolean onReadTimeout()
234 {
235 if (LOG.isDebugEnabled())
236 {
237 LOG.debug("Timeout on connection {}",this);
238 }
239
240 failUpgrade(new IOException("Timeout while performing WebSocket Upgrade"));
241
242 return super.onReadTimeout();
243 }
244
245
246
247
248
249
250
251 private void read(ByteBuffer buffer)
252 {
253 EndPoint endPoint = getEndPoint();
254 try
255 {
256 while (true)
257 {
258 int filled = endPoint.fill(buffer);
259 if (filled == 0)
260 {
261 return;
262 }
263 else if (filled < 0)
264 {
265 LOG.warn("read - EOF Reached");
266 state = State.FAILURE;
267 failUpgrade(new EOFException("Reading WebSocket Upgrade response"));
268 return;
269 }
270 else
271 {
272 if (LOG.isDebugEnabled())
273 {
274 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
275 }
276 response = (ClientUpgradeResponse)parser.parse(buffer);
277 if (response != null)
278 {
279
280 validateResponse(response);
281 notifyConnect(response);
282 state = State.UPGRADE;
283 return;
284 }
285 }
286 }
287 }
288 catch (IOException | ParseException e)
289 {
290 LOG.ignore(e);
291 state = State.FAILURE;
292 UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
293 connectPromise.failed(ue);
294 disconnect(false);
295 }
296 catch (UpgradeException e)
297 {
298 LOG.ignore(e);
299 state = State.FAILURE;
300 connectPromise.failed(e);
301 disconnect(false);
302 }
303 }
304
305 private void upgradeConnection(ClientUpgradeResponse response)
306 {
307 EndPoint endp = getEndPoint();
308 Executor executor = getExecutor();
309
310 EventDriver websocket = connectPromise.getDriver();
311 WebSocketPolicy policy = websocket.getPolicy();
312
313 WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
314
315 SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
316 WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
317 session.setPolicy(policy);
318 session.setUpgradeRequest(request);
319 session.setUpgradeResponse(response);
320 connection.addListener(session);
321 connectPromise.setSession(session);
322
323
324 ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
325 extensionStack.negotiate(response.getExtensions());
326
327 extensionStack.configure(connection.getParser());
328 extensionStack.configure(connection.getGenerator());
329
330
331 connection.setNextIncomingFrames(extensionStack);
332 extensionStack.setNextIncoming(session);
333
334
335 session.setOutgoingHandler(extensionStack);
336 extensionStack.setNextOutgoing(connection);
337
338 session.addManaged(extensionStack);
339 connectPromise.getClient().addManaged(session);
340
341
342 endp.upgrade(connection);
343 }
344
345 private void validateResponse(ClientUpgradeResponse response)
346 {
347
348 if (response.getStatusCode() != SWITCHING_PROTOCOLS)
349 {
350
351 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols, expected status <" + SWITCHING_PROTOCOLS
352 + ">, but got <" + response.getStatusCode() + ">");
353 }
354
355
356 String connection = response.getHeader("Connection");
357 if (!"upgrade".equalsIgnoreCase(connection))
358 {
359 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
360 }
361
362
363 String reqKey = request.getKey();
364 String expectedHash = AcceptHash.hashKey(reqKey);
365 String respHash = response.getHeader("Sec-WebSocket-Accept");
366
367 response.setSuccess(true);
368 if (expectedHash.equalsIgnoreCase(respHash) == false)
369 {
370 response.setSuccess(false);
371 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
372 }
373
374
375 List<ExtensionConfig> extensions = new ArrayList<>();
376 List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
377 if (extValues != null)
378 {
379 for (String extVal : extValues)
380 {
381 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
382 while (tok.hasMoreTokens())
383 {
384 extensions.add(ExtensionConfig.parse(tok.nextToken()));
385 }
386 }
387 }
388 response.setExtensions(extensions);
389 }
390 }