1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.proxy;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.WritePendingException;
25 import javax.servlet.ReadListener;
26 import javax.servlet.ServletConfig;
27 import javax.servlet.ServletException;
28 import javax.servlet.ServletInputStream;
29 import javax.servlet.ServletOutputStream;
30 import javax.servlet.WriteListener;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33
34 import org.eclipse.jetty.client.api.ContentProvider;
35 import org.eclipse.jetty.client.api.Request;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.client.util.DeferredContentProvider;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.IteratingCallback;
40
41 public class AsyncProxyServlet extends ProxyServlet
42 {
43 private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
44
45 @Override
46 protected ContentProvider proxyRequestContent(Request proxyRequest, HttpServletRequest request) throws IOException
47 {
48 ServletInputStream input = request.getInputStream();
49 DeferredContentProvider provider = new DeferredContentProvider();
50 input.setReadListener(newReadListener(proxyRequest, request, provider));
51 return provider;
52 }
53
54 protected ReadListener newReadListener(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
55 {
56 return new StreamReader(proxyRequest, request, provider);
57 }
58
59 @Override
60 protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
61 {
62 try
63 {
64 if (_log.isDebugEnabled())
65 _log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
66 StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
67 if (writeListener == null)
68 {
69 writeListener = newWriteListener(request, proxyResponse);
70 request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
71
72
73
74
75 writeListener.data(buffer, offset, length, callback);
76
77
78 response.getOutputStream().setWriteListener(writeListener);
79 }
80 else
81 {
82 writeListener.data(buffer, offset, length, callback);
83 writeListener.onWritePossible();
84 }
85 }
86 catch (Throwable x)
87 {
88 callback.failed(x);
89 proxyResponse.abort(x);
90 }
91 }
92
93 protected StreamWriter newWriteListener(HttpServletRequest request, Response proxyResponse)
94 {
95 return new StreamWriter(request, proxyResponse);
96 }
97
98 public static class Transparent extends AsyncProxyServlet
99 {
100 private final TransparentDelegate delegate = new TransparentDelegate(this);
101
102 @Override
103 public void init(ServletConfig config) throws ServletException
104 {
105 super.init(config);
106 delegate.init(config);
107 }
108
109 @Override
110 protected URI rewriteURI(HttpServletRequest request)
111 {
112 return delegate.rewriteURI(request);
113 }
114 }
115
116 protected class StreamReader extends IteratingCallback implements ReadListener
117 {
118 private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
119 private final Request proxyRequest;
120 private final HttpServletRequest request;
121 private final DeferredContentProvider provider;
122
123 protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
124 {
125 this.proxyRequest = proxyRequest;
126 this.request = request;
127 this.provider = provider;
128 }
129
130 @Override
131 public void onDataAvailable() throws IOException
132 {
133 iterate();
134 }
135
136 @Override
137 public void onAllDataRead() throws IOException
138 {
139 if (_log.isDebugEnabled())
140 _log.debug("{} proxying content to upstream completed", getRequestId(request));
141 provider.close();
142 }
143
144 @Override
145 public void onError(Throwable t)
146 {
147 onClientRequestFailure(proxyRequest, request, t);
148 }
149
150 @Override
151 protected Action process() throws Exception
152 {
153 int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
154 ServletInputStream input = request.getInputStream();
155
156
157
158 while (input.isReady() && !input.isFinished())
159 {
160 int read = input.read(buffer);
161 if (_log.isDebugEnabled())
162 _log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
163 if (read > 0)
164 {
165 if (_log.isDebugEnabled())
166 _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
167 onRequestContent(proxyRequest, request, provider, buffer, 0, read, this);
168 return Action.SCHEDULED;
169 }
170 }
171
172 if (input.isFinished())
173 {
174 if (_log.isDebugEnabled())
175 _log.debug("{} asynchronous read complete on {}", requestId, input);
176 return Action.SUCCEEDED;
177 }
178 else
179 {
180 if (_log.isDebugEnabled())
181 _log.debug("{} asynchronous read pending on {}", requestId, input);
182 return Action.IDLE;
183 }
184 }
185
186 protected void onRequestContent(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider, byte[] buffer, int offset, int length, Callback callback)
187 {
188 provider.offer(ByteBuffer.wrap(buffer, offset, length), callback);
189 }
190
191 @Override
192 public void failed(Throwable x)
193 {
194 super.failed(x);
195 onError(x);
196 }
197 }
198
199 protected class StreamWriter implements WriteListener
200 {
201 private final HttpServletRequest request;
202 private final Response proxyResponse;
203 private WriteState state;
204 private byte[] buffer;
205 private int offset;
206 private int length;
207 private Callback callback;
208
209 protected StreamWriter(HttpServletRequest request, Response proxyResponse)
210 {
211 this.request = request;
212 this.proxyResponse = proxyResponse;
213 this.state = WriteState.IDLE;
214 }
215
216 protected void data(byte[] bytes, int offset, int length, Callback callback)
217 {
218 if (state != WriteState.IDLE)
219 throw new WritePendingException();
220 this.state = WriteState.READY;
221 this.buffer = bytes;
222 this.offset = offset;
223 this.length = length;
224 this.callback = callback;
225 }
226
227 @Override
228 public void onWritePossible() throws IOException
229 {
230 int requestId = getRequestId(request);
231 ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
232 if (state == WriteState.READY)
233 {
234
235 if (_log.isDebugEnabled())
236 _log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output);
237 output.write(buffer, offset, length);
238 state = WriteState.PENDING;
239 if (output.isReady())
240 {
241 if (_log.isDebugEnabled())
242 _log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output);
243 complete();
244 }
245 else
246 {
247 if (_log.isDebugEnabled())
248 _log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output);
249 }
250 }
251 else if (state == WriteState.PENDING)
252 {
253
254 if (_log.isDebugEnabled())
255 _log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output);
256 complete();
257 }
258 else
259 {
260 throw new IllegalStateException();
261 }
262 }
263
264 protected void complete()
265 {
266 buffer = null;
267 offset = 0;
268 length = 0;
269 Callback c = callback;
270 callback = null;
271 state = WriteState.IDLE;
272
273
274
275 c.succeeded();
276 }
277
278 @Override
279 public void onError(Throwable failure)
280 {
281 proxyResponse.abort(failure);
282 }
283 }
284
285 private enum WriteState
286 {
287 READY, PENDING, IDLE
288 }
289 }