1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.server.proxy;
21
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeoutException;
25
26 import org.eclipse.jetty.client.HttpClient;
27 import org.eclipse.jetty.client.api.Request;
28 import org.eclipse.jetty.client.api.Response;
29 import org.eclipse.jetty.client.util.DeferredContentProvider;
30 import org.eclipse.jetty.http.HttpField;
31 import org.eclipse.jetty.http.HttpMethod;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
35 import org.eclipse.jetty.spdy.api.DataInfo;
36 import org.eclipse.jetty.spdy.api.HeadersInfo;
37 import org.eclipse.jetty.spdy.api.ReplyInfo;
38 import org.eclipse.jetty.spdy.api.RstInfo;
39 import org.eclipse.jetty.spdy.api.Stream;
40 import org.eclipse.jetty.spdy.api.StreamFrameListener;
41 import org.eclipse.jetty.spdy.api.StreamStatus;
42 import org.eclipse.jetty.spdy.api.SynInfo;
43 import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
44 import org.eclipse.jetty.util.BufferUtil;
45 import org.eclipse.jetty.util.Callback;
46 import org.eclipse.jetty.util.Fields;
47 import org.eclipse.jetty.util.HttpCookieStore;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50
51
52
53
54
55 public class HTTPProxyEngine extends ProxyEngine
56 {
57 private static final Logger LOG = Log.getLogger(HTTPProxyEngine.class);
58 private static final Callback LOGGING_CALLBACK = new LoggingCallback();
59
60 private final HttpClient httpClient;
61
62 public HTTPProxyEngine(HttpClient httpClient)
63 {
64 this.httpClient = httpClient;
65 configureHttpClient(httpClient);
66 }
67
68 private void configureHttpClient(HttpClient httpClient)
69 {
70
71 httpClient.setFollowRedirects(false);
72
73 httpClient.setCookieStore(new HttpCookieStore.Empty());
74 }
75
76 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
77 {
78 short version = clientStream.getSession().getVersion();
79 String method = clientSynInfo.getHeaders().get(HTTPSPDYHeader.METHOD.name(version)).value();
80 String path = clientSynInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).value();
81
82 Fields headers = new Fields(clientSynInfo.getHeaders(), false);
83
84 removeHopHeaders(headers);
85 addRequestProxyHeaders(clientStream, headers);
86 customizeRequestHeaders(clientStream, headers);
87
88 String host = proxyServerInfo.getHost();
89 int port = proxyServerInfo.getAddress().getPort();
90
91 LOG.debug("Sending HTTP request to: {}", host + ":" + port);
92 final Request request = httpClient.newRequest(host, port)
93 .path(path)
94 .method(HttpMethod.fromString(method));
95 addNonSpdyHeadersToRequest(version, headers, request);
96
97 if (!clientSynInfo.isClose())
98 {
99 request.content(new DeferredContentProvider());
100 }
101
102 sendRequest(clientStream, request);
103
104 return new StreamFrameListener.Adapter()
105 {
106 @Override
107 public void onReply(Stream stream, ReplyInfo replyInfo)
108 {
109
110 throw new UnsupportedOperationException("Not Yet Implemented");
111 }
112
113 @Override
114 public void onHeaders(Stream stream, HeadersInfo headersInfo)
115 {
116 throw new UnsupportedOperationException("Not Yet Implemented");
117 }
118
119 @Override
120 public void onData(Stream clientStream, final DataInfo clientDataInfo)
121 {
122 LOG.debug("received clientDataInfo: {} for stream: {}", clientDataInfo, clientStream);
123
124 DeferredContentProvider contentProvider = (DeferredContentProvider)request.getContent();
125 contentProvider.offer(clientDataInfo.asByteBuffer(true));
126
127 if (clientDataInfo.isClose())
128 contentProvider.close();
129 }
130 };
131 }
132
133 private void sendRequest(final Stream clientStream, Request request)
134 {
135 request.send(new Response.Listener.Empty()
136 {
137 private volatile boolean committed;
138
139 @Override
140 public void onHeaders(final Response response)
141 {
142 LOG.debug("onHeaders called with response: {}. Sending replyInfo to client.", response);
143 Fields responseHeaders = createResponseHeaders(clientStream, response);
144 removeHopHeaders(responseHeaders);
145 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
146 clientStream.reply(replyInfo, new Callback.Adapter()
147 {
148 @Override
149 public void failed(Throwable x)
150 {
151 LOG.debug("failed: ", x);
152 response.abort(x);
153 }
154
155 @Override
156 public void succeeded()
157 {
158 committed = true;
159 }
160 });
161 }
162
163 @Override
164 public void onContent(final Response response, ByteBuffer content)
165 {
166 LOG.debug("onContent called with response: {} and content: {}. Sending response content to client.",
167 response, content);
168 final ByteBuffer contentCopy = httpClient.getByteBufferPool().acquire(content.remaining(), true);
169 BufferUtil.flipPutFlip(content, contentCopy);
170 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(contentCopy, false);
171 clientStream.data(dataInfo, new Callback()
172 {
173 @Override
174 public void failed(Throwable x)
175 {
176 LOG.debug("failed: ", x);
177 releaseBuffer();
178 response.abort(x);
179 }
180
181 @Override
182 public void succeeded()
183 {
184 releaseBuffer();
185 }
186
187 private void releaseBuffer()
188 {
189 httpClient.getByteBufferPool().release(contentCopy);
190 }
191 });
192 }
193
194 @Override
195 public void onSuccess(Response response)
196 {
197 LOG.debug("onSuccess called. Closing client stream.");
198 clientStream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true), LOGGING_CALLBACK);
199 }
200
201 @Override
202 public void onFailure(Response response, Throwable failure)
203 {
204 LOG.debug("onFailure called: ", failure);
205 if (committed)
206 {
207 LOG.debug("clientStream already committed. Resetting stream.");
208 try
209 {
210 clientStream.getSession().rst(new RstInfo(clientStream.getId(), StreamStatus.INTERNAL_ERROR));
211 }
212 catch (InterruptedException | ExecutionException | TimeoutException e)
213 {
214 LOG.debug(e);
215 }
216 }
217 else
218 {
219 if (clientStream.isClosed())
220 return;
221 Fields responseHeaders = createResponseHeaders(clientStream, response);
222 if (failure instanceof TimeoutException)
223 responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
224 String.valueOf(HttpStatus.GATEWAY_TIMEOUT_504));
225 else
226 responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
227 String.valueOf(HttpStatus.BAD_GATEWAY_502));
228 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
229 clientStream.reply(replyInfo, LOGGING_CALLBACK);
230 }
231 }
232 });
233 }
234
235 private Fields createResponseHeaders(Stream clientStream, Response response)
236 {
237 Fields responseHeaders = new Fields();
238 for (HttpField header : response.getHeaders())
239 responseHeaders.add(header.getName(), header.getValue());
240 short version = clientStream.getSession().getVersion();
241 if (response.getStatus() > 0)
242 responseHeaders.add(HTTPSPDYHeader.STATUS.name(version),
243 String.valueOf(response.getStatus()));
244 responseHeaders.add(HTTPSPDYHeader.VERSION.name(version), HttpVersion.HTTP_1_1.asString());
245 addResponseProxyHeaders(clientStream, responseHeaders);
246 return responseHeaders;
247 }
248
249 private void addNonSpdyHeadersToRequest(short version, Fields headers, Request request)
250 {
251 for (Fields.Field header : headers)
252 if (HTTPSPDYHeader.from(version, header.name()) == null)
253 request.header(header.name(), header.value());
254 }
255
256 static class LoggingCallback extends Callback.Adapter
257 {
258 @Override
259 public void failed(Throwable x)
260 {
261 LOG.debug(x);
262 }
263
264 @Override
265 public void succeeded()
266 {
267 LOG.debug("succeeded");
268 }
269 }
270 }