1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client.io;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.Executor;
27
28 import org.eclipse.jetty.io.AbstractConnection;
29 import org.eclipse.jetty.io.ByteBufferPool;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.FutureCallback;
33 import org.eclipse.jetty.util.QuotedStringTokenizer;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.websocket.api.UpgradeException;
38 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
39 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
41 import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
42 import org.eclipse.jetty.websocket.client.io.HttpResponseHeaderParser.ParseException;
43 import org.eclipse.jetty.websocket.common.AcceptHash;
44 import org.eclipse.jetty.websocket.common.WebSocketSession;
45 import org.eclipse.jetty.websocket.common.events.EventDriver;
46 import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
47
48
49
50
51
52
53 public class UpgradeConnection extends AbstractConnection
54 {
55 public class SendUpgradeRequest extends FutureCallback implements Runnable
56 {
57 @Override
58 public void run()
59 {
60 URI uri = connectPromise.getRequest().getRequestURI();
61 request.setRequestURI(uri);
62 String rawRequest = request.generate();
63
64 ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET);
65 getEndPoint().write(this,buf);
66 }
67
68 @Override
69 public void succeeded()
70 {
71
72 super.succeeded();
73
74 fillInterested();
75 }
76 }
77
78
79 private static final int SWITCHING_PROTOCOLS = 101;
80
81 private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
82 private final ByteBufferPool bufferPool;
83 private final ConnectPromise connectPromise;
84 private final HttpResponseHeaderParser parser;
85 private ClientUpgradeRequest request;
86
87 public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
88 {
89 super(endp,executor);
90 this.connectPromise = connectPromise;
91 this.bufferPool = connectPromise.getClient().getBufferPool();
92 this.request = connectPromise.getRequest();
93
94
95 this.parser = new HttpResponseHeaderParser();
96 }
97
98 public void disconnect(boolean onlyOutput)
99 {
100 EndPoint endPoint = getEndPoint();
101
102
103 LOG.debug("Shutting down output {}",endPoint);
104 endPoint.shutdownOutput();
105 if (!onlyOutput)
106 {
107 LOG.debug("Closing {}",endPoint);
108 endPoint.close();
109 }
110 }
111
112 private void notifyConnect(ClientUpgradeResponse response)
113 {
114 connectPromise.setResponse(response);
115 }
116
117 @Override
118 public void onFillable()
119 {
120 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
121 BufferUtil.clear(buffer);
122 boolean readMore = false;
123 try
124 {
125 readMore = read(buffer);
126 }
127 finally
128 {
129 bufferPool.release(buffer);
130 }
131
132 if (readMore)
133 {
134 fillInterested();
135 }
136 }
137
138 @Override
139 public void onOpen()
140 {
141 super.onOpen();
142
143 getExecutor().execute(new SendUpgradeRequest());
144 }
145
146
147
148
149
150
151
152
153 private boolean read(ByteBuffer buffer)
154 {
155 EndPoint endPoint = getEndPoint();
156 try
157 {
158 while (true)
159 {
160 int filled = endPoint.fill(buffer);
161 if (filled == 0)
162 {
163 return true;
164 }
165 else if (filled < 0)
166 {
167 LOG.debug("read - EOF Reached");
168 return false;
169 }
170 else
171 {
172 if (LOG.isDebugEnabled())
173 {
174 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
175 }
176 ClientUpgradeResponse resp = parser.parse(buffer);
177 if (resp != null)
178 {
179
180 validateResponse(resp);
181 notifyConnect(resp);
182 upgradeConnection(resp);
183 if (buffer.hasRemaining())
184 {
185 LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
186 }
187 return false;
188 }
189 }
190 }
191 }
192 catch (IOException | ParseException e)
193 {
194 UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
195 connectPromise.failed(ue);
196 disconnect(false);
197 return false;
198 }
199 catch (UpgradeException e)
200 {
201 connectPromise.failed(e);
202 disconnect(false);
203 return false;
204 }
205 }
206
207 private void upgradeConnection(ClientUpgradeResponse response)
208 {
209 EndPoint endp = getEndPoint();
210 Executor executor = getExecutor();
211 WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise);
212
213
214 EventDriver websocket = connectPromise.getDriver();
215 WebSocketPolicy policy = connectPromise.getClient().getPolicy();
216
217 WebSocketSession session = new WebSocketSession(request.getRequestURI(),websocket,connection);
218 session.setPolicy(policy);
219 session.setUpgradeResponse(response);
220
221 connection.setSession(session);
222
223
224 ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
225 extensionStack.negotiate(response.getExtensions());
226
227 extensionStack.configure(connection.getParser());
228 extensionStack.configure(connection.getGenerator());
229
230
231 connection.setNextIncomingFrames(extensionStack);
232 extensionStack.setNextIncoming(session);
233
234
235 session.setOutgoingHandler(extensionStack);
236 extensionStack.setNextOutgoing(connection);
237
238
239 endp.setConnection(connection);
240 connection.onOpen();
241 }
242
243 private void validateResponse(ClientUpgradeResponse response)
244 {
245
246 if (response.getStatusCode() != SWITCHING_PROTOCOLS)
247 {
248 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols");
249 }
250
251
252 String connection = response.getHeader("Connection");
253 if (!"upgrade".equalsIgnoreCase(connection))
254 {
255 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
256 }
257
258
259 String reqKey = request.getKey();
260 String expectedHash = AcceptHash.hashKey(reqKey);
261 String respHash = response.getHeader("Sec-WebSocket-Accept");
262
263 response.setSuccess(true);
264 if (expectedHash.equalsIgnoreCase(respHash) == false)
265 {
266 response.setSuccess(false);
267 throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
268 }
269
270
271 List<ExtensionConfig> extensions = new ArrayList<>();
272 List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
273 if (extValues != null)
274 {
275 for (String extVal : extValues)
276 {
277 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
278 while (tok.hasMoreTokens())
279 {
280 extensions.add(ExtensionConfig.parse(tok.nextToken()));
281 }
282 }
283 }
284 response.setExtensions(extensions);
285 }
286 }