1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.server.proxy;
21
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeoutException;
25
26 import org.eclipse.jetty.client.HttpClient;
27 import org.eclipse.jetty.client.api.Request;
28 import org.eclipse.jetty.client.api.Response;
29 import org.eclipse.jetty.client.util.DeferredContentProvider;
30 import org.eclipse.jetty.http.HttpField;
31 import org.eclipse.jetty.http.HttpMethod;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
35 import org.eclipse.jetty.spdy.api.DataInfo;
36 import org.eclipse.jetty.spdy.api.HeadersInfo;
37 import org.eclipse.jetty.spdy.api.ReplyInfo;
38 import org.eclipse.jetty.spdy.api.RstInfo;
39 import org.eclipse.jetty.spdy.api.Stream;
40 import org.eclipse.jetty.spdy.api.StreamFrameListener;
41 import org.eclipse.jetty.spdy.api.StreamStatus;
42 import org.eclipse.jetty.spdy.api.SynInfo;
43 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
44 import org.eclipse.jetty.util.BufferUtil;
45 import org.eclipse.jetty.util.Callback;
46 import org.eclipse.jetty.util.Fields;
47 import org.eclipse.jetty.util.HttpCookieStore;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50
51
52
53
54
55 public class HTTPProxyEngine extends ProxyEngine
56 {
57 private static final Logger LOG = Log.getLogger(HTTPProxyEngine.class);
58 private static final Callback LOGGING_CALLBACK = new LoggingCallback();
59
60 private final HttpClient httpClient;
61
62 public HTTPProxyEngine(HttpClient httpClient)
63 {
64 this.httpClient = httpClient;
65 configureHttpClient(httpClient);
66 }
67
68 private void configureHttpClient(HttpClient httpClient)
69 {
70
71 httpClient.setFollowRedirects(false);
72
73 httpClient.setCookieStore(new HttpCookieStore.Empty());
74 }
75
76 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
77 {
78 short version = clientStream.getSession().getVersion();
79 String method = clientSynInfo.getHeaders().get(HTTPSPDYHeader.METHOD.name(version)).getValue();
80 String path = clientSynInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version)).getValue();
81
82 Fields headers = new Fields(clientSynInfo.getHeaders(), false);
83
84 removeHopHeaders(headers);
85 addRequestProxyHeaders(clientStream, headers);
86 customizeRequestHeaders(clientStream, headers);
87
88 String host = proxyServerInfo.getHost();
89 int port = proxyServerInfo.getAddress().getPort();
90
91 if (LOG.isDebugEnabled())
92 LOG.debug("Sending HTTP request to: {}", host + ":" + port);
93 final Request request = httpClient.newRequest(host, port)
94 .path(path)
95 .method(HttpMethod.fromString(method));
96 addNonSpdyHeadersToRequest(version, headers, request);
97
98 if (!clientSynInfo.isClose())
99 {
100 request.content(new DeferredContentProvider());
101 }
102
103 sendRequest(clientStream, request);
104
105 return new StreamFrameListener.Adapter()
106 {
107 @Override
108 public void onReply(Stream stream, ReplyInfo replyInfo)
109 {
110
111 throw new UnsupportedOperationException("Not Yet Implemented");
112 }
113
114 @Override
115 public void onHeaders(Stream stream, HeadersInfo headersInfo)
116 {
117 throw new UnsupportedOperationException("Not Yet Implemented");
118 }
119
120 @Override
121 public void onData(Stream clientStream, final DataInfo clientDataInfo)
122 {
123 if (LOG.isDebugEnabled())
124 LOG.debug("received clientDataInfo: {} for stream: {}", clientDataInfo, clientStream);
125
126 DeferredContentProvider contentProvider = (DeferredContentProvider)request.getContent();
127 contentProvider.offer(clientDataInfo.asByteBuffer(true));
128
129 if (clientDataInfo.isClose())
130 contentProvider.close();
131 }
132 };
133 }
134
135 private void sendRequest(final Stream clientStream, Request request)
136 {
137 request.send(new Response.Listener.Adapter()
138 {
139 private volatile boolean committed;
140
141 @Override
142 public void onHeaders(final Response response)
143 {
144 if (LOG.isDebugEnabled())
145 LOG.debug("onHeaders called with response: {}. Sending replyInfo to client.", response);
146 Fields responseHeaders = createResponseHeaders(clientStream, response);
147 removeHopHeaders(responseHeaders);
148 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
149 clientStream.reply(replyInfo, new Callback.Adapter()
150 {
151 @Override
152 public void failed(Throwable x)
153 {
154 LOG.debug("failed: ", x);
155 response.abort(x);
156 }
157
158 @Override
159 public void succeeded()
160 {
161 committed = true;
162 }
163 });
164 }
165
166 @Override
167 public void onContent(final Response response, ByteBuffer content)
168 {
169 if (LOG.isDebugEnabled())
170 LOG.debug("onContent called with response: {} and content: {}. Sending response content to client.",
171 response, content);
172 final ByteBuffer contentCopy = httpClient.getByteBufferPool().acquire(content.remaining(), true);
173 BufferUtil.flipPutFlip(content, contentCopy);
174 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(contentCopy, false);
175 clientStream.data(dataInfo, new Callback()
176 {
177 @Override
178 public void failed(Throwable x)
179 {
180 LOG.debug("failed: ", x);
181 releaseBuffer();
182 response.abort(x);
183 }
184
185 @Override
186 public void succeeded()
187 {
188 releaseBuffer();
189 }
190
191 private void releaseBuffer()
192 {
193 httpClient.getByteBufferPool().release(contentCopy);
194 }
195 });
196 }
197
198 @Override
199 public void onSuccess(Response response)
200 {
201 if (LOG.isDebugEnabled())
202 LOG.debug("onSuccess called. Closing client stream.");
203 clientStream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true), LOGGING_CALLBACK);
204 }
205
206 @Override
207 public void onFailure(Response response, Throwable failure)
208 {
209 LOG.debug("onFailure called: ", failure);
210 if (committed)
211 {
212 LOG.debug("clientStream already committed. Resetting stream.");
213 try
214 {
215 clientStream.getSession().rst(new RstInfo(clientStream.getId(), StreamStatus.INTERNAL_ERROR));
216 }
217 catch (InterruptedException | ExecutionException | TimeoutException e)
218 {
219 LOG.debug(e);
220 }
221 }
222 else
223 {
224 if (clientStream.isClosed())
225 return;
226 Fields responseHeaders = createResponseHeaders(clientStream, response);
227 if (failure instanceof TimeoutException)
228 responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
229 String.valueOf(HttpStatus.GATEWAY_TIMEOUT_504));
230 else
231 responseHeaders.add(HTTPSPDYHeader.STATUS.name(clientStream.getSession().getVersion()),
232 String.valueOf(HttpStatus.BAD_GATEWAY_502));
233 ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
234 clientStream.reply(replyInfo, LOGGING_CALLBACK);
235 }
236 }
237 });
238 }
239
240 private Fields createResponseHeaders(Stream clientStream, Response response)
241 {
242 Fields responseHeaders = new Fields();
243 for (HttpField header : response.getHeaders())
244 responseHeaders.add(header.getName(), header.getValue());
245 short version = clientStream.getSession().getVersion();
246 if (response.getStatus() > 0)
247 responseHeaders.add(HTTPSPDYHeader.STATUS.name(version),
248 String.valueOf(response.getStatus()));
249 responseHeaders.add(HTTPSPDYHeader.VERSION.name(version), HttpVersion.HTTP_1_1.asString());
250 addResponseProxyHeaders(clientStream, responseHeaders);
251 return responseHeaders;
252 }
253
254 private void addNonSpdyHeadersToRequest(short version, Fields headers, Request request)
255 {
256 for (Fields.Field header : headers)
257 if (HTTPSPDYHeader.from(version, header.getName()) == null)
258 request.header(header.getName(), header.getValue());
259 }
260
261 static class LoggingCallback extends Callback.Adapter
262 {
263 @Override
264 public void failed(Throwable x)
265 {
266 LOG.debug(x);
267 }
268
269 @Override
270 public void succeeded()
271 {
272 if (LOG.isDebugEnabled())
273 LOG.debug("succeeded");
274 }
275 }
276 }