1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.proxy;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.WritePendingException;
25 import javax.servlet.ReadListener;
26 import javax.servlet.ServletConfig;
27 import javax.servlet.ServletException;
28 import javax.servlet.ServletInputStream;
29 import javax.servlet.ServletOutputStream;
30 import javax.servlet.WriteListener;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33
34 import org.eclipse.jetty.client.api.ContentProvider;
35 import org.eclipse.jetty.client.api.Request;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.client.util.DeferredContentProvider;
38 import org.eclipse.jetty.util.Callback;
39
40 public class AsyncProxyServlet extends ProxyServlet
41 {
42 private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
43
44 @Override
45 protected ContentProvider proxyRequestContent(Request proxyRequest, HttpServletRequest request) throws IOException
46 {
47 ServletInputStream input = request.getInputStream();
48 DeferredContentProvider provider = new DeferredContentProvider();
49 input.setReadListener(new StreamReader(proxyRequest, request, provider));
50 return provider;
51 }
52
53 @Override
54 protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
55 {
56 try
57 {
58 _log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
59 StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
60 if (writeListener == null)
61 {
62 writeListener = new StreamWriter(request, proxyResponse);
63 request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
64
65
66
67
68 writeListener.data(buffer, offset, length, callback);
69
70
71 response.getOutputStream().setWriteListener(writeListener);
72 }
73 else
74 {
75 writeListener.data(buffer, offset, length, callback);
76 writeListener.onWritePossible();
77 }
78 }
79 catch (Throwable x)
80 {
81 callback.failed(x);
82 onResponseFailure(request, response, proxyResponse, x);
83 }
84 }
85
86 public static class Transparent extends AsyncProxyServlet
87 {
88 private final TransparentDelegate delegate = new TransparentDelegate(this);
89
90 @Override
91 public void init(ServletConfig config) throws ServletException
92 {
93 super.init(config);
94 delegate.init(config);
95 }
96
97 @Override
98 protected URI rewriteURI(HttpServletRequest request)
99 {
100 return delegate.rewriteURI(request);
101 }
102 }
103
104 private class StreamReader implements ReadListener, Callback
105 {
106 private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
107 private final Request proxyRequest;
108 private final HttpServletRequest request;
109 private final DeferredContentProvider provider;
110
111 public StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
112 {
113 this.proxyRequest = proxyRequest;
114 this.request = request;
115 this.provider = provider;
116 }
117
118 @Override
119 public void onDataAvailable() throws IOException
120 {
121 int requestId = getRequestId(request);
122 ServletInputStream input = request.getInputStream();
123 _log.debug("{} asynchronous read start on {}", requestId, input);
124
125
126
127 while (input.isReady() && !input.isFinished())
128 {
129 int read = input.read(buffer);
130 _log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
131 if (read > 0)
132 {
133 _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
134 provider.offer(ByteBuffer.wrap(buffer, 0, read), this);
135
136 break;
137 }
138 }
139 if (!input.isFinished())
140 _log.debug("{} asynchronous read pending on {}", requestId, input);
141 }
142
143 @Override
144 public void onAllDataRead() throws IOException
145 {
146 _log.debug("{} proxying content to upstream completed", getRequestId(request));
147 provider.close();
148 }
149
150 @Override
151 public void onError(Throwable x)
152 {
153 failed(x);
154 }
155
156 @Override
157 public void succeeded()
158 {
159 try
160 {
161 if (request.getInputStream().isReady())
162 onDataAvailable();
163 }
164 catch (Throwable x)
165 {
166 failed(x);
167 }
168 }
169
170 @Override
171 public void failed(Throwable x)
172 {
173 onClientRequestFailure(proxyRequest, request, x);
174 }
175 }
176
177 private class StreamWriter implements WriteListener
178 {
179 private final HttpServletRequest request;
180 private final Response proxyResponse;
181 private WriteState state;
182 private byte[] buffer;
183 private int offset;
184 private int length;
185 private Callback callback;
186
187 private StreamWriter(HttpServletRequest request, Response proxyResponse)
188 {
189 this.request = request;
190 this.proxyResponse = proxyResponse;
191 this.state = WriteState.IDLE;
192 }
193
194 private void data(byte[] bytes, int offset, int length, Callback callback)
195 {
196 if (state != WriteState.IDLE)
197 throw new WritePendingException();
198 this.state = WriteState.READY;
199 this.buffer = bytes;
200 this.offset = offset;
201 this.length = length;
202 this.callback = callback;
203 }
204
205 @Override
206 public void onWritePossible() throws IOException
207 {
208 int requestId = getRequestId(request);
209 ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
210 if (state == WriteState.READY)
211 {
212
213 _log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output);
214 output.write(buffer, offset, length);
215 state = WriteState.PENDING;
216 if (output.isReady())
217 {
218 _log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output);
219 complete();
220 }
221 else
222 {
223 _log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output);
224 }
225 }
226 else if (state == WriteState.PENDING)
227 {
228
229 _log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output);
230 complete();
231 }
232 else
233 {
234 throw new IllegalStateException();
235 }
236 }
237
238 private void complete()
239 {
240 buffer = null;
241 offset = 0;
242 length = 0;
243 Callback c = callback;
244 callback = null;
245 state = WriteState.IDLE;
246
247
248
249 c.succeeded();
250 }
251
252 @Override
253 public void onError(Throwable failure)
254 {
255 HttpServletResponse response = (HttpServletResponse)request.getAsyncContext().getResponse();
256 onResponseFailure(request, response, proxyResponse, failure);
257 }
258 }
259
260 private enum WriteState
261 {
262 READY, PENDING, IDLE
263 }
264 }