1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client.io;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.nio.charset.StandardCharsets;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28
29 import org.eclipse.jetty.io.AbstractConnection;
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.EndPoint;
32 import org.eclipse.jetty.util.BufferUtil;
33 import org.eclipse.jetty.util.FutureCallback;
34 import org.eclipse.jetty.util.QuotedStringTokenizer;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.websocket.api.UpgradeException;
38 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
39 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
41 import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
42 import org.eclipse.jetty.websocket.common.AcceptHash;
43 import org.eclipse.jetty.websocket.common.SessionFactory;
44 import org.eclipse.jetty.websocket.common.WebSocketSession;
45 import org.eclipse.jetty.websocket.common.events.EventDriver;
46 import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
47 import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
48 import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
49
50
51
52
53
54
55 public class UpgradeConnection extends AbstractConnection
56 {
57 public class SendUpgradeRequest extends FutureCallback implements Runnable
58 {
59 @Override
60 public void run()
61 {
62 URI uri = connectPromise.getRequest().getRequestURI();
63 request.setRequestURI(uri);
64
65 UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
66 if (handshakeListener != null)
67 {
68 handshakeListener.onHandshakeRequest(request);
69 }
70
71 String rawRequest = request.generate();
72
73 ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8);
74 getEndPoint().write(this,buf);
75 }
76
77 @Override
78 public void succeeded()
79 {
80
81 super.succeeded();
82
83 fillInterested();
84 }
85
86 @Override
87 public void failed(Throwable cause)
88 {
89 super.failed(cause);
90
91 connectPromise.failed(cause);
92 }
93 }
94
95
96 private static final int SWITCHING_PROTOCOLS = 101;
97
98 private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
99 private final ByteBufferPool bufferPool;
100 private final ConnectPromise connectPromise;
101 private final HttpResponseHeaderParser parser;
102 private ClientUpgradeRequest request;
103
104 public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
105 {
106 super(endp,executor);
107 this.connectPromise = connectPromise;
108 this.bufferPool = connectPromise.getClient().getBufferPool();
109 this.request = connectPromise.getRequest();
110
111
112 this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
113 }
114
115 public void disconnect(boolean onlyOutput)
116 {
117 EndPoint endPoint = getEndPoint();
118
119
120 LOG.debug("Shutting down output {}",endPoint);
121 endPoint.shutdownOutput();
122 if (!onlyOutput)
123 {
124 LOG.debug("Closing {}",endPoint);
125 endPoint.close();
126 }
127 }
128
129 private void notifyConnect(ClientUpgradeResponse response)
130 {
131 connectPromise.setResponse(response);
132
133 UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
134 if (handshakeListener != null)
135 {
136 handshakeListener.onHandshakeResponse(response);
137 }
138 }
139
140 @Override
141 public void onFillable()
142 {
143 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
144 BufferUtil.clear(buffer);
145 boolean readMore = false;
146 try
147 {
148 readMore = read(buffer);
149 }
150 finally
151 {
152 bufferPool.release(buffer);
153 }
154
155 if (readMore)
156 {
157 fillInterested();
158 }
159 }
160
161 @Override
162 public void onOpen()
163 {
164 super.onOpen();
165
166 getExecutor().execute(new SendUpgradeRequest());
167 }
168
169
170
171
172
173
174
175
176 private boolean read(ByteBuffer buffer)
177 {
178 EndPoint endPoint = getEndPoint();
179 try
180 {
181 while (true)
182 {
183 int filled = endPoint.fill(buffer);
184 if (filled == 0)
185 {
186 return true;
187 }
188 else if (filled < 0)
189 {
190 LOG.debug("read - EOF Reached");
191 return false;
192 }
193 else
194 {
195 if (LOG.isDebugEnabled())
196 {
197 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
198 }
199 ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer);
200 if (resp != null)
201 {
202
203 validateResponse(resp);
204 notifyConnect(resp);
205 upgradeConnection(resp);
206 if (buffer.hasRemaining())
207 {
208 LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
209 }
210 return false;
211 }
212 }
213 }
214 }
215 catch (IOException | ParseException e)
216 {
217 UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
218 connectPromise.failed(ue);
219 disconnect(false);
220 return false;
221 }
222 catch (UpgradeException e)
223 {
224 connectPromise.failed(e);
225 disconnect(false);
226 return false;
227 }
228 }
229
230 private void upgradeConnection(ClientUpgradeResponse response)
231 {
232 EndPoint endp = getEndPoint();
233 Executor executor = getExecutor();
234
235 EventDriver websocket = connectPromise.getDriver();
236 WebSocketPolicy policy = websocket.getPolicy();
237
238 WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
239
240 SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
241 WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
242 session.setPolicy(policy);
243 session.setUpgradeResponse(response);
244
245 connection.setSession(session);
246
247
248 ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
249 extensionStack.negotiate(response.getExtensions());
250
251 extensionStack.configure(connection.getParser());
252 extensionStack.configure(connection.getGenerator());
253
254
255 connection.setNextIncomingFrames(extensionStack);
256 extensionStack.setNextIncoming(session);
257
258
259 session.setOutgoingHandler(extensionStack);
260 extensionStack.setNextOutgoing(connection);
261
262
263 endp.setConnection(connection);
264 connection.onOpen();
265 }
266
267 private void validateResponse(ClientUpgradeResponse response)
268 {
269
270 if (response.getStatusCode() != SWITCHING_PROTOCOLS)
271 {
272 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols");
273 }
274
275
276 String connection = response.getHeader("Connection");
277 if (!"upgrade".equalsIgnoreCase(connection))
278 {
279 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
280 }
281
282
283 String reqKey = request.getKey();
284 String expectedHash = AcceptHash.hashKey(reqKey);
285 String respHash = response.getHeader("Sec-WebSocket-Accept");
286
287 response.setSuccess(true);
288 if (expectedHash.equalsIgnoreCase(respHash) == false)
289 {
290 response.setSuccess(false);
291 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
292 }
293
294
295 List<ExtensionConfig> extensions = new ArrayList<>();
296 List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
297 if (extValues != null)
298 {
299 for (String extVal : extValues)
300 {
301 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
302 while (tok.hasMoreTokens())
303 {
304 extensions.add(ExtensionConfig.parse(tok.nextToken()));
305 }
306 }
307 }
308 response.setExtensions(extensions);
309 }
310 }