View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client.io;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.nio.charset.StandardCharsets;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.Executor;
28  
29  import org.eclipse.jetty.io.AbstractConnection;
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.EndPoint;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.FutureCallback;
34  import org.eclipse.jetty.util.QuotedStringTokenizer;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.websocket.api.UpgradeException;
38  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
39  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40  import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
41  import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
42  import org.eclipse.jetty.websocket.common.AcceptHash;
43  import org.eclipse.jetty.websocket.common.SessionFactory;
44  import org.eclipse.jetty.websocket.common.WebSocketSession;
45  import org.eclipse.jetty.websocket.common.events.EventDriver;
46  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
47  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
48  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
49  
50  /**
51   * This is the initial connection handling that exists immediately after physical connection is established to destination server.
52   * <p>
53   * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler.
54   */
55  public class UpgradeConnection extends AbstractConnection
56  {
57      public class SendUpgradeRequest extends FutureCallback implements Runnable
58      {
59          @Override
60          public void run()
61          {
62              URI uri = connectPromise.getRequest().getRequestURI();
63              request.setRequestURI(uri);
64  
65              UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
66              if (handshakeListener != null)
67              {
68                  handshakeListener.onHandshakeRequest(request);
69              }
70  
71              String rawRequest = request.generate();
72  
73              ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8);
74              getEndPoint().write(this,buf);
75          }
76  
77          @Override
78          public void succeeded()
79          {
80              // Writing the request header is complete.
81              super.succeeded();
82              // start the interest in fill
83              fillInterested();
84          }
85  
86          @Override
87          public void failed(Throwable cause)
88          {
89              super.failed(cause);
90              // Fail the connect promise when a fundamental exception during connect occurs.
91              connectPromise.failed(cause);
92          }
93      }
94  
95      /** HTTP Response Code: 101 Switching Protocols */
96      private static final int SWITCHING_PROTOCOLS = 101;
97  
98      private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
99      private final ByteBufferPool bufferPool;
100     private final ConnectPromise connectPromise;
101     private final HttpResponseHeaderParser parser;
102     private ClientUpgradeRequest request;
103 
104     public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
105     {
106         super(endp,executor);
107         this.connectPromise = connectPromise;
108         this.bufferPool = connectPromise.getClient().getBufferPool();
109         this.request = connectPromise.getRequest();
110 
111         // Setup the parser
112         this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
113     }
114 
115     public void disconnect(boolean onlyOutput)
116     {
117         EndPoint endPoint = getEndPoint();
118         // We need to gently close first, to allow
119         // SSL close alerts to be sent by Jetty
120         if (LOG.isDebugEnabled())
121             LOG.debug("Shutting down output {}",endPoint);
122         endPoint.shutdownOutput();
123         if (!onlyOutput)
124         {
125             if (LOG.isDebugEnabled())
126                 LOG.debug("Closing {}",endPoint);
127             endPoint.close();
128         }
129     }
130 
131     private void notifyConnect(ClientUpgradeResponse response)
132     {
133         connectPromise.setResponse(response);
134 
135         UpgradeListener handshakeListener = connectPromise.getUpgradeListener();
136         if (handshakeListener != null)
137         {
138             handshakeListener.onHandshakeResponse(response);
139         }
140     }
141 
142     @Override
143     public void onFillable()
144     {
145         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
146         BufferUtil.clear(buffer);
147         boolean readMore = false;
148         try
149         {
150             readMore = read(buffer);
151         }
152         finally
153         {
154             bufferPool.release(buffer);
155         }
156 
157         if (readMore)
158         {
159             fillInterested();
160         }
161     }
162 
163     @Override
164     public void onOpen()
165     {
166         super.onOpen();
167         // TODO: handle timeout?
168         getExecutor().execute(new SendUpgradeRequest());
169     }
170 
171     /**
172      * Read / Parse the waiting read/fill buffer
173      * 
174      * @param buffer
175      *            the buffer to fill into from the endpoint
176      * @return true if there is more to read, false if reading should stop
177      */
178     private boolean read(ByteBuffer buffer)
179     {
180         EndPoint endPoint = getEndPoint();
181         try
182         {
183             while (true)
184             {
185                 int filled = endPoint.fill(buffer);
186                 if (filled == 0)
187                 {
188                     return true;
189                 }
190                 else if (filled < 0)
191                 {
192                     LOG.debug("read - EOF Reached");
193                     return false;
194                 }
195                 else
196                 {
197                     if (LOG.isDebugEnabled())
198                     {
199                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
200                     }
201                     ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer);
202                     if (resp != null)
203                     {
204                         // Got a response!
205                         validateResponse(resp);
206                         notifyConnect(resp);
207                         upgradeConnection(resp);
208                         if (buffer.hasRemaining())
209                         {
210                             LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
211                         }
212                         return false; // do no more reading
213                     }
214                 }
215             }
216         }
217         catch (IOException | ParseException e)
218         {
219             UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
220             connectPromise.failed(ue);
221             disconnect(false);
222             return false;
223         }
224         catch (UpgradeException e)
225         {
226             connectPromise.failed(e);
227             disconnect(false);
228             return false;
229         }
230     }
231 
232     private void upgradeConnection(ClientUpgradeResponse response)
233     {
234         EndPoint endp = getEndPoint();
235         Executor executor = getExecutor();
236         
237         EventDriver websocket = connectPromise.getDriver();
238         WebSocketPolicy policy = websocket.getPolicy();
239 
240         WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
241 
242         SessionFactory sessionFactory = connectPromise.getClient().getSessionFactory();
243         WebSocketSession session = sessionFactory.createSession(request.getRequestURI(),websocket,connection);
244         session.setPolicy(policy);
245         session.setUpgradeResponse(response);
246 
247         connection.setSession(session);
248 
249         // Initialize / Negotiate Extensions
250         ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
251         extensionStack.negotiate(response.getExtensions());
252 
253         extensionStack.configure(connection.getParser());
254         extensionStack.configure(connection.getGenerator());
255 
256         // Setup Incoming Routing
257         connection.setNextIncomingFrames(extensionStack);
258         extensionStack.setNextIncoming(session);
259 
260         // Setup Outgoing Routing
261         session.setOutgoingHandler(extensionStack);
262         extensionStack.setNextOutgoing(connection);
263 
264         session.addBean(extensionStack);
265         connectPromise.getClient().addManaged(session);
266 
267         // Now swap out the connection
268         endp.setConnection(connection);
269         connection.onOpen();
270     }
271 
272     private void validateResponse(ClientUpgradeResponse response)
273     {
274         // Validate Response Status Code
275         if (response.getStatusCode() != SWITCHING_PROTOCOLS)
276         {
277             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols");
278         }
279 
280         // Validate Connection header
281         String connection = response.getHeader("Connection");
282         if (!"upgrade".equalsIgnoreCase(connection))
283         {
284             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
285         }
286 
287         // Check the Accept hash
288         String reqKey = request.getKey();
289         String expectedHash = AcceptHash.hashKey(reqKey);
290         String respHash = response.getHeader("Sec-WebSocket-Accept");
291 
292         response.setSuccess(true);
293         if (expectedHash.equalsIgnoreCase(respHash) == false)
294         {
295             response.setSuccess(false);
296             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
297         }
298 
299         // Parse extensions
300         List<ExtensionConfig> extensions = new ArrayList<>();
301         List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
302         if (extValues != null)
303         {
304             for (String extVal : extValues)
305             {
306                 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
307                 while (tok.hasMoreTokens())
308                 {
309                     extensions.add(ExtensionConfig.parse(tok.nextToken()));
310                 }
311             }
312         }
313         response.setExtensions(extensions);
314     }
315 }