View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.server.proxy;
21  
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.TimeoutException;
25  
26  import org.eclipse.jetty.client.HttpClient;
27  import org.eclipse.jetty.client.api.Request;
28  import org.eclipse.jetty.client.api.Response;
29  import org.eclipse.jetty.client.util.DeferredContentProvider;
30  import org.eclipse.jetty.http.HttpField;
31  import org.eclipse.jetty.http.HttpMethod;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
35  import org.eclipse.jetty.spdy.api.DataInfo;
36  import org.eclipse.jetty.spdy.api.HeadersInfo;
37  import org.eclipse.jetty.spdy.api.ReplyInfo;
38  import org.eclipse.jetty.spdy.api.RstInfo;
39  import org.eclipse.jetty.spdy.api.Stream;
40  import org.eclipse.jetty.spdy.api.StreamFrameListener;
41  import org.eclipse.jetty.spdy.api.StreamStatus;
42  import org.eclipse.jetty.spdy.api.SynInfo;
43  import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
44  import org.eclipse.jetty.util.BufferUtil;
45  import org.eclipse.jetty.util.Callback;
46  import org.eclipse.jetty.util.Fields;
47  import org.eclipse.jetty.util.HttpCookieStore;
48  import org.eclipse.jetty.util.log.Log;
49  import org.eclipse.jetty.util.log.Logger;
50  
51  /**
52   * <p>{@link HTTPProxyEngine} implements a SPDY to HTTP proxy, that is, converts SPDY events received by clients into
53   * HTTP events for the servers.</p>
54   */
55  public class HTTPProxyEngine extends ProxyEngine
56  {
57      private static final Logger LOG = Log.getLogger(HTTPProxyEngine.class);
58      private static final Callback LOGGING_CALLBACK = new LoggingCallback();
59  
60      private final HttpClient httpClient;
61  
62      public HTTPProxyEngine(HttpClient httpClient)
63      {
64          this.httpClient = httpClient;
65          configureHttpClient(httpClient);
66      }
67  
68      private void configureHttpClient(HttpClient httpClient)
69      {
70          // Redirects must be proxied as is, not followed
71          httpClient.setFollowRedirects(false);
72          // Must not store cookies, otherwise cookies of different clients will mix
73          httpClient.setCookieStore(new HttpCookieStore.Empty());
74      }
75  
76      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
77      {
78          short version = clientStream.getSession().getVersion();
79          String method = clientSynInfo.getHeaders().get(HTTPSPDYHeader.METHOD.name(version)).value();
80          String path = clientSynInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value();
81  
82          Fields headers = new Fields(clientSynInfo.getHeaders(), false);
83  
84          removeHopHeaders(headers);
85          addRequestProxyHeaders(clientStream, headers);
86          customizeRequestHeaders(clientStream, headers);
87  
88          String host = proxyServerInfo.getHost();
89          int port = proxyServerInfo.getAddress().getPort();
90  
91          LOG.debug("Sending HTTP request to: {}", host + ":" + port);
92          final Request request = httpClient.newRequest(host, port)
93                  .path(path)
94                  .method(HttpMethod.fromString(method));
95          addNonSpdyHeadersToRequest(version, headers, request);
96  
97          if (!clientSynInfo.isClose())
98          {
99              request.content(new DeferredContentProvider());
100         }
101 
102         sendRequest(clientStream, request);
103 
104         return new StreamFrameListener.Adapter()
105         {
106             @Override
107             public void onReply(Stream stream, ReplyInfo replyInfo)
108             {
109                 // We proxy to HTTP so we do not receive replies
110                 throw new UnsupportedOperationException("Not Yet Implemented");
111             }
112 
113             @Override
114             public void onHeaders(Stream stream, HeadersInfo headersInfo)
115             {
116                 throw new UnsupportedOperationException("Not Yet Implemented");
117             }
118 
119             @Override
120             public void onData(Stream clientStream, final DataInfo clientDataInfo)
121             {
122                 LOG.debug("received clientDataInfo: {} for stream: {}", clientDataInfo, clientStream);
123 
124                 DeferredContentProvider contentProvider = (DeferredContentProvider)request.getContent();
125                 contentProvider.offer(clientDataInfo.asByteBuffer(true));
126 
127                 if (clientDataInfo.isClose())
128                     contentProvider.close();
129             }
130         };
131     }
132 
133     private void sendRequest(final Stream clientStream, Request request)
134     {
135         request.send(new Response.Listener.Empty()
136         {
137             private volatile boolean committed;
138 
139             @Override
140             public void onHeaders(final Response response)
141             {
142                 LOG.debug("onHeaders called with response: {}. Sending replyInfo to client.", response);
143                 Fields responseHeaders = createResponseHeaders(clientStream, response);
144                 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
145                 clientStream.reply(replyInfo, new Callback.Adapter()
146                 {
147                     @Override
148                     public void failed(Throwable x)
149                     {
150                         LOG.debug("failed: ", x);
151                         response.abort(x);
152                     }
153 
154                     @Override
155                     public void succeeded()
156                     {
157                         committed = true;
158                     }
159                 });
160             }
161 
162             @Override
163             public void onContent(final Response response, ByteBuffer content)
164             {
165                 LOG.debug("onContent called with response: {} and content: {}. Sending response content to client.",
166                         response, content);
167                 final ByteBuffer contentCopy = httpClient.getByteBufferPool().acquire(content.remaining(), true);
168                 BufferUtil.flipPutFlip(content, contentCopy);
169                 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(contentCopy, false);
170                 clientStream.data(dataInfo, new Callback()
171                 {
172                     @Override
173                     public void failed(Throwable x)
174                     {
175                         LOG.debug("failed: ", x);
176                         releaseBuffer();
177                         response.abort(x);
178                     }
179 
180                     @Override
181                     public void succeeded()
182                     {
183                         releaseBuffer();
184                     }
185 
186                     private void releaseBuffer()
187                     {
188                         httpClient.getByteBufferPool().release(contentCopy);
189                     }
190                 });
191             }
192 
193             @Override
194             public void onSuccess(Response response)
195             {
196                 LOG.debug("onSuccess called. Closing client stream.");
197                 clientStream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true), LOGGING_CALLBACK);
198             }
199 
200             @Override
201             public void onFailure(Response response, Throwable failure)
202             {
203                 LOG.debug("onFailure called: ", failure);
204                 if (committed)
205                 {
206                     LOG.debug("clientStream already committed. Resetting stream.");
207                     try
208                     {
209                         clientStream.getSession().rst(new RstInfo(clientStream.getId(), StreamStatus.INTERNAL_ERROR));
210                     }
211                     catch (InterruptedException | ExecutionException | TimeoutException e)
212                     {
213                         LOG.debug(e);
214                     }
215                 }
216                 else
217                 {
218                     if (clientStream.isClosed())
219                         return;
220                     Fields responseHeaders = createResponseHeaders(clientStream, response);
221                     if (failure instanceof TimeoutException)
222                         responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
223                                 String.valueOf(HttpStatus.GATEWAY_TIMEOUT_504));
224                     else
225                         responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
226                                 String.valueOf(HttpStatus.BAD_GATEWAY_502));
227                     ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
228                     clientStream.reply(replyInfo, LOGGING_CALLBACK);
229                 }
230             }
231         });
232     }
233 
234     private Fields createResponseHeaders(Stream clientStream, Response response)
235     {
236         Fields responseHeaders = new Fields();
237         for (HttpField header : response.getHeaders())
238             responseHeaders.add(header.getName(), header.getValue());
239             short version = clientStream.getSession().getVersion();
240         if (response.getStatus() > 0)
241             responseHeaders.add(HTTPSPDYHeader.STATUS.name(version),
242                     String.valueOf(response.getStatus()));
243         responseHeaders.add(HTTPSPDYHeader.VERSION.name(version), HttpVersion.HTTP_1_1.asString());
244         addResponseProxyHeaders(clientStream, responseHeaders);
245         return responseHeaders;
246     }
247 
248     private void addNonSpdyHeadersToRequest(short version, Fields headers, Request request)
249     {
250         for (Fields.Field header : headers)
251             if (HTTPSPDYHeader.from(version, header.name()) == null)
252                 request.header(header.name(), header.value());
253     }
254 
255     static class LoggingCallback extends Callback.Adapter
256     {
257         @Override
258         public void failed(Throwable x)
259         {
260             LOG.debug(x);
261         }
262 
263         @Override
264         public void succeeded()
265         {
266             LOG.debug("succeeded");
267         }
268     }
269 }