1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.proxy;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayDeque;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.TimeUnit;
30 import java.util.zip.GZIPOutputStream;
31
32 import javax.servlet.AsyncContext;
33 import javax.servlet.ReadListener;
34 import javax.servlet.ServletConfig;
35 import javax.servlet.ServletException;
36 import javax.servlet.ServletInputStream;
37 import javax.servlet.ServletOutputStream;
38 import javax.servlet.WriteListener;
39 import javax.servlet.http.HttpServletRequest;
40 import javax.servlet.http.HttpServletResponse;
41
42 import org.eclipse.jetty.client.ContentDecoder;
43 import org.eclipse.jetty.client.GZIPContentDecoder;
44 import org.eclipse.jetty.client.api.ContentProvider;
45 import org.eclipse.jetty.client.api.Request;
46 import org.eclipse.jetty.client.api.Response;
47 import org.eclipse.jetty.client.api.Result;
48 import org.eclipse.jetty.client.util.DeferredContentProvider;
49 import org.eclipse.jetty.http.HttpHeader;
50 import org.eclipse.jetty.http.HttpVersion;
51 import org.eclipse.jetty.io.RuntimeIOException;
52 import org.eclipse.jetty.util.BufferUtil;
53 import org.eclipse.jetty.util.Callback;
54 import org.eclipse.jetty.util.CountingCallback;
55 import org.eclipse.jetty.util.IteratingCallback;
56 import org.eclipse.jetty.util.component.Destroyable;
57
58
59
60
61
62
63
64
65
66
67 public class AsyncMiddleManServlet extends AbstractProxyServlet
68 {
69 private static final String PROXY_REQUEST_COMMITTED = AsyncMiddleManServlet.class.getName() + ".proxyRequestCommitted";
70 private static final String CLIENT_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".clientTransformer";
71 private static final String SERVER_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".serverTransformer";
72
73 @Override
74 protected void service(HttpServletRequest clientRequest, HttpServletResponse proxyResponse) throws ServletException, IOException
75 {
76 String rewrittenTarget = rewriteTarget(clientRequest);
77 if (_log.isDebugEnabled())
78 {
79 StringBuffer target = clientRequest.getRequestURL();
80 if (clientRequest.getQueryString() != null)
81 target.append("?").append(clientRequest.getQueryString());
82 _log.debug("{} rewriting: {} -> {}", getRequestId(clientRequest), target, rewrittenTarget);
83 }
84 if (rewrittenTarget == null)
85 {
86 onProxyRewriteFailed(clientRequest, proxyResponse);
87 return;
88 }
89
90 final Request proxyRequest = getHttpClient().newRequest(rewrittenTarget)
91 .method(clientRequest.getMethod())
92 .version(HttpVersion.fromString(clientRequest.getProtocol()));
93
94 boolean hasContent = hasContent(clientRequest);
95
96 copyRequestHeaders(clientRequest, proxyRequest);
97
98 addProxyHeaders(clientRequest, proxyRequest);
99
100 final AsyncContext asyncContext = clientRequest.startAsync();
101
102 asyncContext.setTimeout(0);
103 proxyRequest.timeout(getTimeout(), TimeUnit.MILLISECONDS);
104
105
106
107
108 if (hasContent)
109 proxyRequest.content(newProxyContentProvider(clientRequest, proxyResponse, proxyRequest));
110 else
111 sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
112 }
113
114 protected ContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException
115 {
116 ServletInputStream input = clientRequest.getInputStream();
117 DeferredContentProvider provider = new DeferredContentProvider()
118 {
119 @Override
120 public boolean offer(ByteBuffer buffer, Callback callback)
121 {
122 if (_log.isDebugEnabled())
123 _log.debug("{} proxying content to upstream: {} bytes", getRequestId(clientRequest), buffer.remaining());
124 return super.offer(buffer, callback);
125 }
126 };
127 input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider));
128 return provider;
129 }
130
131 protected ReadListener newProxyReadListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
132 {
133 return new ProxyReader(clientRequest, proxyResponse, proxyRequest, provider);
134 }
135
136 protected ProxyWriter newProxyWriteListener(HttpServletRequest clientRequest, Response proxyResponse)
137 {
138 return new ProxyWriter(clientRequest, proxyResponse);
139 }
140
141 @Override
142 protected Response.CompleteListener newProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
143 {
144 return new ProxyResponseListener(clientRequest, proxyResponse);
145 }
146
147 protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
148 {
149 return ContentTransformer.IDENTITY;
150 }
151
152 protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
153 {
154 return ContentTransformer.IDENTITY;
155 }
156
157 private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
158 {
159 try
160 {
161 transformer.transform(input, finished, output);
162 }
163 catch (Throwable x)
164 {
165 _log.info("Exception while transforming " + transformer, x);
166 throw x;
167 }
168 }
169
170 int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
171 {
172 return input.read(buffer);
173 }
174
175 void writeProxyResponseContent(ServletOutputStream output, ByteBuffer content) throws IOException
176 {
177 write(output, content);
178 }
179
180 private static void write(OutputStream output, ByteBuffer content) throws IOException
181 {
182 int length = content.remaining();
183 int offset = 0;
184 byte[] buffer;
185 if (content.hasArray())
186 {
187 offset = content.arrayOffset();
188 buffer = content.array();
189 }
190 else
191 {
192 buffer = new byte[length];
193 content.get(buffer);
194 }
195 output.write(buffer, offset, length);
196 }
197
198 private void cleanup(HttpServletRequest clientRequest)
199 {
200 ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
201 if (clientTransformer instanceof Destroyable)
202 ((Destroyable)clientTransformer).destroy();
203 ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
204 if (serverTransformer instanceof Destroyable)
205 ((Destroyable)serverTransformer).destroy();
206 }
207
208
209
210
211
212
213 public static class Transparent extends ProxyServlet
214 {
215 private final TransparentDelegate delegate = new TransparentDelegate(this);
216
217 @Override
218 public void init(ServletConfig config) throws ServletException
219 {
220 super.init(config);
221 delegate.init(config);
222 }
223
224 @Override
225 protected String rewriteTarget(HttpServletRequest request)
226 {
227 return delegate.rewriteTarget(request);
228 }
229 }
230
231 protected class ProxyReader extends IteratingCallback implements ReadListener
232 {
233 private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
234 private final List<ByteBuffer> buffers = new ArrayList<>();
235 private final HttpServletRequest clientRequest;
236 private final HttpServletResponse proxyResponse;
237 private final Request proxyRequest;
238 private final DeferredContentProvider provider;
239 private final int contentLength;
240 private int length;
241
242 protected ProxyReader(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
243 {
244 this.clientRequest = clientRequest;
245 this.proxyResponse = proxyResponse;
246 this.proxyRequest = proxyRequest;
247 this.provider = provider;
248 this.contentLength = clientRequest.getContentLength();
249 }
250
251 @Override
252 public void onDataAvailable() throws IOException
253 {
254 iterate();
255 }
256
257 @Override
258 public void onAllDataRead() throws IOException
259 {
260 if (!provider.isClosed())
261 {
262 process(BufferUtil.EMPTY_BUFFER, new Callback()
263 {
264 @Override
265 public void failed(Throwable x)
266 {
267 onError(x);
268 }
269 }, true);
270 }
271
272 if (_log.isDebugEnabled())
273 _log.debug("{} proxying content to upstream completed", getRequestId(clientRequest));
274 }
275
276 @Override
277 public void onError(Throwable t)
278 {
279 cleanup(clientRequest);
280 onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t);
281 }
282
283 @Override
284 protected Action process() throws Exception
285 {
286 ServletInputStream input = clientRequest.getInputStream();
287 while (input.isReady() && !input.isFinished())
288 {
289 int read = readClientRequestContent(input, buffer);
290
291 if (_log.isDebugEnabled())
292 _log.debug("{} asynchronous read {} bytes on {}", getRequestId(clientRequest), read, input);
293
294 if (read < 0)
295 return Action.SUCCEEDED;
296
297 if (contentLength > 0 && read > 0)
298 length += read;
299
300 ByteBuffer content = read > 0 ? ByteBuffer.wrap(buffer, 0, read) : BufferUtil.EMPTY_BUFFER;
301 boolean finished = length == contentLength;
302 process(content, this, finished);
303
304 if (read > 0)
305 return Action.SCHEDULED;
306 }
307
308 if (input.isFinished())
309 {
310 if (_log.isDebugEnabled())
311 _log.debug("{} asynchronous read complete on {}", getRequestId(clientRequest), input);
312 return Action.SUCCEEDED;
313 }
314 else
315 {
316 if (_log.isDebugEnabled())
317 _log.debug("{} asynchronous read pending on {}", getRequestId(clientRequest), input);
318 return Action.IDLE;
319 }
320 }
321
322 private void process(ByteBuffer content, Callback callback, boolean finished) throws IOException
323 {
324 ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
325 if (transformer == null)
326 {
327 transformer = newClientRequestContentTransformer(clientRequest, proxyRequest);
328 clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer);
329 }
330
331 boolean committed = clientRequest.getAttribute(PROXY_REQUEST_COMMITTED) != null;
332
333 int contentBytes = content.remaining();
334
335
336 if (contentBytes == 0 && !finished)
337 {
338 callback.succeeded();
339 return;
340 }
341
342 transform(transformer, content, finished, buffers);
343
344 int newContentBytes = 0;
345 int size = buffers.size();
346 if (size > 0)
347 {
348 CountingCallback counter = new CountingCallback(callback, size);
349 for (int i = 0; i < size; ++i)
350 {
351 ByteBuffer buffer = buffers.get(i);
352 newContentBytes += buffer.remaining();
353 provider.offer(buffer, counter);
354 }
355 buffers.clear();
356 }
357
358 if (finished)
359 provider.close();
360
361 if (_log.isDebugEnabled())
362 _log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
363
364 if (!committed && (size > 0 || finished))
365 {
366 proxyRequest.header(HttpHeader.CONTENT_LENGTH, null);
367 clientRequest.setAttribute(PROXY_REQUEST_COMMITTED, true);
368 sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
369 }
370
371 if (size == 0)
372 callback.succeeded();
373 }
374
375 @Override
376 protected void onCompleteFailure(Throwable x)
377 {
378 onError(x);
379 }
380 }
381
382 protected class ProxyResponseListener extends Response.Listener.Adapter implements Callback
383 {
384 private final String WRITE_LISTENER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".writeListener";
385
386 private final Callback complete = new CountingCallback(this, 2);
387 private final List<ByteBuffer> buffers = new ArrayList<>();
388 private final HttpServletRequest clientRequest;
389 private final HttpServletResponse proxyResponse;
390 private boolean hasContent;
391 private long contentLength;
392 private long length;
393 private Response response;
394
395 protected ProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
396 {
397 this.clientRequest = clientRequest;
398 this.proxyResponse = proxyResponse;
399 }
400
401 @Override
402 public void onBegin(Response serverResponse)
403 {
404 proxyResponse.setStatus(serverResponse.getStatus());
405 }
406
407 @Override
408 public void onHeaders(Response serverResponse)
409 {
410 contentLength = serverResponse.getHeaders().getLongField(HttpHeader.CONTENT_LENGTH.asString());
411 onServerResponseHeaders(clientRequest, proxyResponse, serverResponse);
412 }
413
414 @Override
415 public void onContent(final Response serverResponse, ByteBuffer content, final Callback callback)
416 {
417 try
418 {
419 int contentBytes = content.remaining();
420 if (_log.isDebugEnabled())
421 _log.debug("{} received server content: {} bytes", getRequestId(clientRequest), contentBytes);
422
423 hasContent = true;
424
425 ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
426 boolean committed = proxyWriter != null;
427 if (proxyWriter == null)
428 {
429 proxyWriter = newProxyWriteListener(clientRequest, serverResponse);
430 clientRequest.setAttribute(WRITE_LISTENER_ATTRIBUTE, proxyWriter);
431 }
432
433 ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
434 if (transformer == null)
435 {
436 transformer = newServerResponseContentTransformer(clientRequest, proxyResponse, serverResponse);
437 clientRequest.setAttribute(SERVER_TRANSFORMER, transformer);
438 }
439
440 length += contentBytes;
441
442 boolean finished = contentLength >= 0 && length == contentLength;
443 transform(transformer, content, finished, buffers);
444
445 int newContentBytes = 0;
446 int size = buffers.size();
447 if (size > 0)
448 {
449 Callback counter = size == 1 ? callback : new CountingCallback(callback, size);
450 for (int i = 0; i < size; ++i)
451 {
452 ByteBuffer buffer = buffers.get(i);
453 newContentBytes += buffer.remaining();
454 proxyWriter.offer(buffer, counter);
455 }
456 buffers.clear();
457 }
458 else
459 {
460 proxyWriter.offer(BufferUtil.EMPTY_BUFFER, callback);
461 }
462 if (finished)
463 proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
464
465 if (_log.isDebugEnabled())
466 _log.debug("{} downstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
467
468 if (committed)
469 {
470 proxyWriter.onWritePossible();
471 }
472 else
473 {
474 if (contentLength >= 0)
475 proxyResponse.setContentLength(-1);
476
477
478
479
480
481
482
483
484 proxyResponse.getOutputStream().setWriteListener(proxyWriter);
485 }
486 }
487 catch (Throwable x)
488 {
489 callback.failed(x);
490 }
491 }
492
493 @Override
494 public void onSuccess(final Response serverResponse)
495 {
496 try
497 {
498 if (hasContent)
499 {
500
501
502 if (contentLength < 0)
503 {
504 ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
505 ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
506
507 transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);
508
509 long newContentBytes = 0;
510 int size = buffers.size();
511 if (size > 0)
512 {
513 Callback callback = size == 1 ? complete : new CountingCallback(complete, size);
514 for (int i = 0; i < size; ++i)
515 {
516 ByteBuffer buffer = buffers.get(i);
517 newContentBytes += buffer.remaining();
518 proxyWriter.offer(buffer, callback);
519 }
520 buffers.clear();
521 }
522 else
523 {
524 proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
525 }
526
527 if (_log.isDebugEnabled())
528 _log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes);
529
530 proxyWriter.onWritePossible();
531 }
532 }
533 else
534 {
535 complete.succeeded();
536 }
537 }
538 catch (Throwable x)
539 {
540 complete.failed(x);
541 }
542 }
543
544 @Override
545 public void onComplete(Result result)
546 {
547 response = result.getResponse();
548 if (result.isSucceeded())
549 complete.succeeded();
550 else
551 complete.failed(result.getFailure());
552 }
553
554 @Override
555 public void succeeded()
556 {
557 cleanup(clientRequest);
558 onProxyResponseSuccess(clientRequest, proxyResponse, response);
559 }
560
561 @Override
562 public void failed(Throwable failure)
563 {
564 cleanup(clientRequest);
565 onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
566 }
567 }
568
569 protected class ProxyWriter implements WriteListener
570 {
571 private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
572 private final HttpServletRequest clientRequest;
573 private final Response serverResponse;
574 private DeferredContentProvider.Chunk chunk;
575 private boolean writePending;
576
577 protected ProxyWriter(HttpServletRequest clientRequest, Response serverResponse)
578 {
579 this.clientRequest = clientRequest;
580 this.serverResponse = serverResponse;
581 }
582
583 public boolean offer(ByteBuffer content, Callback callback)
584 {
585 if (_log.isDebugEnabled())
586 _log.debug("{} proxying content to downstream: {} bytes {}", getRequestId(clientRequest), content.remaining(), callback);
587 return chunks.offer(new DeferredContentProvider.Chunk(content, callback));
588 }
589
590 @Override
591 public void onWritePossible() throws IOException
592 {
593 ServletOutputStream output = clientRequest.getAsyncContext().getResponse().getOutputStream();
594
595
596 if (writePending)
597 {
598 if (_log.isDebugEnabled())
599 _log.debug("{} pending async write complete of {} on {}", getRequestId(clientRequest), chunk, output);
600 writePending = false;
601 if (succeed(chunk.callback))
602 return;
603 }
604
605 int length = 0;
606 DeferredContentProvider.Chunk chunk = null;
607 while (output.isReady())
608 {
609 if (chunk != null)
610 {
611 if (_log.isDebugEnabled())
612 _log.debug("{} async write complete of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
613 if (succeed(chunk.callback))
614 return;
615 }
616
617 this.chunk = chunk = chunks.poll();
618 if (chunk == null)
619 return;
620
621 length = chunk.buffer.remaining();
622 if (length > 0)
623 writeProxyResponseContent(output, chunk.buffer);
624 }
625
626 if (_log.isDebugEnabled())
627 _log.debug("{} async write pending of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
628 writePending = true;
629 }
630
631 private boolean succeed(Callback callback)
632 {
633
634
635
636
637
638
639
640
641
642
643 callback.succeeded();
644 return writePending;
645 }
646
647 @Override
648 public void onError(Throwable failure)
649 {
650 DeferredContentProvider.Chunk chunk = this.chunk;
651 if (chunk != null)
652 chunk.callback.failed(failure);
653 else
654 serverResponse.abort(failure);
655 }
656 }
657
658
659
660
661
662
663
664
665
666
667 public interface ContentTransformer
668 {
669
670
671
672 public static final ContentTransformer IDENTITY = new IdentityContentTransformer();
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720 public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException;
721 }
722
723 private static class IdentityContentTransformer implements ContentTransformer
724 {
725 @Override
726 public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
727 {
728 output.add(input);
729 }
730 }
731
732 public static class GZIPContentTransformer implements ContentTransformer
733 {
734 private final List<ByteBuffer> buffers = new ArrayList<>(2);
735 private final ContentDecoder decoder = new GZIPContentDecoder();
736 private final ContentTransformer transformer;
737 private final ByteArrayOutputStream out;
738 private final GZIPOutputStream gzipOut;
739
740 public GZIPContentTransformer(ContentTransformer transformer)
741 {
742 try
743 {
744 this.transformer = transformer;
745 this.out = new ByteArrayOutputStream();
746 this.gzipOut = new GZIPOutputStream(out);
747 }
748 catch (IOException x)
749 {
750 throw new RuntimeIOException(x);
751 }
752 }
753
754 @Override
755 public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
756 {
757 if (!input.hasRemaining())
758 {
759 if (finished)
760 transformer.transform(input, true, buffers);
761 }
762 else
763 {
764 while (input.hasRemaining())
765 {
766 ByteBuffer decoded = decoder.decode(input);
767 if (decoded.hasRemaining())
768 transformer.transform(decoded, finished && !input.hasRemaining(), buffers);
769 }
770 }
771
772 if (!buffers.isEmpty() || finished)
773 {
774 ByteBuffer result = gzip(buffers, finished);
775 buffers.clear();
776 output.add(result);
777 }
778 }
779
780 private ByteBuffer gzip(List<ByteBuffer> buffers, boolean finished) throws IOException
781 {
782 for (ByteBuffer buffer : buffers)
783 write(gzipOut, buffer);
784 if (finished)
785 gzipOut.close();
786 byte[] gzipBytes = out.toByteArray();
787 out.reset();
788 return ByteBuffer.wrap(gzipBytes);
789 }
790 }
791 }