1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.proxy;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayDeque;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.TimeUnit;
30 import java.util.zip.GZIPOutputStream;
31
32 import javax.servlet.AsyncContext;
33 import javax.servlet.ReadListener;
34 import javax.servlet.ServletConfig;
35 import javax.servlet.ServletException;
36 import javax.servlet.ServletInputStream;
37 import javax.servlet.ServletOutputStream;
38 import javax.servlet.WriteListener;
39 import javax.servlet.http.HttpServletRequest;
40 import javax.servlet.http.HttpServletResponse;
41
42 import org.eclipse.jetty.client.ContentDecoder;
43 import org.eclipse.jetty.client.GZIPContentDecoder;
44 import org.eclipse.jetty.client.api.ContentProvider;
45 import org.eclipse.jetty.client.api.Request;
46 import org.eclipse.jetty.client.api.Response;
47 import org.eclipse.jetty.client.api.Result;
48 import org.eclipse.jetty.client.util.DeferredContentProvider;
49 import org.eclipse.jetty.http.HttpHeader;
50 import org.eclipse.jetty.http.HttpVersion;
51 import org.eclipse.jetty.io.RuntimeIOException;
52 import org.eclipse.jetty.util.BufferUtil;
53 import org.eclipse.jetty.util.Callback;
54 import org.eclipse.jetty.util.CountingCallback;
55 import org.eclipse.jetty.util.IteratingCallback;
56 import org.eclipse.jetty.util.component.Destroyable;
57
58
59
60
61
62
63
64
65
66
67 public class AsyncMiddleManServlet extends AbstractProxyServlet
68 {
69 private static final String PROXY_REQUEST_COMMITTED = AsyncMiddleManServlet.class.getName() + ".proxyRequestCommitted";
70 private static final String CLIENT_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".clientTransformer";
71 private static final String SERVER_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".serverTransformer";
72
73 @Override
74 protected void service(HttpServletRequest clientRequest, HttpServletResponse proxyResponse) throws ServletException, IOException
75 {
76 String rewrittenTarget = rewriteTarget(clientRequest);
77 if (_log.isDebugEnabled())
78 {
79 StringBuffer target = clientRequest.getRequestURL();
80 if (clientRequest.getQueryString() != null)
81 target.append("?").append(clientRequest.getQueryString());
82 _log.debug("{} rewriting: {} -> {}", getRequestId(clientRequest), target, rewrittenTarget);
83 }
84 if (rewrittenTarget == null)
85 {
86 onProxyRewriteFailed(clientRequest, proxyResponse);
87 return;
88 }
89
90 final Request proxyRequest = getHttpClient().newRequest(rewrittenTarget)
91 .method(clientRequest.getMethod())
92 .version(HttpVersion.fromString(clientRequest.getProtocol()));
93
94 boolean hasContent = hasContent(clientRequest);
95
96 copyRequestHeaders(clientRequest, proxyRequest);
97
98 addProxyHeaders(clientRequest, proxyRequest);
99
100 final AsyncContext asyncContext = clientRequest.startAsync();
101
102 asyncContext.setTimeout(0);
103 proxyRequest.timeout(getTimeout(), TimeUnit.MILLISECONDS);
104
105
106
107
108 if (hasContent)
109 proxyRequest.content(newProxyContentProvider(clientRequest, proxyResponse, proxyRequest));
110 else
111 sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
112 }
113
114 protected ContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException
115 {
116 ServletInputStream input = clientRequest.getInputStream();
117 DeferredContentProvider provider = new DeferredContentProvider()
118 {
119 @Override
120 public boolean offer(ByteBuffer buffer, Callback callback)
121 {
122 if (_log.isDebugEnabled())
123 _log.debug("{} proxying content to upstream: {} bytes", getRequestId(clientRequest), buffer.remaining());
124 return super.offer(buffer, callback);
125 }
126 };
127 input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider));
128 return provider;
129 }
130
131 protected ReadListener newProxyReadListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
132 {
133 return new ProxyReader(clientRequest, proxyResponse, proxyRequest, provider);
134 }
135
136 protected ProxyWriter newProxyWriteListener(HttpServletRequest clientRequest, Response proxyResponse)
137 {
138 return new ProxyWriter(clientRequest, proxyResponse);
139 }
140
141 protected Response.CompleteListener newProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
142 {
143 return new ProxyResponseListener(clientRequest, proxyResponse);
144 }
145
146 protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
147 {
148 return ContentTransformer.IDENTITY;
149 }
150
151 protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
152 {
153 return ContentTransformer.IDENTITY;
154 }
155
156 private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
157 {
158 try
159 {
160 transformer.transform(input, finished, output);
161 }
162 catch (Throwable x)
163 {
164 _log.info("Exception while transforming " + transformer, x);
165 throw x;
166 }
167 }
168
169 int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
170 {
171 return input.read(buffer);
172 }
173
174 void writeProxyResponseContent(ServletOutputStream output, ByteBuffer content) throws IOException
175 {
176 write(output, content);
177 }
178
179 private static void write(OutputStream output, ByteBuffer content) throws IOException
180 {
181 int length = content.remaining();
182 int offset = 0;
183 byte[] buffer;
184 if (content.hasArray())
185 {
186 offset = content.arrayOffset();
187 buffer = content.array();
188 }
189 else
190 {
191 buffer = new byte[length];
192 content.get(buffer);
193 }
194 output.write(buffer, offset, length);
195 }
196
197 private void cleanup(HttpServletRequest clientRequest)
198 {
199 ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
200 if (clientTransformer instanceof Destroyable)
201 ((Destroyable)clientTransformer).destroy();
202 ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
203 if (serverTransformer instanceof Destroyable)
204 ((Destroyable)serverTransformer).destroy();
205 }
206
207
208
209
210
211
212 public static class Transparent extends ProxyServlet
213 {
214 private final TransparentDelegate delegate = new TransparentDelegate(this);
215
216 @Override
217 public void init(ServletConfig config) throws ServletException
218 {
219 super.init(config);
220 delegate.init(config);
221 }
222
223 @Override
224 protected String rewriteTarget(HttpServletRequest request)
225 {
226 return delegate.rewriteTarget(request);
227 }
228 }
229
230 protected class ProxyReader extends IteratingCallback implements ReadListener
231 {
232 private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
233 private final List<ByteBuffer> buffers = new ArrayList<>();
234 private final HttpServletRequest clientRequest;
235 private final HttpServletResponse proxyResponse;
236 private final Request proxyRequest;
237 private final DeferredContentProvider provider;
238 private final int contentLength;
239 private int length;
240
241 protected ProxyReader(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
242 {
243 this.clientRequest = clientRequest;
244 this.proxyResponse = proxyResponse;
245 this.proxyRequest = proxyRequest;
246 this.provider = provider;
247 this.contentLength = clientRequest.getContentLength();
248 }
249
250 @Override
251 public void onDataAvailable() throws IOException
252 {
253 iterate();
254 }
255
256 @Override
257 public void onAllDataRead() throws IOException
258 {
259 if (!provider.isClosed())
260 {
261 process(BufferUtil.EMPTY_BUFFER, new Callback()
262 {
263 @Override
264 public void failed(Throwable x)
265 {
266 onError(x);
267 }
268 }, true);
269 }
270
271 if (_log.isDebugEnabled())
272 _log.debug("{} proxying content to upstream completed", getRequestId(clientRequest));
273 }
274
275 @Override
276 public void onError(Throwable t)
277 {
278 cleanup(clientRequest);
279 onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t);
280 }
281
282 @Override
283 protected Action process() throws Exception
284 {
285 ServletInputStream input = clientRequest.getInputStream();
286 while (input.isReady() && !input.isFinished())
287 {
288 int read = readClientRequestContent(input, buffer);
289
290 if (_log.isDebugEnabled())
291 _log.debug("{} asynchronous read {} bytes on {}", getRequestId(clientRequest), read, input);
292
293 if (read < 0)
294 return Action.SUCCEEDED;
295
296 if (contentLength > 0 && read > 0)
297 length += read;
298
299 ByteBuffer content = read > 0 ? ByteBuffer.wrap(buffer, 0, read) : BufferUtil.EMPTY_BUFFER;
300 boolean finished = length == contentLength;
301 process(content, this, finished);
302
303 if (read > 0)
304 return Action.SCHEDULED;
305 }
306
307 if (input.isFinished())
308 {
309 if (_log.isDebugEnabled())
310 _log.debug("{} asynchronous read complete on {}", getRequestId(clientRequest), input);
311 return Action.SUCCEEDED;
312 }
313 else
314 {
315 if (_log.isDebugEnabled())
316 _log.debug("{} asynchronous read pending on {}", getRequestId(clientRequest), input);
317 return Action.IDLE;
318 }
319 }
320
321 private void process(ByteBuffer content, Callback callback, boolean finished) throws IOException
322 {
323 ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
324 if (transformer == null)
325 {
326 transformer = newClientRequestContentTransformer(clientRequest, proxyRequest);
327 clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer);
328 }
329
330 boolean committed = clientRequest.getAttribute(PROXY_REQUEST_COMMITTED) != null;
331
332 int contentBytes = content.remaining();
333
334
335 if (contentBytes == 0 && !finished)
336 {
337 callback.succeeded();
338 return;
339 }
340
341 transform(transformer, content, finished, buffers);
342
343 int newContentBytes = 0;
344 int size = buffers.size();
345 if (size > 0)
346 {
347 CountingCallback counter = new CountingCallback(callback, size);
348 for (int i = 0; i < size; ++i)
349 {
350 ByteBuffer buffer = buffers.get(i);
351 newContentBytes += buffer.remaining();
352 provider.offer(buffer, counter);
353 }
354 buffers.clear();
355 }
356
357 if (finished)
358 provider.close();
359
360 if (_log.isDebugEnabled())
361 _log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
362
363 if (!committed && (size > 0 || finished))
364 {
365 proxyRequest.header(HttpHeader.CONTENT_LENGTH, null);
366 clientRequest.setAttribute(PROXY_REQUEST_COMMITTED, true);
367 sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
368 }
369
370 if (size == 0)
371 callback.succeeded();
372 }
373
374 @Override
375 protected void onCompleteFailure(Throwable x)
376 {
377 onError(x);
378 }
379 }
380
381 protected class ProxyResponseListener extends Response.Listener.Adapter implements Callback
382 {
383 private final String WRITE_LISTENER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".writeListener";
384
385 private final Callback complete = new CountingCallback(this, 2);
386 private final List<ByteBuffer> buffers = new ArrayList<>();
387 private final HttpServletRequest clientRequest;
388 private final HttpServletResponse proxyResponse;
389 private boolean hasContent;
390 private long contentLength;
391 private long length;
392 private Response response;
393
394 protected ProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
395 {
396 this.clientRequest = clientRequest;
397 this.proxyResponse = proxyResponse;
398 }
399
400 @Override
401 public void onBegin(Response serverResponse)
402 {
403 proxyResponse.setStatus(serverResponse.getStatus());
404 }
405
406 @Override
407 public void onHeaders(Response serverResponse)
408 {
409 contentLength = serverResponse.getHeaders().getLongField(HttpHeader.CONTENT_LENGTH.asString());
410 onServerResponseHeaders(clientRequest, proxyResponse, serverResponse);
411 }
412
413 @Override
414 public void onContent(final Response serverResponse, ByteBuffer content, final Callback callback)
415 {
416 try
417 {
418 int contentBytes = content.remaining();
419 if (_log.isDebugEnabled())
420 _log.debug("{} received server content: {} bytes", getRequestId(clientRequest), contentBytes);
421
422 hasContent = true;
423
424 ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
425 boolean committed = proxyWriter != null;
426 if (proxyWriter == null)
427 {
428 proxyWriter = newProxyWriteListener(clientRequest, serverResponse);
429 clientRequest.setAttribute(WRITE_LISTENER_ATTRIBUTE, proxyWriter);
430 }
431
432 ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
433 if (transformer == null)
434 {
435 transformer = newServerResponseContentTransformer(clientRequest, proxyResponse, serverResponse);
436 clientRequest.setAttribute(SERVER_TRANSFORMER, transformer);
437 }
438
439 length += contentBytes;
440
441 boolean finished = contentLength >= 0 && length == contentLength;
442 transform(transformer, content, finished, buffers);
443
444 int newContentBytes = 0;
445 int size = buffers.size();
446 if (size > 0)
447 {
448 Callback counter = size == 1 ? callback : new CountingCallback(callback, size);
449 for (int i = 0; i < size; ++i)
450 {
451 ByteBuffer buffer = buffers.get(i);
452 newContentBytes += buffer.remaining();
453 proxyWriter.offer(buffer, counter);
454 }
455 buffers.clear();
456 }
457 else
458 {
459 proxyWriter.offer(BufferUtil.EMPTY_BUFFER, callback);
460 }
461 if (finished)
462 proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
463
464 if (_log.isDebugEnabled())
465 _log.debug("{} downstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
466
467 if (committed)
468 {
469 proxyWriter.onWritePossible();
470 }
471 else
472 {
473 if (contentLength >= 0)
474 proxyResponse.setContentLength(-1);
475
476
477
478
479
480
481
482
483 proxyResponse.getOutputStream().setWriteListener(proxyWriter);
484 }
485 }
486 catch (Throwable x)
487 {
488 callback.failed(x);
489 }
490 }
491
492 @Override
493 public void onSuccess(final Response serverResponse)
494 {
495 try
496 {
497 if (hasContent)
498 {
499
500
501 if (contentLength < 0)
502 {
503 ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
504 ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
505
506 transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);
507
508 long newContentBytes = 0;
509 int size = buffers.size();
510 if (size > 0)
511 {
512 Callback callback = size == 1 ? complete : new CountingCallback(complete, size);
513 for (int i = 0; i < size; ++i)
514 {
515 ByteBuffer buffer = buffers.get(i);
516 newContentBytes += buffer.remaining();
517 proxyWriter.offer(buffer, callback);
518 }
519 buffers.clear();
520 }
521 else
522 {
523 proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
524 }
525
526 if (_log.isDebugEnabled())
527 _log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes);
528
529 proxyWriter.onWritePossible();
530 }
531 }
532 else
533 {
534 complete.succeeded();
535 }
536 }
537 catch (Throwable x)
538 {
539 complete.failed(x);
540 }
541 }
542
543 @Override
544 public void onComplete(Result result)
545 {
546 response = result.getResponse();
547 if (result.isSucceeded())
548 complete.succeeded();
549 else
550 complete.failed(result.getFailure());
551 }
552
553 @Override
554 public void succeeded()
555 {
556 cleanup(clientRequest);
557 onProxyResponseSuccess(clientRequest, proxyResponse, response);
558 }
559
560 @Override
561 public void failed(Throwable failure)
562 {
563 cleanup(clientRequest);
564 onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
565 }
566 }
567
568 protected class ProxyWriter implements WriteListener
569 {
570 private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
571 private final HttpServletRequest clientRequest;
572 private final Response serverResponse;
573 private DeferredContentProvider.Chunk chunk;
574 private boolean writePending;
575
576 protected ProxyWriter(HttpServletRequest clientRequest, Response serverResponse)
577 {
578 this.clientRequest = clientRequest;
579 this.serverResponse = serverResponse;
580 }
581
582 public boolean offer(ByteBuffer content, Callback callback)
583 {
584 if (_log.isDebugEnabled())
585 _log.debug("{} proxying content to downstream: {} bytes {}", getRequestId(clientRequest), content.remaining(), callback);
586 return chunks.offer(new DeferredContentProvider.Chunk(content, callback));
587 }
588
589 @Override
590 public void onWritePossible() throws IOException
591 {
592 ServletOutputStream output = clientRequest.getAsyncContext().getResponse().getOutputStream();
593
594
595 if (writePending)
596 {
597 if (_log.isDebugEnabled())
598 _log.debug("{} pending async write complete of {} on {}", getRequestId(clientRequest), chunk, output);
599 writePending = false;
600 if (succeed(chunk.callback))
601 return;
602 }
603
604 int length = 0;
605 DeferredContentProvider.Chunk chunk = null;
606 while (output.isReady())
607 {
608 if (chunk != null)
609 {
610 if (_log.isDebugEnabled())
611 _log.debug("{} async write complete of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
612 if (succeed(chunk.callback))
613 return;
614 }
615
616 this.chunk = chunk = chunks.poll();
617 if (chunk == null)
618 return;
619
620 length = chunk.buffer.remaining();
621 if (length > 0)
622 writeProxyResponseContent(output, chunk.buffer);
623 }
624
625 if (_log.isDebugEnabled())
626 _log.debug("{} async write pending of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
627 writePending = true;
628 }
629
630 private boolean succeed(Callback callback)
631 {
632
633
634
635
636
637
638
639
640
641
642 callback.succeeded();
643 return writePending;
644 }
645
646 @Override
647 public void onError(Throwable failure)
648 {
649 DeferredContentProvider.Chunk chunk = this.chunk;
650 if (chunk != null)
651 chunk.callback.failed(failure);
652 else
653 serverResponse.abort(failure);
654 }
655 }
656
657
658
659
660
661
662
663
664
665
666 public interface ContentTransformer
667 {
668
669
670
671 public static final ContentTransformer IDENTITY = new IdentityContentTransformer();
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719 public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException;
720 }
721
722 private static class IdentityContentTransformer implements ContentTransformer
723 {
724 @Override
725 public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
726 {
727 output.add(input);
728 }
729 }
730
731 public static class GZIPContentTransformer implements ContentTransformer
732 {
733 private final List<ByteBuffer> buffers = new ArrayList<>(2);
734 private final ContentDecoder decoder = new GZIPContentDecoder();
735 private final ContentTransformer transformer;
736 private final ByteArrayOutputStream out;
737 private final GZIPOutputStream gzipOut;
738
739 public GZIPContentTransformer(ContentTransformer transformer)
740 {
741 try
742 {
743 this.transformer = transformer;
744 this.out = new ByteArrayOutputStream();
745 this.gzipOut = new GZIPOutputStream(out);
746 }
747 catch (IOException x)
748 {
749 throw new RuntimeIOException(x);
750 }
751 }
752
753 @Override
754 public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
755 {
756 if (!input.hasRemaining())
757 {
758 if (finished)
759 transformer.transform(input, true, buffers);
760 }
761 else
762 {
763 while (input.hasRemaining())
764 {
765 ByteBuffer decoded = decoder.decode(input);
766 if (decoded.hasRemaining())
767 transformer.transform(decoded, finished && !input.hasRemaining(), buffers);
768 }
769 }
770
771 if (!buffers.isEmpty() || finished)
772 {
773 ByteBuffer result = gzip(buffers, finished);
774 buffers.clear();
775 output.add(result);
776 }
777 }
778
779 private ByteBuffer gzip(List<ByteBuffer> buffers, boolean finished) throws IOException
780 {
781 for (ByteBuffer buffer : buffers)
782 write(gzipOut, buffer);
783 if (finished)
784 gzipOut.close();
785 byte[] gzipBytes = out.toByteArray();
786 out.reset();
787 return ByteBuffer.wrap(gzipBytes);
788 }
789 }
790 }