1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.proxy;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.WritePendingException;
24
25 import javax.servlet.ReadListener;
26 import javax.servlet.ServletConfig;
27 import javax.servlet.ServletException;
28 import javax.servlet.ServletInputStream;
29 import javax.servlet.ServletOutputStream;
30 import javax.servlet.WriteListener;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33
34 import org.eclipse.jetty.client.api.ContentProvider;
35 import org.eclipse.jetty.client.api.Request;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.client.util.DeferredContentProvider;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.IteratingCallback;
40
41
42
43
44
45
46
47
48
49 public class AsyncProxyServlet extends ProxyServlet
50 {
51 private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
52
53 @Override
54 protected ContentProvider proxyRequestContent(HttpServletRequest request, HttpServletResponse response, Request proxyRequest) throws IOException
55 {
56 ServletInputStream input = request.getInputStream();
57 DeferredContentProvider provider = new DeferredContentProvider();
58 input.setReadListener(newReadListener(request, response, proxyRequest, provider));
59 return provider;
60 }
61
62 protected ReadListener newReadListener(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider)
63 {
64 return new StreamReader(request, response, proxyRequest, provider);
65 }
66
67 @Override
68 protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
69 {
70 try
71 {
72 if (_log.isDebugEnabled())
73 _log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
74 StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
75 if (writeListener == null)
76 {
77 writeListener = newWriteListener(request, proxyResponse);
78 request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
79
80
81
82
83 writeListener.data(buffer, offset, length, callback);
84
85
86 response.getOutputStream().setWriteListener(writeListener);
87 }
88 else
89 {
90 writeListener.data(buffer, offset, length, callback);
91 writeListener.onWritePossible();
92 }
93 }
94 catch (Throwable x)
95 {
96 callback.failed(x);
97 proxyResponse.abort(x);
98 }
99 }
100
101 protected StreamWriter newWriteListener(HttpServletRequest request, Response proxyResponse)
102 {
103 return new StreamWriter(request, proxyResponse);
104 }
105
106
107
108
109
110
111 public static class Transparent extends AsyncProxyServlet
112 {
113 private final TransparentDelegate delegate = new TransparentDelegate(this);
114
115 @Override
116 public void init(ServletConfig config) throws ServletException
117 {
118 super.init(config);
119 delegate.init(config);
120 }
121
122 @Override
123 protected String rewriteTarget(HttpServletRequest clientRequest)
124 {
125 return delegate.rewriteTarget(clientRequest);
126 }
127 }
128
129 protected class StreamReader extends IteratingCallback implements ReadListener
130 {
131 private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
132 private final HttpServletRequest request;
133 private final HttpServletResponse response;
134 private final Request proxyRequest;
135 private final DeferredContentProvider provider;
136
137 protected StreamReader(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider)
138 {
139 this.request = request;
140 this.response = response;
141 this.proxyRequest = proxyRequest;
142 this.provider = provider;
143 }
144
145 @Override
146 public void onDataAvailable() throws IOException
147 {
148 iterate();
149 }
150
151 @Override
152 public void onAllDataRead() throws IOException
153 {
154 if (_log.isDebugEnabled())
155 _log.debug("{} proxying content to upstream completed", getRequestId(request));
156 provider.close();
157 }
158
159 @Override
160 public void onError(Throwable t)
161 {
162 onClientRequestFailure(request, proxyRequest, response, t);
163 }
164
165 @Override
166 protected Action process() throws Exception
167 {
168 int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
169 ServletInputStream input = request.getInputStream();
170
171
172
173 while (input.isReady() && !input.isFinished())
174 {
175 int read = input.read(buffer);
176 if (_log.isDebugEnabled())
177 _log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
178 if (read > 0)
179 {
180 if (_log.isDebugEnabled())
181 _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
182 onRequestContent(request, proxyRequest, provider, buffer, 0, read, this);
183 return Action.SCHEDULED;
184 }
185 }
186
187 if (input.isFinished())
188 {
189 if (_log.isDebugEnabled())
190 _log.debug("{} asynchronous read complete on {}", requestId, input);
191 return Action.SUCCEEDED;
192 }
193 else
194 {
195 if (_log.isDebugEnabled())
196 _log.debug("{} asynchronous read pending on {}", requestId, input);
197 return Action.IDLE;
198 }
199 }
200
201 protected void onRequestContent(HttpServletRequest request, Request proxyRequest, DeferredContentProvider provider, byte[] buffer, int offset, int length, Callback callback)
202 {
203 provider.offer(ByteBuffer.wrap(buffer, offset, length), callback);
204 }
205
206 @Override
207 public void failed(Throwable x)
208 {
209 super.failed(x);
210 onError(x);
211 }
212 }
213
214 protected class StreamWriter implements WriteListener
215 {
216 private final HttpServletRequest request;
217 private final Response proxyResponse;
218 private WriteState state;
219 private byte[] buffer;
220 private int offset;
221 private int length;
222 private Callback callback;
223
224 protected StreamWriter(HttpServletRequest request, Response proxyResponse)
225 {
226 this.request = request;
227 this.proxyResponse = proxyResponse;
228 this.state = WriteState.IDLE;
229 }
230
231 protected void data(byte[] bytes, int offset, int length, Callback callback)
232 {
233 if (state != WriteState.IDLE)
234 throw new WritePendingException();
235 this.state = WriteState.READY;
236 this.buffer = bytes;
237 this.offset = offset;
238 this.length = length;
239 this.callback = callback;
240 }
241
242 @Override
243 public void onWritePossible() throws IOException
244 {
245 int requestId = getRequestId(request);
246 ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
247 if (state == WriteState.READY)
248 {
249
250 if (_log.isDebugEnabled())
251 _log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output);
252 output.write(buffer, offset, length);
253 state = WriteState.PENDING;
254 if (output.isReady())
255 {
256 if (_log.isDebugEnabled())
257 _log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output);
258 complete();
259 }
260 else
261 {
262 if (_log.isDebugEnabled())
263 _log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output);
264 }
265 }
266 else if (state == WriteState.PENDING)
267 {
268
269 if (_log.isDebugEnabled())
270 _log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output);
271 complete();
272 }
273 else
274 {
275 throw new IllegalStateException();
276 }
277 }
278
279 protected void complete()
280 {
281 buffer = null;
282 offset = 0;
283 length = 0;
284 Callback c = callback;
285 callback = null;
286 state = WriteState.IDLE;
287
288
289
290 c.succeeded();
291 }
292
293 @Override
294 public void onError(Throwable failure)
295 {
296 proxyResponse.abort(failure);
297 }
298 }
299
300 private enum WriteState
301 {
302 READY, PENDING, IDLE
303 }
304 }