1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client.io;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.nio.charset.StandardCharsets;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28
29 import org.eclipse.jetty.io.AbstractConnection;
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.EndPoint;
32 import org.eclipse.jetty.util.BufferUtil;
33 import org.eclipse.jetty.util.FutureCallback;
34 import org.eclipse.jetty.util.QuotedStringTokenizer;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.websocket.api.UpgradeException;
38 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
39 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
41 import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
42 import org.eclipse.jetty.websocket.common.AcceptHash;
43 import org.eclipse.jetty.websocket.common.SessionFactory;
44 import org.eclipse.jetty.websocket.common.WebSocketSession;
45 import org.eclipse.jetty.websocket.common.events.EventDriver;
46 import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
47 import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
48 import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
49
50
51
52
53
54
55 public class UpgradeConnection extends AbstractConnection
56 {
57 public class SendUpgradeRequest extends FutureCallback implements Runnable
58 {
59 @Override
60 public void run()
61 {
62 URI uri = connectPromise.getRequest().getRequestURI();
63 request.setRequestURI(uri);
64
65 UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
66 if (handshakeListener != null)
67 {
68 handshakeListener.onHandshakeRequest(request);
69 }
70
71 String rawRequest = request.generate();
72
73 ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8);
74 getEndPoint().write(this,buf);
75 }
76
77 @Override
78 public void succeeded()
79 {
80
81 super.succeeded();
82
83 fillInterested();
84 }
85
86 @Override
87 public void failed(Throwable cause)
88 {
89 super.failed(cause);
90
91 connectPromise.failed(cause);
92 }
93 }
94
95
96 private static final int SWITCHING_PROTOCOLS = 101;
97
98 private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
99 private final ByteBufferPool bufferPool;
100 private final ConnectPromise connectPromise;
101 private final HttpResponseHeaderParser parser;
102 private ClientUpgradeRequest request;
103
104 public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
105 {
106 super(endp,executor);
107 this.connectPromise = connectPromise;
108 this.bufferPool = connectPromise.getClient().getBufferPool();
109 this.request = connectPromise.getRequest();
110
111
112 this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
113 }
114
115 public void disconnect(boolean onlyOutput)
116 {
117 EndPoint endPoint = getEndPoint();
118
119
120 if (LOG.isDebugEnabled())
121 LOG.debug("Shutting down output {}",endPoint);
122 endPoint.shutdownOutput();
123 if (!onlyOutput)
124 {
125 if (LOG.isDebugEnabled())
126 LOG.debug("Closing {}",endPoint);
127 endPoint.close();
128 }
129 }
130
131 private void notifyConnect(ClientUpgradeResponse response)
132 {
133 connectPromise.setResponse(response);
134
135 UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
136 if (handshakeListener != null)
137 {
138 handshakeListener.onHandshakeResponse(response);
139 }
140 }
141
142 @Override
143 public void onFillable()
144 {
145 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
146 BufferUtil.clear(buffer);
147 boolean readMore = false;
148 try
149 {
150 readMore = read(buffer);
151 }
152 finally
153 {
154 bufferPool.release(buffer);
155 }
156
157 if (readMore)
158 {
159 fillInterested();
160 }
161 }
162
163 @Override
164 public void onOpen()
165 {
166 super.onOpen();
167
168 getExecutor().execute(new SendUpgradeRequest());
169 }
170
171
172
173
174
175
176
177
178 private boolean read(ByteBuffer buffer)
179 {
180 EndPoint endPoint = getEndPoint();
181 try
182 {
183 while (true)
184 {
185 int filled = endPoint.fill(buffer);
186 if (filled == 0)
187 {
188 return true;
189 }
190 else if (filled < 0)
191 {
192 LOG.debug("read - EOF Reached");
193 return false;
194 }
195 else
196 {
197 if (LOG.isDebugEnabled())
198 {
199 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
200 }
201 ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer);
202 if (resp != null)
203 {
204
205 validateResponse(resp);
206 notifyConnect(resp);
207 upgradeConnection(resp);
208 if (buffer.hasRemaining())
209 {
210 LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
211 }
212 return false;
213 }
214 }
215 }
216 }
217 catch (IOException | ParseException e)
218 {
219 UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
220 connectPromise.failed(ue);
221 disconnect(false);
222 return false;
223 }
224 catch (UpgradeException e)
225 {
226 connectPromise.failed(e);
227 disconnect(false);
228 return false;
229 }
230 }
231
232 private void upgradeConnection(ClientUpgradeResponse response)
233 {
234 EndPoint endp = getEndPoint();
235 Executor executor = getExecutor();
236
237 EventDriver websocket = connectPromise.getDriver();
238 WebSocketPolicy policy = websocket.getPolicy();
239
240 WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
241
242 SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
243 WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
244 session.setPolicy(policy);
245 session.setUpgradeResponse(response);
246
247 connection.setSession(session);
248
249
250 ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
251 extensionStack.negotiate(response.getExtensions());
252
253 extensionStack.configure(connection.getParser());
254 extensionStack.configure(connection.getGenerator());
255
256
257 connection.setNextIncomingFrames(extensionStack);
258 extensionStack.setNextIncoming(session);
259
260
261 session.setOutgoingHandler(extensionStack);
262 extensionStack.setNextOutgoing(connection);
263
264 session.addBean(extensionStack);
265 connectPromise.getClient().addManaged(session);
266
267
268 endp.setConnection(connection);
269 connection.onOpen();
270 }
271
272 private void validateResponse(ClientUpgradeResponse response)
273 {
274
275 if (response.getStatusCode() != SWITCHING_PROTOCOLS)
276 {
277 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols");
278 }
279
280
281 String connection = response.getHeader("Connection");
282 if (!"upgrade".equalsIgnoreCase(connection))
283 {
284 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
285 }
286
287
288 String reqKey = request.getKey();
289 String expectedHash = AcceptHash.hashKey(reqKey);
290 String respHash = response.getHeader("Sec-WebSocket-Accept");
291
292 response.setSuccess(true);
293 if (expectedHash.equalsIgnoreCase(respHash) == false)
294 {
295 response.setSuccess(false);
296 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
297 }
298
299
300 List<ExtensionConfig> extensions = new ArrayList<>();
301 List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
302 if (extValues != null)
303 {
304 for (String extVal : extValues)
305 {
306 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
307 while (tok.hasMoreTokens())
308 {
309 extensions.add(ExtensionConfig.parse(tok.nextToken()));
310 }
311 }
312 }
313 response.setExtensions(extensions);
314 }
315 }