View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.server.proxy;
21  
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.TimeoutException;
25  
26  import org.eclipse.jetty.client.HttpClient;
27  import org.eclipse.jetty.client.api.Request;
28  import org.eclipse.jetty.client.api.Response;
29  import org.eclipse.jetty.client.util.DeferredContentProvider;
30  import org.eclipse.jetty.http.HttpField;
31  import org.eclipse.jetty.http.HttpMethod;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
35  import org.eclipse.jetty.spdy.api.DataInfo;
36  import org.eclipse.jetty.spdy.api.HeadersInfo;
37  import org.eclipse.jetty.spdy.api.ReplyInfo;
38  import org.eclipse.jetty.spdy.api.RstInfo;
39  import org.eclipse.jetty.spdy.api.Stream;
40  import org.eclipse.jetty.spdy.api.StreamFrameListener;
41  import org.eclipse.jetty.spdy.api.StreamStatus;
42  import org.eclipse.jetty.spdy.api.SynInfo;
43  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
44  import org.eclipse.jetty.util.BufferUtil;
45  import org.eclipse.jetty.util.Callback;
46  import org.eclipse.jetty.util.Fields;
47  import org.eclipse.jetty.util.HttpCookieStore;
48  import org.eclipse.jetty.util.log.Log;
49  import org.eclipse.jetty.util.log.Logger;
50  
51  /**
52   * <p>{@link HTTPProxyEngine} implements a SPDY to HTTP proxy, that is, converts SPDY events received by clients into
53   * HTTP events for the servers.</p>
54   */
55  public class HTTPProxyEngine extends ProxyEngine
56  {
57      private static final Logger LOG = Log.getLogger(HTTPProxyEngine.class);
58      private static final Callback LOGGING_CALLBACK = new LoggingCallback();
59  
60      private final HttpClient httpClient;
61  
62      public HTTPProxyEngine(HttpClient httpClient)
63      {
64          this.httpClient = httpClient;
65          configureHttpClient(httpClient);
66      }
67  
68      private void configureHttpClient(HttpClient httpClient)
69      {
70          // Redirects must be proxied as is, not followed
71          httpClient.setFollowRedirects(false);
72          // Must not store cookies, otherwise cookies of different clients will mix
73          httpClient.setCookieStore(new HttpCookieStore.Empty());
74      }
75  
76      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
77      {
78          short version = clientStream.getSession().getVersion();
79          String method = clientSynInfo.getHeaders().get(HTTPSPDYHeader.METHOD.name(version)).getValue();
80          String path = clientSynInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).getValue();
81  
82          Fields headers = new Fields(clientSynInfo.getHeaders(), false);
83  
84          removeHopHeaders(headers);
85          addRequestProxyHeaders(clientStream, headers);
86          customizeRequestHeaders(clientStream, headers);
87  
88          String host = proxyServerInfo.getHost();
89          int port = proxyServerInfo.getAddress().getPort();
90  
91          if (LOG.isDebugEnabled())
92              LOG.debug("Sending HTTP request to: {}", host + ":" + port);
93          final Request request = httpClient.newRequest(host, port)
94                  .path(path)
95                  .method(HttpMethod.fromString(method));
96          addNonSpdyHeadersToRequest(version, headers, request);
97  
98          if (!clientSynInfo.isClose())
99          {
100             request.content(new DeferredContentProvider());
101         }
102 
103         sendRequest(clientStream, request);
104 
105         return new StreamFrameListener.Adapter()
106         {
107             @Override
108             public void onReply(Stream stream, ReplyInfo replyInfo)
109             {
110                 // We proxy to HTTP so we do not receive replies
111                 throw new UnsupportedOperationException("Not Yet Implemented");
112             }
113 
114             @Override
115             public void onHeaders(Stream stream, HeadersInfo headersInfo)
116             {
117                 throw new UnsupportedOperationException("Not Yet Implemented");
118             }
119 
120             @Override
121             public void onData(Stream clientStream, final DataInfo clientDataInfo)
122             {
123                 if (LOG.isDebugEnabled())
124                     LOG.debug("received clientDataInfo: {} for stream: {}", clientDataInfo, clientStream);
125 
126                 DeferredContentProvider contentProvider = (DeferredContentProvider)request.getContent();
127                 contentProvider.offer(clientDataInfo.asByteBuffer(true));
128 
129                 if (clientDataInfo.isClose())
130                     contentProvider.close();
131             }
132         };
133     }
134 
135     private void sendRequest(final Stream clientStream, Request request)
136     {
137         request.send(new Response.Listener.Adapter()
138         {
139             private volatile boolean committed;
140 
141             @Override
142             public void onHeaders(final Response response)
143             {
144                 if (LOG.isDebugEnabled())
145                     LOG.debug("onHeaders called with response: {}. Sending replyInfo to client.", response);
146                 Fields responseHeaders = createResponseHeaders(clientStream, response);
147                 removeHopHeaders(responseHeaders);
148                 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
149                 clientStream.reply(replyInfo, new Callback.Adapter()
150                 {
151                     @Override
152                     public void failed(Throwable x)
153                     {
154                         LOG.debug("failed: ", x);
155                         response.abort(x);
156                     }
157 
158                     @Override
159                     public void succeeded()
160                     {
161                         committed = true;
162                     }
163                 });
164             }
165 
166             @Override
167             public void onContent(final Response response, ByteBuffer content)
168             {
169                 if (LOG.isDebugEnabled())
170                     LOG.debug("onContent called with response: {} and content: {}. Sending response content to client.",
171                         response, content);
172                 final ByteBuffer contentCopy = httpClient.getByteBufferPool().acquire(content.remaining(), true);
173                 BufferUtil.flipPutFlip(content, contentCopy);
174                 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(contentCopy, false);
175                 clientStream.data(dataInfo, new Callback()
176                 {
177                     @Override
178                     public void failed(Throwable x)
179                     {
180                         LOG.debug("failed: ", x);
181                         releaseBuffer();
182                         response.abort(x);
183                     }
184 
185                     @Override
186                     public void succeeded()
187                     {
188                         releaseBuffer();
189                     }
190 
191                     private void releaseBuffer()
192                     {
193                         httpClient.getByteBufferPool().release(contentCopy);
194                     }
195                 });
196             }
197 
198             @Override
199             public void onSuccess(Response response)
200             {
201                 if (LOG.isDebugEnabled())
202                     LOG.debug("onSuccess called. Closing client stream.");
203                 clientStream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true), LOGGING_CALLBACK);
204             }
205 
206             @Override
207             public void onFailure(Response response, Throwable failure)
208             {
209                 LOG.debug("onFailure called: ", failure);
210                 if (committed)
211                 {
212                     LOG.debug("clientStream already committed. Resetting stream.");
213                     try
214                     {
215                         clientStream.getSession().rst(new RstInfo(clientStream.getId(), StreamStatus.INTERNAL_ERROR));
216                     }
217                     catch (InterruptedException | ExecutionException | TimeoutException e)
218                     {
219                         LOG.debug(e);
220                     }
221                 }
222                 else
223                 {
224                     if (clientStream.isClosed())
225                         return;
226                     Fields responseHeaders = createResponseHeaders(clientStream, response);
227                     if (failure instanceof TimeoutException)
228                         responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
229                                 String.valueOf(HttpStatus.GATEWAY_TIMEOUT_504));
230                     else
231                         responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
232                                 String.valueOf(HttpStatus.BAD_GATEWAY_502));
233                     ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
234                     clientStream.reply(replyInfo, LOGGING_CALLBACK);
235                 }
236             }
237         });
238     }
239 
240     private Fields createResponseHeaders(Stream clientStream, Response response)
241     {
242         Fields responseHeaders = new Fields();
243         for (HttpField header : response.getHeaders())
244             responseHeaders.add(header.getName(), header.getValue());
245             short version = clientStream.getSession().getVersion();
246         if (response.getStatus() > 0)
247             responseHeaders.add(HTTPSPDYHeader.STATUS.name(version),
248                     String.valueOf(response.getStatus()));
249         responseHeaders.add(HTTPSPDYHeader.VERSION.name(version), HttpVersion.HTTP_1_1.asString());
250         addResponseProxyHeaders(clientStream, responseHeaders);
251         return responseHeaders;
252     }
253 
254     private void addNonSpdyHeadersToRequest(short version, Fields headers, Request request)
255     {
256         for (Fields.Field header : headers)
257             if (HTTPSPDYHeader.from(version, header.getName()) == null)
258                 request.header(header.getName(), header.getValue());
259     }
260 
261     static class LoggingCallback extends Callback.Adapter
262     {
263         @Override
264         public void failed(Throwable x)
265         {
266             LOG.debug(x);
267         }
268 
269         @Override
270         public void succeeded()
271         {
272             if (LOG.isDebugEnabled())
273                 LOG.debug("succeeded");
274         }
275     }
276 }