View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client.io;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.nio.charset.StandardCharsets;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.Executor;
28  
29  import org.eclipse.jetty.io.AbstractConnection;
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.EndPoint;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.FutureCallback;
34  import org.eclipse.jetty.util.QuotedStringTokenizer;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.websocket.api.UpgradeException;
38  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
39  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40  import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
41  import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
42  import org.eclipse.jetty.websocket.common.AcceptHash;
43  import org.eclipse.jetty.websocket.common.SessionFactory;
44  import org.eclipse.jetty.websocket.common.WebSocketSession;
45  import org.eclipse.jetty.websocket.common.events.EventDriver;
46  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
47  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
48  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
49  
50  /**
51   * This is the initial connection handling that exists immediately after physical connection is established to destination server.
52   * <p>
53   * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler.
54   */
55  public class UpgradeConnection extends AbstractConnection
56  {
57      public class SendUpgradeRequest extends FutureCallback implements Runnable
58      {
59          @Override
60          public void run()
61          {
62              URI uri = connectPromise.getRequest().getRequestURI();
63              request.setRequestURI(uri);
64  
65              UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
66              if (handshakeListener != null)
67              {
68                  handshakeListener.onHandshakeRequest(request);
69              }
70  
71              String rawRequest = request.generate();
72  
73              ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8);
74              getEndPoint().write(this,buf);
75          }
76  
77          @Override
78          public void succeeded()
79          {
80              // Writing the request header is complete.
81              super.succeeded();
82              // start the interest in fill
83              fillInterested();
84          }
85  
86          @Override
87          public void failed(Throwable cause)
88          {
89              super.failed(cause);
90              // Fail the connect promise when a fundamental exception during connect occurs.
91              connectPromise.failed(cause);
92          }
93      }
94  
95      /** HTTP Response Code: 101 Switching Protocols */
96      private static final int SWITCHING_PROTOCOLS = 101;
97  
98      private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
99      private final ByteBufferPool bufferPool;
100     private final ConnectPromise connectPromise;
101     private final HttpResponseHeaderParser parser;
102     private ClientUpgradeRequest request;
103 
104     public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
105     {
106         super(endp,executor);
107         this.connectPromise = connectPromise;
108         this.bufferPool = connectPromise.getClient().getBufferPool();
109         this.request = connectPromise.getRequest();
110 
111         // Setup the parser
112         this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
113     }
114 
115     public void disconnect(boolean onlyOutput)
116     {
117         EndPoint endPoint = getEndPoint();
118         // We need to gently close first, to allow
119         // SSL close alerts to be sent by Jetty
120         LOG.debug("Shutting down output {}",endPoint);
121         endPoint.shutdownOutput();
122         if (!onlyOutput)
123         {
124             LOG.debug("Closing {}",endPoint);
125             endPoint.close();
126         }
127     }
128 
129     private void notifyConnect(ClientUpgradeResponse response)
130     {
131         connectPromise.setResponse(response);
132 
133         UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
134         if (handshakeListener != null)
135         {
136             handshakeListener.onHandshakeResponse(response);
137         }
138     }
139 
140     @Override
141     public void onFillable()
142     {
143         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
144         BufferUtil.clear(buffer);
145         boolean readMore = false;
146         try
147         {
148             readMore = read(buffer);
149         }
150         finally
151         {
152             bufferPool.release(buffer);
153         }
154 
155         if (readMore)
156         {
157             fillInterested();
158         }
159     }
160 
161     @Override
162     public void onOpen()
163     {
164         super.onOpen();
165         // TODO: handle timeout?
166         getExecutor().execute(new SendUpgradeRequest());
167     }
168 
169     /**
170      * Read / Parse the waiting read/fill buffer
171      * 
172      * @param buffer
173      *            the buffer to fill into from the endpoint
174      * @return true if there is more to read, false if reading should stop
175      */
176     private boolean read(ByteBuffer buffer)
177     {
178         EndPoint endPoint = getEndPoint();
179         try
180         {
181             while (true)
182             {
183                 int filled = endPoint.fill(buffer);
184                 if (filled == 0)
185                 {
186                     return true;
187                 }
188                 else if (filled < 0)
189                 {
190                     LOG.debug("read - EOF Reached");
191                     return false;
192                 }
193                 else
194                 {
195                     if (LOG.isDebugEnabled())
196                     {
197                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
198                     }
199                     ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer);
200                     if (resp != null)
201                     {
202                         // Got a response!
203                         validateResponse(resp);
204                         notifyConnect(resp);
205                         upgradeConnection(resp);
206                         if (buffer.hasRemaining())
207                         {
208                             LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
209                         }
210                         return false; // do no more reading
211                     }
212                 }
213             }
214         }
215         catch (IOException | ParseException e)
216         {
217             UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
218             connectPromise.failed(ue);
219             disconnect(false);
220             return false;
221         }
222         catch (UpgradeException e)
223         {
224             connectPromise.failed(e);
225             disconnect(false);
226             return false;
227         }
228     }
229 
230     private void upgradeConnection(ClientUpgradeResponse response)
231     {
232         EndPoint endp = getEndPoint();
233         Executor executor = getExecutor();
234         
235         EventDriver websocket = connectPromise.getDriver();
236         WebSocketPolicy policy = websocket.getPolicy();
237 
238         WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
239 
240         SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
241         WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
242         session.setPolicy(policy);
243         session.setUpgradeResponse(response);
244 
245         connection.setSession(session);
246 
247         // Initialize / Negotiate Extensions
248         ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
249         extensionStack.negotiate(response.getExtensions());
250 
251         extensionStack.configure(connection.getParser());
252         extensionStack.configure(connection.getGenerator());
253 
254         // Setup Incoming Routing
255         connection.setNextIncomingFrames(extensionStack);
256         extensionStack.setNextIncoming(session);
257 
258         // Setup Outgoing Routing
259         session.setOutgoingHandler(extensionStack);
260         extensionStack.setNextOutgoing(connection);
261 
262         // Now swap out the connection
263         endp.setConnection(connection);
264         connection.onOpen();
265     }
266 
267     private void validateResponse(ClientUpgradeResponse response)
268     {
269         // Validate Response Status Code
270         if (response.getStatusCode() != SWITCHING_PROTOCOLS)
271         {
272             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols");
273         }
274 
275         // Validate Connection header
276         String connection = response.getHeader("Connection");
277         if (!"upgrade".equalsIgnoreCase(connection))
278         {
279             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
280         }
281 
282         // Check the Accept hash
283         String reqKey = request.getKey();
284         String expectedHash = AcceptHash.hashKey(reqKey);
285         String respHash = response.getHeader("Sec-WebSocket-Accept");
286 
287         response.setSuccess(true);
288         if (expectedHash.equalsIgnoreCase(respHash) == false)
289         {
290             response.setSuccess(false);
291             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
292         }
293 
294         // Parse extensions
295         List<ExtensionConfig> extensions = new ArrayList<>();
296         List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
297         if (extValues != null)
298         {
299             for (String extVal : extValues)
300             {
301                 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
302                 while (tok.hasMoreTokens())
303                 {
304                     extensions.add(ExtensionConfig.parse(tok.nextToken()));
305                 }
306             }
307         }
308         response.setExtensions(extensions);
309     }
310 }