View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.URI;
24  import java.nio.ByteBuffer;
25  import java.nio.charset.StandardCharsets;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.Executor;
29  
30  import org.eclipse.jetty.io.AbstractConnection;
31  import org.eclipse.jetty.io.ByteBufferPool;
32  import org.eclipse.jetty.io.Connection;
33  import org.eclipse.jetty.io.EndPoint;
34  import org.eclipse.jetty.util.BufferUtil;
35  import org.eclipse.jetty.util.FutureCallback;
36  import org.eclipse.jetty.util.QuotedStringTokenizer;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.log.Logger;
39  import org.eclipse.jetty.websocket.api.UpgradeException;
40  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
41  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
42  import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
43  import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
44  import org.eclipse.jetty.websocket.common.AcceptHash;
45  import org.eclipse.jetty.websocket.common.SessionFactory;
46  import org.eclipse.jetty.websocket.common.WebSocketSession;
47  import org.eclipse.jetty.websocket.common.events.EventDriver;
48  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
49  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
50  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
51  
52  /**
53   * This is the initial connection handling that exists immediately after physical connection is established to
54   * destination server.
55   * <p>
56   * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the
57   * WebSocektClientConnection handler.
58   */
59  public class UpgradeConnection extends AbstractConnection implements Connection.UpgradeFrom
60  {
61      public class SendUpgradeRequest extends FutureCallback implements Runnable
62      {
63          private final Logger LOG = Log.getLogger(UpgradeConnection.SendUpgradeRequest.class);
64          
65          @Override
66          public void run()
67          {
68              URI uri = connectPromise.getRequest().getRequestURI();
69              request.setRequestURI(uri);
70  
71              UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
72              if (handshakeListener != null)
73              {
74                  handshakeListener.onHandshakeRequest(request);
75              }
76  
77              String rawRequest = request.generate();
78  
79              ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StandardCharsets.UTF_8);
80              getEndPoint().write(this,buf);
81          }
82  
83          @Override
84          public void succeeded()
85          {
86              if (LOG.isDebugEnabled())
87              {
88                  LOG.debug("Upgrade Request Write Success");
89              }
90              // Writing the request header is complete.
91              super.succeeded();
92              state = State.RESPONSE;
93              // start the interest in fill
94              fillInterested();
95          }
96  
97          @Override
98          public void failed(Throwable cause)
99          {
100             if (LOG.isDebugEnabled())
101             {
102                 LOG.debug("Upgrade Request Write Failure",cause);
103             }
104             super.failed(cause);
105             state = State.FAILURE;
106             // Fail the connect promise when a fundamental exception during connect occurs.
107             connectPromise.failed(cause);
108         }
109     }
110 
111     /** HTTP Response Code: 101 Switching Protocols */
112     private static final int SWITCHING_PROTOCOLS = 101;
113 
114     private enum State
115     {
116         REQUEST,
117         RESPONSE,
118         FAILURE,
119         UPGRADE
120     }
121 
122     private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
123     private final ByteBufferPool bufferPool;
124     private final ConnectPromise connectPromise;
125     private final HttpResponseHeaderParser parser;
126     private State state = State.REQUEST;
127     private ClientUpgradeRequest request;
128     private ClientUpgradeResponse response;
129 
130     public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
131     {
132         super(endp,executor);
133         this.connectPromise = connectPromise;
134         this.bufferPool = connectPromise.getClient().getBufferPool();
135         this.request = connectPromise.getRequest();
136 
137         // Setup the parser
138         this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
139     }
140 
141     public void disconnect(boolean onlyOutput)
142     {
143         EndPoint endPoint = getEndPoint();
144         // We need to gently close first, to allow
145         // SSL close alerts to be sent by Jetty
146         if (LOG.isDebugEnabled())
147         {
148             LOG.debug("Shutting down output {}",endPoint);
149         }
150         
151         endPoint.shutdownOutput();
152         if (!onlyOutput)
153         {
154             if (LOG.isDebugEnabled())
155             {
156                 LOG.debug("Closing {}",endPoint);
157             }
158             endPoint.close();
159         }
160     }
161 
162     private void failUpgrade(Throwable cause)
163     {
164         close();
165         connectPromise.failed(cause);
166     }
167 
168     private void notifyConnect(ClientUpgradeResponse response)
169     {
170         connectPromise.setResponse(response);
171 
172         UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
173         if (handshakeListener != null)
174         {
175             handshakeListener.onHandshakeResponse(response);
176         }
177     }
178     
179     @Override
180     public ByteBuffer onUpgradeFrom()
181     {
182         return connectPromise.getResponse().getRemainingBuffer();
183     }
184 
185     @Override
186     public void onFillable()
187     {
188         if (LOG.isDebugEnabled())
189         {
190             LOG.debug("onFillable");
191         }
192         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
193         BufferUtil.clear(buffer);
194         try
195         {
196             read(buffer);
197         }
198         finally
199         {
200             bufferPool.release(buffer);
201         }
202 
203         if (state == State.RESPONSE)
204         {
205             // Continue Reading
206             fillInterested();
207         }
208         else if (state == State.UPGRADE)
209         {
210             // Stop Reading, upgrade the connection now
211             upgradeConnection(response);
212         }
213     }
214 
215     @Override
216     public void onOpen()
217     {
218         super.onOpen();
219         getExecutor().execute(new SendUpgradeRequest());
220     }
221 
222     @Override
223     public void onClose()
224     {
225         if (LOG.isDebugEnabled())
226         {
227             LOG.debug("Closed connection {}",this);
228         }
229         super.onClose();
230     }
231 
232     @Override
233     protected boolean onReadTimeout()
234     {
235         if (LOG.isDebugEnabled())
236         {
237             LOG.debug("Timeout on connection {}",this);
238         }
239 
240         failUpgrade(new IOException("Timeout while performing WebSocket Upgrade"));
241 
242         return super.onReadTimeout();
243     }
244 
245     /**
246      * Read / Parse the waiting read/fill buffer
247      * 
248      * @param buffer
249      *            the buffer to fill into from the endpoint
250      */
251     private void read(ByteBuffer buffer)
252     {
253         EndPoint endPoint = getEndPoint();
254         try
255         {
256             while (true)
257             {
258                 int filled = endPoint.fill(buffer);
259                 if (filled == 0)
260                 {
261                     return;
262                 }
263                 else if (filled < 0)
264                 {
265                     LOG.warn("read - EOF Reached");
266                     state = State.FAILURE;
267                     failUpgrade(new EOFException("Reading WebSocket Upgrade response"));
268                     return;
269                 }
270                 else
271                 {
272                     if (LOG.isDebugEnabled())
273                     {
274                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
275                     }
276                     response = (ClientUpgradeResponse)parser.parse(buffer);
277                     if (response != null)
278                     {
279                         // Got a response!
280                         validateResponse(response);
281                         notifyConnect(response);
282                         state = State.UPGRADE;
283                         return; // do no more reading
284                     }
285                 }
286             }
287         }
288         catch (IOException | ParseException e)
289         {
290             LOG.ignore(e);
291             state = State.FAILURE;
292             UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
293             connectPromise.failed(ue);
294             disconnect(false);
295         }
296         catch (UpgradeException e)
297         {
298             LOG.ignore(e);
299             state = State.FAILURE;
300             connectPromise.failed(e);
301             disconnect(false);
302         }
303     }
304 
305     private void upgradeConnection(ClientUpgradeResponse response)
306     {
307         EndPoint endp = getEndPoint();
308         Executor executor = getExecutor();
309 
310         EventDriver websocket = connectPromise.getDriver();
311         WebSocketPolicy policy = websocket.getPolicy();
312 
313         WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
314 
315         SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
316         WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
317         session.setPolicy(policy);
318         session.setUpgradeRequest(request);
319         session.setUpgradeResponse(response);
320         connection.addListener(session);
321         connectPromise.setSession(session);
322 
323         // Initialize / Negotiate Extensions
324         ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
325         extensionStack.negotiate(response.getExtensions());
326 
327         extensionStack.configure(connection.getParser());
328         extensionStack.configure(connection.getGenerator());
329 
330         // Setup Incoming Routing
331         connection.setNextIncomingFrames(extensionStack);
332         extensionStack.setNextIncoming(session);
333 
334         // Setup Outgoing Routing
335         session.setOutgoingHandler(extensionStack);
336         extensionStack.setNextOutgoing(connection);
337 
338         session.addManaged(extensionStack);
339         connectPromise.getClient().addManaged(session);
340 
341         // Now swap out the connection
342         endp.upgrade(connection);
343     }
344 
345     private void validateResponse(ClientUpgradeResponse response)
346     {
347         // Validate Response Status Code
348         if (response.getStatusCode() != SWITCHING_PROTOCOLS)
349         {
350             // TODO: use jetty-http and org.eclipse.jetty.http.HttpStatus for more meaningful exception messages 
351             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols, expected status <" + SWITCHING_PROTOCOLS
352                     + ">, but got <" + response.getStatusCode() + ">");
353         }
354 
355         // Validate Connection header
356         String connection = response.getHeader("Connection");
357         if (!"upgrade".equalsIgnoreCase(connection))
358         {
359             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
360         }
361 
362         // Check the Accept hash
363         String reqKey = request.getKey();
364         String expectedHash = AcceptHash.hashKey(reqKey);
365         String respHash = response.getHeader("Sec-WebSocket-Accept");
366 
367         response.setSuccess(true);
368         if (expectedHash.equalsIgnoreCase(respHash) == false)
369         {
370             response.setSuccess(false);
371             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
372         }
373 
374         // Parse extensions
375         List<ExtensionConfig> extensions = new ArrayList<>();
376         List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
377         if (extValues != null)
378         {
379             for (String extVal : extValues)
380             {
381                 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
382                 while (tok.hasMoreTokens())
383                 {
384                     extensions.add(ExtensionConfig.parse(tok.nextToken()));
385                 }
386             }
387         }
388         response.setExtensions(extensions);
389     }
390 }