View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.server.proxy;
21  
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.TimeoutException;
25  
26  import org.eclipse.jetty.client.HttpClient;
27  import org.eclipse.jetty.client.api.Request;
28  import org.eclipse.jetty.client.api.Response;
29  import org.eclipse.jetty.client.util.DeferredContentProvider;
30  import org.eclipse.jetty.http.HttpField;
31  import org.eclipse.jetty.http.HttpMethod;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
35  import org.eclipse.jetty.spdy.api.DataInfo;
36  import org.eclipse.jetty.spdy.api.HeadersInfo;
37  import org.eclipse.jetty.spdy.api.ReplyInfo;
38  import org.eclipse.jetty.spdy.api.RstInfo;
39  import org.eclipse.jetty.spdy.api.Stream;
40  import org.eclipse.jetty.spdy.api.StreamFrameListener;
41  import org.eclipse.jetty.spdy.api.StreamStatus;
42  import org.eclipse.jetty.spdy.api.SynInfo;
43  import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
44  import org.eclipse.jetty.util.BufferUtil;
45  import org.eclipse.jetty.util.Callback;
46  import org.eclipse.jetty.util.Fields;
47  import org.eclipse.jetty.util.HttpCookieStore;
48  import org.eclipse.jetty.util.log.Log;
49  import org.eclipse.jetty.util.log.Logger;
50  
51  /**
52   * <p>{@link HTTPProxyEngine} implements a SPDY to HTTP proxy, that is, converts SPDY events received by clients into
53   * HTTP events for the servers.</p>
54   */
55  public class HTTPProxyEngine extends ProxyEngine
56  {
57      private static final Logger LOG = Log.getLogger(HTTPProxyEngine.class);
58      private static final Callback LOGGING_CALLBACK = new LoggingCallback();
59  
60      private final HttpClient httpClient;
61  
62      public HTTPProxyEngine(HttpClient httpClient)
63      {
64          this.httpClient = httpClient;
65          configureHttpClient(httpClient);
66      }
67  
68      private void configureHttpClient(HttpClient httpClient)
69      {
70          // Redirects must be proxied as is, not followed
71          httpClient.setFollowRedirects(false);
72          // Must not store cookies, otherwise cookies of different clients will mix
73          httpClient.setCookieStore(new HttpCookieStore.Empty());
74      }
75  
76      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
77      {
78          short version = clientStream.getSession().getVersion();
79          String method = clientSynInfo.getHeaders().get(HTTPSPDYHeader.METHOD.name(version)).value();
80          String path = clientSynInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value();
81  
82          Fields headers = new Fields(clientSynInfo.getHeaders(), false);
83  
84          removeHopHeaders(headers);
85          addRequestProxyHeaders(clientStream, headers);
86          customizeRequestHeaders(clientStream, headers);
87  
88          String host = proxyServerInfo.getHost();
89          int port = proxyServerInfo.getAddress().getPort();
90  
91          LOG.debug("Sending HTTP request to: {}", host + ":" + port);
92          final Request request = httpClient.newRequest(host, port)
93                  .path(path)
94                  .method(HttpMethod.fromString(method));
95          addNonSpdyHeadersToRequest(version, headers, request);
96  
97          if (!clientSynInfo.isClose())
98          {
99              request.content(new DeferredContentProvider());
100         }
101 
102         sendRequest(clientStream, request);
103 
104         return new StreamFrameListener.Adapter()
105         {
106             @Override
107             public void onReply(Stream stream, ReplyInfo replyInfo)
108             {
109                 // We proxy to HTTP so we do not receive replies
110                 throw new UnsupportedOperationException("Not Yet Implemented");
111             }
112 
113             @Override
114             public void onHeaders(Stream stream, HeadersInfo headersInfo)
115             {
116                 throw new UnsupportedOperationException("Not Yet Implemented");
117             }
118 
119             @Override
120             public void onData(Stream clientStream, final DataInfo clientDataInfo)
121             {
122                 LOG.debug("received clientDataInfo: {} for stream: {}", clientDataInfo, clientStream);
123 
124                 DeferredContentProvider contentProvider = (DeferredContentProvider)request.getContent();
125                 contentProvider.offer(clientDataInfo.asByteBuffer(true));
126 
127                 if (clientDataInfo.isClose())
128                     contentProvider.close();
129             }
130         };
131     }
132 
133     private void sendRequest(final Stream clientStream, Request request)
134     {
135         request.send(new Response.Listener.Empty()
136         {
137             private volatile boolean committed;
138 
139             @Override
140             public void onHeaders(final Response response)
141             {
142                 LOG.debug("onHeaders called with response: {}. Sending replyInfo to client.", response);
143                 Fields responseHeaders = createResponseHeaders(clientStream, response);
144                 removeHopHeaders(responseHeaders);
145                 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
146                 clientStream.reply(replyInfo, new Callback.Adapter()
147                 {
148                     @Override
149                     public void failed(Throwable x)
150                     {
151                         LOG.debug("failed: ", x);
152                         response.abort(x);
153                     }
154 
155                     @Override
156                     public void succeeded()
157                     {
158                         committed = true;
159                     }
160                 });
161             }
162 
163             @Override
164             public void onContent(final Response response, ByteBuffer content)
165             {
166                 LOG.debug("onContent called with response: {} and content: {}. Sending response content to client.",
167                         response, content);
168                 final ByteBuffer contentCopy = httpClient.getByteBufferPool().acquire(content.remaining(), true);
169                 BufferUtil.flipPutFlip(content, contentCopy);
170                 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(contentCopy, false);
171                 clientStream.data(dataInfo, new Callback()
172                 {
173                     @Override
174                     public void failed(Throwable x)
175                     {
176                         LOG.debug("failed: ", x);
177                         releaseBuffer();
178                         response.abort(x);
179                     }
180 
181                     @Override
182                     public void succeeded()
183                     {
184                         releaseBuffer();
185                     }
186 
187                     private void releaseBuffer()
188                     {
189                         httpClient.getByteBufferPool().release(contentCopy);
190                     }
191                 });
192             }
193 
194             @Override
195             public void onSuccess(Response response)
196             {
197                 LOG.debug("onSuccess called. Closing client stream.");
198                 clientStream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true), LOGGING_CALLBACK);
199             }
200 
201             @Override
202             public void onFailure(Response response, Throwable failure)
203             {
204                 LOG.debug("onFailure called: ", failure);
205                 if (committed)
206                 {
207                     LOG.debug("clientStream already committed. Resetting stream.");
208                     try
209                     {
210                         clientStream.getSession().rst(new RstInfo(clientStream.getId(), StreamStatus.INTERNAL_ERROR));
211                     }
212                     catch (InterruptedException | ExecutionException | TimeoutException e)
213                     {
214                         LOG.debug(e);
215                     }
216                 }
217                 else
218                 {
219                     if (clientStream.isClosed())
220                         return;
221                     Fields responseHeaders = createResponseHeaders(clientStream, response);
222                     if (failure instanceof TimeoutException)
223                         responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
224                                 String.valueOf(HttpStatus.GATEWAY_TIMEOUT_504));
225                     else
226                         responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
227                                 String.valueOf(HttpStatus.BAD_GATEWAY_502));
228                     ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
229                     clientStream.reply(replyInfo, LOGGING_CALLBACK);
230                 }
231             }
232         });
233     }
234 
235     private Fields createResponseHeaders(Stream clientStream, Response response)
236     {
237         Fields responseHeaders = new Fields();
238         for (HttpField header : response.getHeaders())
239             responseHeaders.add(header.getName(), header.getValue());
240             short version = clientStream.getSession().getVersion();
241         if (response.getStatus() > 0)
242             responseHeaders.add(HTTPSPDYHeader.STATUS.name(version),
243                     String.valueOf(response.getStatus()));
244         responseHeaders.add(HTTPSPDYHeader.VERSION.name(version), HttpVersion.HTTP_1_1.asString());
245         addResponseProxyHeaders(clientStream, responseHeaders);
246         return responseHeaders;
247     }
248 
249     private void addNonSpdyHeadersToRequest(short version, Fields headers, Request request)
250     {
251         for (Fields.Field header : headers)
252             if (HTTPSPDYHeader.from(version, header.name()) == null)
253                 request.header(header.name(), header.value());
254     }
255 
256     static class LoggingCallback extends Callback.Adapter
257     {
258         @Override
259         public void failed(Throwable x)
260         {
261             LOG.debug(x);
262         }
263 
264         @Override
265         public void succeeded()
266         {
267             LOG.debug("succeeded");
268         }
269     }
270 }