View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client.io;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.Executor;
27  
28  import org.eclipse.jetty.io.AbstractConnection;
29  import org.eclipse.jetty.io.ByteBufferPool;
30  import org.eclipse.jetty.io.EndPoint;
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.FutureCallback;
33  import org.eclipse.jetty.util.QuotedStringTokenizer;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.websocket.api.UpgradeException;
38  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
39  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
40  import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
41  import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
42  import org.eclipse.jetty.websocket.client.io.HttpResponseHeaderParser.ParseException;
43  import org.eclipse.jetty.websocket.common.AcceptHash;
44  import org.eclipse.jetty.websocket.common.WebSocketSession;
45  import org.eclipse.jetty.websocket.common.events.EventDriver;
46  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
47  
48  /**
49   * This is the initial connection handling that exists immediately after physical connection is established to destination server.
50   * <p>
51   * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler.
52   */
53  public class UpgradeConnection extends AbstractConnection
54  {
55      public class SendUpgradeRequest extends FutureCallback implements Runnable
56      {
57          @Override
58          public void run()
59          {
60              URI uri = connectPromise.getRequest().getRequestURI();
61              request.setRequestURI(uri);
62              String rawRequest = request.generate();
63  
64              ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET);
65              getEndPoint().write(this,buf);
66          }
67  
68          @Override
69          public void succeeded()
70          {
71              // Writing the request header is complete.
72              super.succeeded();
73              // start the interest in fill
74              fillInterested();
75          }
76      }
77  
78      /** HTTP Response Code: 101 Switching Protocols */
79      private static final int SWITCHING_PROTOCOLS = 101;
80  
81      private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
82      private final ByteBufferPool bufferPool;
83      private final ConnectPromise connectPromise;
84      private final HttpResponseHeaderParser parser;
85      private ClientUpgradeRequest request;
86  
87      public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
88      {
89          super(endp,executor);
90          this.connectPromise = connectPromise;
91          this.bufferPool = connectPromise.getClient().getBufferPool();
92          this.request = connectPromise.getRequest();
93  
94          // Setup the parser
95          this.parser = new HttpResponseHeaderParser();
96      }
97  
98      public void disconnect(boolean onlyOutput)
99      {
100         EndPoint endPoint = getEndPoint();
101         // We need to gently close first, to allow
102         // SSL close alerts to be sent by Jetty
103         LOG.debug("Shutting down output {}",endPoint);
104         endPoint.shutdownOutput();
105         if (!onlyOutput)
106         {
107             LOG.debug("Closing {}",endPoint);
108             endPoint.close();
109         }
110     }
111 
112     private void notifyConnect(ClientUpgradeResponse response)
113     {
114         connectPromise.setResponse(response);
115     }
116 
117     @Override
118     public void onFillable()
119     {
120         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
121         BufferUtil.clear(buffer);
122         boolean readMore = false;
123         try
124         {
125             readMore = read(buffer);
126         }
127         finally
128         {
129             bufferPool.release(buffer);
130         }
131 
132         if (readMore)
133         {
134             fillInterested();
135         }
136     }
137 
138     @Override
139     public void onOpen()
140     {
141         super.onOpen();
142         // TODO: handle timeout
143         getExecutor().execute(new SendUpgradeRequest());
144     }
145 
146     /**
147      * Read / Parse the waiting read/fill buffer
148      * 
149      * @param buffer
150      *            the buffer to fill into from the endpoint
151      * @return true if there is more to read, false if reading should stop
152      */
153     private boolean read(ByteBuffer buffer)
154     {
155         EndPoint endPoint = getEndPoint();
156         try
157         {
158             while (true)
159             {
160                 int filled = endPoint.fill(buffer);
161                 if (filled == 0)
162                 {
163                     return true;
164                 }
165                 else if (filled < 0)
166                 {
167                     LOG.debug("read - EOF Reached");
168                     return false;
169                 }
170                 else
171                 {
172                     if (LOG.isDebugEnabled())
173                     {
174                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
175                     }
176                     ClientUpgradeResponse resp = parser.parse(buffer);
177                     if (resp != null)
178                     {
179                         // Got a response!
180                         validateResponse(resp);
181                         notifyConnect(resp);
182                         upgradeConnection(resp);
183                         if (buffer.hasRemaining())
184                         {
185                             LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
186                         }
187                         return false; // do no more reading
188                     }
189                 }
190             }
191         }
192         catch (IOException | ParseException e)
193         {
194             UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
195             connectPromise.failed(ue);
196             disconnect(false);
197             return false;
198         }
199         catch (UpgradeException e)
200         {
201             connectPromise.failed(e);
202             disconnect(false);
203             return false;
204         }
205     }
206 
207     private void upgradeConnection(ClientUpgradeResponse response)
208     {
209         EndPoint endp = getEndPoint();
210         Executor executor = getExecutor();
211         WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise);
212 
213         // Initialize / Negotiate Extensions
214         EventDriver websocket = connectPromise.getDriver();
215         WebSocketPolicy policy = connectPromise.getClient().getPolicy();
216 
217         WebSocketSession session = new WebSocketSession(request.getRequestURI(),websocket,connection);
218         session.setPolicy(policy);
219         session.setUpgradeResponse(response);
220 
221         connection.setSession(session);
222 
223         // Initialize / Negotiate Extensions
224         ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
225         extensionStack.negotiate(response.getExtensions());
226 
227         extensionStack.configure(connection.getParser());
228         extensionStack.configure(connection.getGenerator());
229 
230         // Setup Incoming Routing
231         connection.setNextIncomingFrames(extensionStack);
232         extensionStack.setNextIncoming(session);
233 
234         // Setup Outgoing Routing
235         session.setOutgoingHandler(extensionStack);
236         extensionStack.setNextOutgoing(connection);
237 
238         // Now swap out the connection
239         endp.setConnection(connection);
240         connection.onOpen();
241     }
242 
243     private void validateResponse(ClientUpgradeResponse response)
244     {
245         // Validate Response Status Code
246         if (response.getStatusCode() != SWITCHING_PROTOCOLS)
247         {
248             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols");
249         }
250 
251         // Validate Connection header
252         String connection = response.getHeader("Connection");
253         if (!"upgrade".equalsIgnoreCase(connection))
254         {
255             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
256         }
257 
258         // Check the Accept hash
259         String reqKey = request.getKey();
260         String expectedHash = AcceptHash.hashKey(reqKey);
261         String respHash = response.getHeader("Sec-WebSocket-Accept");
262 
263         response.setSuccess(true);
264         if (expectedHash.equalsIgnoreCase(respHash) == false)
265         {
266             response.setSuccess(false);
267             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
268         }
269 
270         // Parse extensions
271         List<ExtensionConfig> extensions = new ArrayList<>();
272         List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
273         if (extValues != null)
274         {
275             for (String extVal : extValues)
276             {
277                 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
278                 while (tok.hasMoreTokens())
279                 {
280                     extensions.add(ExtensionConfig.parse(tok.nextToken()));
281                 }
282             }
283         }
284         response.setExtensions(extensions);
285     }
286 }