View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client.io;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.nio.charset.StandardCharsets;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.Executor;
28  
29  import org.eclipse.jetty.io.AbstractConnection;
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.EndPoint;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.FutureCallback;
34  import org.eclipse.jetty.util.QuotedStringTokenizer;
35  import org.eclipse.jetty.util.StringUtil;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  import org.eclipse.jetty.websocket.api.UpgradeException;
39  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
40  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
41  import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
42  import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
43  import org.eclipse.jetty.websocket.common.AcceptHash;
44  import org.eclipse.jetty.websocket.common.WebSocketSession;
45  import org.eclipse.jetty.websocket.common.events.EventDriver;
46  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
47  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
48  import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
49  
50  /**
51   * This is the initial connection handling that exists immediately after physical connection is established to destination server.
52   * <p>
53   * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler.
54   */
55  public class UpgradeConnection extends AbstractConnection
56  {
57      public class SendUpgradeRequest extends FutureCallback implements Runnable
58      {
59          @Override
60          public void run()
61          {
62              URI uri = connectPromise.getRequest().getRequestURI();
63              request.setRequestURI(uri);
64              String rawRequest = request.generate();
65  
66              ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8);
67              getEndPoint().write(this,buf);
68          }
69  
70          @Override
71          public void succeeded()
72          {
73              // Writing the request header is complete.
74              super.succeeded();
75              // start the interest in fill
76              fillInterested();
77          }
78      }
79  
80      /** HTTP Response Code: 101 Switching Protocols */
81      private static final int SWITCHING_PROTOCOLS = 101;
82  
83      private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
84      private final ByteBufferPool bufferPool;
85      private final ConnectPromise connectPromise;
86      private final HttpResponseHeaderParser parser;
87      private ClientUpgradeRequest request;
88  
89      public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
90      {
91          super(endp,executor);
92          this.connectPromise = connectPromise;
93          this.bufferPool = connectPromise.getClient().getBufferPool();
94          this.request = connectPromise.getRequest();
95  
96          // Setup the parser
97          this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
98      }
99  
100     public void disconnect(boolean onlyOutput)
101     {
102         EndPoint endPoint = getEndPoint();
103         // We need to gently close first, to allow
104         // SSL close alerts to be sent by Jetty
105         LOG.debug("Shutting down output {}",endPoint);
106         endPoint.shutdownOutput();
107         if (!onlyOutput)
108         {
109             LOG.debug("Closing {}",endPoint);
110             endPoint.close();
111         }
112     }
113 
114     private void notifyConnect(ClientUpgradeResponse response)
115     {
116         connectPromise.setResponse(response);
117     }
118 
119     @Override
120     public void onFillable()
121     {
122         ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
123         BufferUtil.clear(buffer);
124         boolean readMore = false;
125         try
126         {
127             readMore = read(buffer);
128         }
129         finally
130         {
131             bufferPool.release(buffer);
132         }
133 
134         if (readMore)
135         {
136             fillInterested();
137         }
138     }
139 
140     @Override
141     public void onOpen()
142     {
143         super.onOpen();
144         // TODO: handle timeout?
145         getExecutor().execute(new SendUpgradeRequest());
146     }
147 
148     /**
149      * Read / Parse the waiting read/fill buffer
150      * 
151      * @param buffer
152      *            the buffer to fill into from the endpoint
153      * @return true if there is more to read, false if reading should stop
154      */
155     private boolean read(ByteBuffer buffer)
156     {
157         EndPoint endPoint = getEndPoint();
158         try
159         {
160             while (true)
161             {
162                 int filled = endPoint.fill(buffer);
163                 if (filled == 0)
164                 {
165                     return true;
166                 }
167                 else if (filled < 0)
168                 {
169                     LOG.debug("read - EOF Reached");
170                     return false;
171                 }
172                 else
173                 {
174                     if (LOG.isDebugEnabled())
175                     {
176                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
177                     }
178                     ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer);
179                     if (resp != null)
180                     {
181                         // Got a response!
182                         validateResponse(resp);
183                         notifyConnect(resp);
184                         upgradeConnection(resp);
185                         if (buffer.hasRemaining())
186                         {
187                             LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
188                         }
189                         return false; // do no more reading
190                     }
191                 }
192             }
193         }
194         catch (IOException | ParseException e)
195         {
196             UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
197             connectPromise.failed(ue);
198             disconnect(false);
199             return false;
200         }
201         catch (UpgradeException e)
202         {
203             connectPromise.failed(e);
204             disconnect(false);
205             return false;
206         }
207     }
208 
209     private void upgradeConnection(ClientUpgradeResponse response)
210     {
211         EndPoint endp = getEndPoint();
212         Executor executor = getExecutor();
213         
214         EventDriver websocket = connectPromise.getDriver();
215         WebSocketPolicy policy = websocket.getPolicy();
216 
217         WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise,policy);
218 
219         WebSocketSession session = new WebSocketSession(request.getRequestURI(),websocket,connection);
220         session.setPolicy(policy);
221         session.setUpgradeResponse(response);
222 
223         connection.setSession(session);
224 
225         // Initialize / Negotiate Extensions
226         ExtensionStack extensionStack = new ExtensionStack(connectPromise.getClient().getExtensionFactory());
227         extensionStack.negotiate(response.getExtensions());
228 
229         extensionStack.configure(connection.getParser());
230         extensionStack.configure(connection.getGenerator());
231 
232         // Setup Incoming Routing
233         connection.setNextIncomingFrames(extensionStack);
234         extensionStack.setNextIncoming(session);
235 
236         // Setup Outgoing Routing
237         session.setOutgoingHandler(extensionStack);
238         extensionStack.setNextOutgoing(connection);
239 
240         // Now swap out the connection
241         endp.setConnection(connection);
242         connection.onOpen();
243     }
244 
245     private void validateResponse(ClientUpgradeResponse response)
246     {
247         // Validate Response Status Code
248         if (response.getStatusCode() != SWITCHING_PROTOCOLS)
249         {
250             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Didn't switch protocols");
251         }
252 
253         // Validate Connection header
254         String connection = response.getHeader("Connection");
255         if (!"upgrade".equalsIgnoreCase(connection))
256         {
257             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Connection is " + connection + " (expected upgrade)");
258         }
259 
260         // Check the Accept hash
261         String reqKey = request.getKey();
262         String expectedHash = AcceptHash.hashKey(reqKey);
263         String respHash = response.getHeader("Sec-WebSocket-Accept");
264 
265         response.setSuccess(true);
266         if (expectedHash.equalsIgnoreCase(respHash) == false)
267         {
268             response.setSuccess(false);
269             throw new UpgradeException(request.getRequestURI(),response.getStatusCode(),"Invalid Sec-WebSocket-Accept hash");
270         }
271 
272         // Parse extensions
273         List<ExtensionConfig> extensions = new ArrayList<>();
274         List<String> extValues = response.getHeaders("Sec-WebSocket-Extensions");
275         if (extValues != null)
276         {
277             for (String extVal : extValues)
278             {
279                 QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
280                 while (tok.hasMoreTokens())
281                 {
282                     extensions.add(ExtensionConfig.parse(tok.nextToken()));
283                 }
284             }
285         }
286         response.setExtensions(extensions);
287     }
288 }