1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.server.proxy;
21
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeoutException;
25
26 import org.eclipse.jetty.client.HttpClient;
27 import org.eclipse.jetty.client.api.Request;
28 import org.eclipse.jetty.client.api.Response;
29 import org.eclipse.jetty.client.util.DeferredContentProvider;
30 import org.eclipse.jetty.http.HttpField;
31 import org.eclipse.jetty.http.HttpMethod;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
35 import org.eclipse.jetty.spdy.api.DataInfo;
36 import org.eclipse.jetty.spdy.api.HeadersInfo;
37 import org.eclipse.jetty.spdy.api.ReplyInfo;
38 import org.eclipse.jetty.spdy.api.RstInfo;
39 import org.eclipse.jetty.spdy.api.Stream;
40 import org.eclipse.jetty.spdy.api.StreamFrameListener;
41 import org.eclipse.jetty.spdy.api.StreamStatus;
42 import org.eclipse.jetty.spdy.api.SynInfo;
43 import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
44 import org.eclipse.jetty.util.BufferUtil;
45 import org.eclipse.jetty.util.Callback;
46 import org.eclipse.jetty.util.Fields;
47 import org.eclipse.jetty.util.HttpCookieStore;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50
51
52
53
54
55 public class HTTPProxyEngine extends ProxyEngine
56 {
57 private static final Logger LOG = Log.getLogger(HTTPProxyEngine.class);
58 private static final Callback LOGGING_CALLBACK = new LoggingCallback();
59
60 private final HttpClient httpClient;
61
62 public HTTPProxyEngine(HttpClient httpClient)
63 {
64 this.httpClient = httpClient;
65 configureHttpClient(httpClient);
66 }
67
68 private void configureHttpClient(HttpClient httpClient)
69 {
70
71 httpClient.setFollowRedirects(false);
72
73 httpClient.setCookieStore(new HttpCookieStore.Empty());
74 }
75
76 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
77 {
78 short version = clientStream.getSession().getVersion();
79 String method = clientSynInfo.getHeaders().get(HTTPSPDYHeader.METHOD.name(version)).value();
80 String path = clientSynInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value();
81
82 Fields headers = new Fields(clientSynInfo.getHeaders(), false);
83
84 removeHopHeaders(headers);
85 addRequestProxyHeaders(clientStream, headers);
86 customizeRequestHeaders(clientStream, headers);
87
88 String host = proxyServerInfo.getHost();
89 int port = proxyServerInfo.getAddress().getPort();
90
91 LOG.debug("Sending HTTP request to: {}", host + ":" + port);
92 final Request request = httpClient.newRequest(host, port)
93 .path(path)
94 .method(HttpMethod.fromString(method));
95 addNonSpdyHeadersToRequest(version, headers, request);
96
97 if (!clientSynInfo.isClose())
98 {
99 request.content(new DeferredContentProvider());
100 }
101
102 sendRequest(clientStream, request);
103
104 return new StreamFrameListener.Adapter()
105 {
106 @Override
107 public void onReply(Stream stream, ReplyInfo replyInfo)
108 {
109
110 throw new UnsupportedOperationException("Not Yet Implemented");
111 }
112
113 @Override
114 public void onHeaders(Stream stream, HeadersInfo headersInfo)
115 {
116 throw new UnsupportedOperationException("Not Yet Implemented");
117 }
118
119 @Override
120 public void onData(Stream clientStream, final DataInfo clientDataInfo)
121 {
122 LOG.debug("received clientDataInfo: {} for stream: {}", clientDataInfo, clientStream);
123
124 DeferredContentProvider contentProvider = (DeferredContentProvider)request.getContent();
125 contentProvider.offer(clientDataInfo.asByteBuffer(true));
126
127 if (clientDataInfo.isClose())
128 contentProvider.close();
129 }
130 };
131 }
132
133 private void sendRequest(final Stream clientStream, Request request)
134 {
135 request.send(new Response.Listener.Empty()
136 {
137 private volatile boolean committed;
138
139 @Override
140 public void onHeaders(final Response response)
141 {
142 LOG.debug("onHeaders called with response: {}. Sending replyInfo to client.", response);
143 Fields responseHeaders = createResponseHeaders(clientStream, response);
144 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
145 clientStream.reply(replyInfo, new Callback.Adapter()
146 {
147 @Override
148 public void failed(Throwable x)
149 {
150 LOG.debug("failed: ", x);
151 response.abort(x);
152 }
153
154 @Override
155 public void succeeded()
156 {
157 committed = true;
158 }
159 });
160 }
161
162 @Override
163 public void onContent(final Response response, ByteBuffer content)
164 {
165 LOG.debug("onContent called with response: {} and content: {}. Sending response content to client.",
166 response, content);
167 final ByteBuffer contentCopy = httpClient.getByteBufferPool().acquire(content.remaining(), true);
168 BufferUtil.flipPutFlip(content, contentCopy);
169 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(contentCopy, false);
170 clientStream.data(dataInfo, new Callback()
171 {
172 @Override
173 public void failed(Throwable x)
174 {
175 LOG.debug("failed: ", x);
176 releaseBuffer();
177 response.abort(x);
178 }
179
180 @Override
181 public void succeeded()
182 {
183 releaseBuffer();
184 }
185
186 private void releaseBuffer()
187 {
188 httpClient.getByteBufferPool().release(contentCopy);
189 }
190 });
191 }
192
193 @Override
194 public void onSuccess(Response response)
195 {
196 LOG.debug("onSuccess called. Closing client stream.");
197 clientStream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true), LOGGING_CALLBACK);
198 }
199
200 @Override
201 public void onFailure(Response response, Throwable failure)
202 {
203 LOG.debug("onFailure called: ", failure);
204 if (committed)
205 {
206 LOG.debug("clientStream already committed. Resetting stream.");
207 try
208 {
209 clientStream.getSession().rst(new RstInfo(clientStream.getId(), StreamStatus.INTERNAL_ERROR));
210 }
211 catch (InterruptedException | ExecutionException | TimeoutException e)
212 {
213 LOG.debug(e);
214 }
215 }
216 else
217 {
218 if (clientStream.isClosed())
219 return;
220 Fields responseHeaders = createResponseHeaders(clientStream, response);
221 if (failure instanceof TimeoutException)
222 responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
223 String.valueOf(HttpStatus.GATEWAY_TIMEOUT_504));
224 else
225 responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
226 String.valueOf(HttpStatus.BAD_GATEWAY_502));
227 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
228 clientStream.reply(replyInfo, LOGGING_CALLBACK);
229 }
230 }
231 });
232 }
233
234 private Fields createResponseHeaders(Stream clientStream, Response response)
235 {
236 Fields responseHeaders = new Fields();
237 for (HttpField header : response.getHeaders())
238 responseHeaders.add(header.getName(), header.getValue());
239 short version = clientStream.getSession().getVersion();
240 if (response.getStatus() > 0)
241 responseHeaders.add(HTTPSPDYHeader.STATUS.name(version),
242 String.valueOf(response.getStatus()));
243 responseHeaders.add(HTTPSPDYHeader.VERSION.name(version), HttpVersion.HTTP_1_1.asString());
244 addResponseProxyHeaders(clientStream, responseHeaders);
245 return responseHeaders;
246 }
247
248 private void addNonSpdyHeadersToRequest(short version, Fields headers, Request request)
249 {
250 for (Fields.Field header : headers)
251 if (HTTPSPDYHeader.from(version, header.name()) == null)
252 request.header(header.name(), header.value());
253 }
254
255 static class LoggingCallback extends Callback.Adapter
256 {
257 @Override
258 public void failed(Throwable x)
259 {
260 LOG.debug(x);
261 }
262
263 @Override
264 public void succeeded()
265 {
266 LOG.debug("succeeded");
267 }
268 }
269 }