View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayDeque;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.TimeUnit;
30  import java.util.zip.GZIPOutputStream;
31  
32  import javax.servlet.AsyncContext;
33  import javax.servlet.ReadListener;
34  import javax.servlet.ServletConfig;
35  import javax.servlet.ServletException;
36  import javax.servlet.ServletInputStream;
37  import javax.servlet.ServletOutputStream;
38  import javax.servlet.WriteListener;
39  import javax.servlet.http.HttpServletRequest;
40  import javax.servlet.http.HttpServletResponse;
41  
42  import org.eclipse.jetty.client.ContentDecoder;
43  import org.eclipse.jetty.client.GZIPContentDecoder;
44  import org.eclipse.jetty.client.api.ContentProvider;
45  import org.eclipse.jetty.client.api.Request;
46  import org.eclipse.jetty.client.api.Response;
47  import org.eclipse.jetty.client.api.Result;
48  import org.eclipse.jetty.client.util.DeferredContentProvider;
49  import org.eclipse.jetty.http.HttpHeader;
50  import org.eclipse.jetty.http.HttpVersion;
51  import org.eclipse.jetty.io.RuntimeIOException;
52  import org.eclipse.jetty.util.BufferUtil;
53  import org.eclipse.jetty.util.Callback;
54  import org.eclipse.jetty.util.CountingCallback;
55  import org.eclipse.jetty.util.IteratingCallback;
56  import org.eclipse.jetty.util.component.Destroyable;
57  
58  /**
59   * <p>Servlet 3.1 asynchronous proxy servlet with capability
60   * to intercept and modify request/response content.</p>
61   * <p>Both the request processing and the I/O are asynchronous.</p>
62   *
63   * @see ProxyServlet
64   * @see AsyncProxyServlet
65   * @see ConnectHandler
66   */
67  public class AsyncMiddleManServlet extends AbstractProxyServlet
68  {
69      private static final String PROXY_REQUEST_COMMITTED = AsyncMiddleManServlet.class.getName() + ".proxyRequestCommitted";
70      private static final String CLIENT_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".clientTransformer";
71      private static final String SERVER_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".serverTransformer";
72  
73      @Override
74      protected void service(HttpServletRequest clientRequest, HttpServletResponse proxyResponse) throws ServletException, IOException
75      {
76          String rewrittenTarget = rewriteTarget(clientRequest);
77          if (_log.isDebugEnabled())
78          {
79              StringBuffer target = clientRequest.getRequestURL();
80              if (clientRequest.getQueryString() != null)
81                  target.append("?").append(clientRequest.getQueryString());
82              _log.debug("{} rewriting: {} -> {}", getRequestId(clientRequest), target, rewrittenTarget);
83          }
84          if (rewrittenTarget == null)
85          {
86              onProxyRewriteFailed(clientRequest, proxyResponse);
87              return;
88          }
89  
90          final Request proxyRequest = getHttpClient().newRequest(rewrittenTarget)
91                  .method(clientRequest.getMethod())
92                  .version(HttpVersion.fromString(clientRequest.getProtocol()));
93  
94          boolean hasContent = hasContent(clientRequest);
95  
96          copyRequestHeaders(clientRequest, proxyRequest);
97  
98          addProxyHeaders(clientRequest, proxyRequest);
99  
100         final AsyncContext asyncContext = clientRequest.startAsync();
101         // We do not timeout the continuation, but the proxy request.
102         asyncContext.setTimeout(0);
103         proxyRequest.timeout(getTimeout(), TimeUnit.MILLISECONDS);
104 
105         // If there is content, the send of the proxy request
106         // is delayed and performed when the content arrives,
107         // to allow optimization of the Content-Length header.
108         if (hasContent)
109             proxyRequest.content(newProxyContentProvider(clientRequest, proxyResponse, proxyRequest));
110         else
111             sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
112     }
113 
114     protected ContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException
115     {
116         ServletInputStream input = clientRequest.getInputStream();
117         DeferredContentProvider provider = new DeferredContentProvider()
118         {
119             @Override
120             public boolean offer(ByteBuffer buffer, Callback callback)
121             {
122                 if (_log.isDebugEnabled())
123                     _log.debug("{} proxying content to upstream: {} bytes", getRequestId(clientRequest), buffer.remaining());
124                 return super.offer(buffer, callback);
125             }
126         };
127         input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider));
128         return provider;
129     }
130 
131     protected ReadListener newProxyReadListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
132     {
133         return new ProxyReader(clientRequest, proxyResponse, proxyRequest, provider);
134     }
135 
136     protected ProxyWriter newProxyWriteListener(HttpServletRequest clientRequest, Response proxyResponse)
137     {
138         return new ProxyWriter(clientRequest, proxyResponse);
139     }
140 
141     protected Response.CompleteListener newProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
142     {
143         return new ProxyResponseListener(clientRequest, proxyResponse);
144     }
145 
146     protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
147     {
148         return ContentTransformer.IDENTITY;
149     }
150 
151     protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
152     {
153         return ContentTransformer.IDENTITY;
154     }
155 
156     private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
157     {
158         try
159         {
160             transformer.transform(input, finished, output);
161         }
162         catch (Throwable x)
163         {
164             _log.info("Exception while transforming " + transformer, x);
165             throw x;
166         }
167     }
168 
169     int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
170     {
171         return input.read(buffer);
172     }
173 
174     void writeProxyResponseContent(ServletOutputStream output, ByteBuffer content) throws IOException
175     {
176         write(output, content);
177     }
178 
179     private static void write(OutputStream output, ByteBuffer content) throws IOException
180     {
181         int length = content.remaining();
182         int offset = 0;
183         byte[] buffer;
184         if (content.hasArray())
185         {
186             offset = content.arrayOffset();
187             buffer = content.array();
188         }
189         else
190         {
191             buffer = new byte[length];
192             content.get(buffer);
193         }
194         output.write(buffer, offset, length);
195     }
196 
197     private void cleanup(HttpServletRequest clientRequest)
198     {
199         ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
200         if (clientTransformer instanceof Destroyable)
201             ((Destroyable)clientTransformer).destroy();
202         ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
203         if (serverTransformer instanceof Destroyable)
204             ((Destroyable)serverTransformer).destroy();
205     }
206 
207     /**
208      * <p>Convenience extension of {@link AsyncMiddleManServlet} that offers transparent proxy functionalities.</p>
209      *
210      * @see org.eclipse.jetty.proxy.AbstractProxyServlet.TransparentDelegate
211      */
212     public static class Transparent extends ProxyServlet
213     {
214         private final TransparentDelegate delegate = new TransparentDelegate(this);
215 
216         @Override
217         public void init(ServletConfig config) throws ServletException
218         {
219             super.init(config);
220             delegate.init(config);
221         }
222 
223         @Override
224         protected String rewriteTarget(HttpServletRequest request)
225         {
226             return delegate.rewriteTarget(request);
227         }
228     }
229 
230     protected class ProxyReader extends IteratingCallback implements ReadListener
231     {
232         private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
233         private final List<ByteBuffer> buffers = new ArrayList<>();
234         private final HttpServletRequest clientRequest;
235         private final HttpServletResponse proxyResponse;
236         private final Request proxyRequest;
237         private final DeferredContentProvider provider;
238         private final int contentLength;
239         private int length;
240 
241         protected ProxyReader(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
242         {
243             this.clientRequest = clientRequest;
244             this.proxyResponse = proxyResponse;
245             this.proxyRequest = proxyRequest;
246             this.provider = provider;
247             this.contentLength = clientRequest.getContentLength();
248         }
249 
250         @Override
251         public void onDataAvailable() throws IOException
252         {
253             iterate();
254         }
255 
256         @Override
257         public void onAllDataRead() throws IOException
258         {
259             if (!provider.isClosed())
260             {
261                 process(BufferUtil.EMPTY_BUFFER, new Callback()
262                 {
263                     @Override
264                     public void failed(Throwable x)
265                     {
266                         onError(x);
267                     }
268                 }, true);
269             }
270 
271             if (_log.isDebugEnabled())
272                 _log.debug("{} proxying content to upstream completed", getRequestId(clientRequest));
273         }
274 
275         @Override
276         public void onError(Throwable t)
277         {
278             cleanup(clientRequest);
279             onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t);
280         }
281 
282         @Override
283         protected Action process() throws Exception
284         {
285             ServletInputStream input = clientRequest.getInputStream();
286             while (input.isReady() && !input.isFinished())
287             {
288                 int read = readClientRequestContent(input, buffer);
289 
290                 if (_log.isDebugEnabled())
291                     _log.debug("{} asynchronous read {} bytes on {}", getRequestId(clientRequest), read, input);
292 
293                 if (read < 0)
294                     return Action.SUCCEEDED;
295 
296                 if (contentLength > 0 && read > 0)
297                     length += read;
298 
299                 ByteBuffer content = read > 0 ? ByteBuffer.wrap(buffer, 0, read) : BufferUtil.EMPTY_BUFFER;
300                 boolean finished = length == contentLength;
301                 process(content, this, finished);
302 
303                 if (read > 0)
304                     return Action.SCHEDULED;
305             }
306 
307             if (input.isFinished())
308             {
309                 if (_log.isDebugEnabled())
310                     _log.debug("{} asynchronous read complete on {}", getRequestId(clientRequest), input);
311                 return Action.SUCCEEDED;
312             }
313             else
314             {
315                 if (_log.isDebugEnabled())
316                     _log.debug("{} asynchronous read pending on {}", getRequestId(clientRequest), input);
317                 return Action.IDLE;
318             }
319         }
320 
321         private void process(ByteBuffer content, Callback callback, boolean finished) throws IOException
322         {
323             ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
324             if (transformer == null)
325             {
326                 transformer = newClientRequestContentTransformer(clientRequest, proxyRequest);
327                 clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer);
328             }
329 
330             boolean committed = clientRequest.getAttribute(PROXY_REQUEST_COMMITTED) != null;
331 
332             int contentBytes = content.remaining();
333 
334             // Skip transformation for empty non-last buffers.
335             if (contentBytes == 0 && !finished)
336             {
337                 callback.succeeded();
338                 return;
339             }
340 
341             transform(transformer, content, finished, buffers);
342 
343             int newContentBytes = 0;
344             int size = buffers.size();
345             if (size > 0)
346             {
347                 CountingCallback counter = new CountingCallback(callback, size);
348                 for (int i = 0; i < size; ++i)
349                 {
350                     ByteBuffer buffer = buffers.get(i);
351                     newContentBytes += buffer.remaining();
352                     provider.offer(buffer, counter);
353                 }
354                 buffers.clear();
355             }
356 
357             if (finished)
358                 provider.close();
359 
360             if (_log.isDebugEnabled())
361                 _log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
362 
363             if (!committed && (size > 0 || finished))
364             {
365                 proxyRequest.header(HttpHeader.CONTENT_LENGTH, null);
366                 clientRequest.setAttribute(PROXY_REQUEST_COMMITTED, true);
367                 sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
368             }
369 
370             if (size == 0)
371                 callback.succeeded();
372         }
373 
374         @Override
375         protected void onCompleteFailure(Throwable x)
376         {
377             onError(x);
378         }
379     }
380 
381     protected class ProxyResponseListener extends Response.Listener.Adapter implements Callback
382     {
383         private final String WRITE_LISTENER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".writeListener";
384 
385         private final Callback complete = new CountingCallback(this, 2);
386         private final List<ByteBuffer> buffers = new ArrayList<>();
387         private final HttpServletRequest clientRequest;
388         private final HttpServletResponse proxyResponse;
389         private boolean hasContent;
390         private long contentLength;
391         private long length;
392         private Response response;
393 
394         protected ProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
395         {
396             this.clientRequest = clientRequest;
397             this.proxyResponse = proxyResponse;
398         }
399 
400         @Override
401         public void onBegin(Response serverResponse)
402         {
403             proxyResponse.setStatus(serverResponse.getStatus());
404         }
405 
406         @Override
407         public void onHeaders(Response serverResponse)
408         {
409             contentLength = serverResponse.getHeaders().getLongField(HttpHeader.CONTENT_LENGTH.asString());
410             onServerResponseHeaders(clientRequest, proxyResponse, serverResponse);
411         }
412 
413         @Override
414         public void onContent(final Response serverResponse, ByteBuffer content, final Callback callback)
415         {
416             try
417             {
418                 int contentBytes = content.remaining();
419                 if (_log.isDebugEnabled())
420                     _log.debug("{} received server content: {} bytes", getRequestId(clientRequest), contentBytes);
421 
422                 hasContent = true;
423 
424                 ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
425                 boolean committed = proxyWriter != null;
426                 if (proxyWriter == null)
427                 {
428                     proxyWriter = newProxyWriteListener(clientRequest, serverResponse);
429                     clientRequest.setAttribute(WRITE_LISTENER_ATTRIBUTE, proxyWriter);
430                 }
431 
432                 ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
433                 if (transformer == null)
434                 {
435                     transformer = newServerResponseContentTransformer(clientRequest, proxyResponse, serverResponse);
436                     clientRequest.setAttribute(SERVER_TRANSFORMER, transformer);
437                 }
438 
439                 length += contentBytes;
440 
441                 boolean finished = contentLength >= 0 && length == contentLength;
442                 transform(transformer, content, finished, buffers);
443 
444                 int newContentBytes = 0;
445                 int size = buffers.size();
446                 if (size > 0)
447                 {
448                     Callback counter = size == 1 ? callback : new CountingCallback(callback, size);
449                     for (int i = 0; i < size; ++i)
450                     {
451                         ByteBuffer buffer = buffers.get(i);
452                         newContentBytes += buffer.remaining();
453                         proxyWriter.offer(buffer, counter);
454                     }
455                     buffers.clear();
456                 }
457                 else
458                 {
459                     proxyWriter.offer(BufferUtil.EMPTY_BUFFER, callback);
460                 }
461                 if (finished)
462                     proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
463 
464                 if (_log.isDebugEnabled())
465                     _log.debug("{} downstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);
466 
467                 if (committed)
468                 {
469                     proxyWriter.onWritePossible();
470                 }
471                 else
472                 {
473                     if (contentLength >= 0)
474                         proxyResponse.setContentLength(-1);
475 
476                     // Setting the WriteListener triggers an invocation to
477                     // onWritePossible(), possibly on a different thread.
478                     // We cannot succeed the callback from here, otherwise
479                     // we run into a race where the different thread calls
480                     // onWritePossible() and succeeding the callback causes
481                     // this method to be called again, which also may call
482                     // onWritePossible().
483                     proxyResponse.getOutputStream().setWriteListener(proxyWriter);
484                 }
485             }
486             catch (Throwable x)
487             {
488                 callback.failed(x);
489             }
490         }
491 
492         @Override
493         public void onSuccess(final Response serverResponse)
494         {
495             try
496             {
497                 if (hasContent)
498                 {
499                     // If we had unknown length content, we need to call the
500                     // transformer to signal that the content is finished.
501                     if (contentLength < 0)
502                     {
503                         ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
504                         ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
505 
506                         transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);
507 
508                         long newContentBytes = 0;
509                         int size = buffers.size();
510                         if (size > 0)
511                         {
512                             Callback callback = size == 1 ? complete : new CountingCallback(complete, size);
513                             for (int i = 0; i < size; ++i)
514                             {
515                                 ByteBuffer buffer = buffers.get(i);
516                                 newContentBytes += buffer.remaining();
517                                 proxyWriter.offer(buffer, callback);
518                             }
519                             buffers.clear();
520                         }
521                         else
522                         {
523                             proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
524                         }
525 
526                         if (_log.isDebugEnabled())
527                             _log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes);
528 
529                         proxyWriter.onWritePossible();
530                     }
531                 }
532                 else
533                 {
534                     complete.succeeded();
535                 }
536             }
537             catch (Throwable x)
538             {
539                 complete.failed(x);
540             }
541         }
542 
543         @Override
544         public void onComplete(Result result)
545         {
546             response = result.getResponse();
547             if (result.isSucceeded())
548                 complete.succeeded();
549             else
550                 complete.failed(result.getFailure());
551         }
552 
553         @Override
554         public void succeeded()
555         {
556             cleanup(clientRequest);
557             onProxyResponseSuccess(clientRequest, proxyResponse, response);
558         }
559 
560         @Override
561         public void failed(Throwable failure)
562         {
563             cleanup(clientRequest);
564             onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
565         }
566     }
567 
568     protected class ProxyWriter implements WriteListener
569     {
570         private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
571         private final HttpServletRequest clientRequest;
572         private final Response serverResponse;
573         private DeferredContentProvider.Chunk chunk;
574         private boolean writePending;
575 
576         protected ProxyWriter(HttpServletRequest clientRequest, Response serverResponse)
577         {
578             this.clientRequest = clientRequest;
579             this.serverResponse = serverResponse;
580         }
581 
582         public boolean offer(ByteBuffer content, Callback callback)
583         {
584             if (_log.isDebugEnabled())
585                 _log.debug("{} proxying content to downstream: {} bytes {}", getRequestId(clientRequest), content.remaining(), callback);
586             return chunks.offer(new DeferredContentProvider.Chunk(content, callback));
587         }
588 
589         @Override
590         public void onWritePossible() throws IOException
591         {
592             ServletOutputStream output = clientRequest.getAsyncContext().getResponse().getOutputStream();
593 
594             // If we had a pending write, let's succeed it.
595             if (writePending)
596             {
597                 if (_log.isDebugEnabled())
598                     _log.debug("{} pending async write complete of {} on {}", getRequestId(clientRequest), chunk, output);
599                 writePending = false;
600                 if (succeed(chunk.callback))
601                     return;
602             }
603 
604             int length = 0;
605             DeferredContentProvider.Chunk chunk = null;
606             while (output.isReady())
607             {
608                 if (chunk != null)
609                 {
610                     if (_log.isDebugEnabled())
611                         _log.debug("{} async write complete of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
612                     if (succeed(chunk.callback))
613                         return;
614                 }
615 
616                 this.chunk = chunk = chunks.poll();
617                 if (chunk == null)
618                     return;
619 
620                 length = chunk.buffer.remaining();
621                 if (length > 0)
622                     writeProxyResponseContent(output, chunk.buffer);
623             }
624 
625             if (_log.isDebugEnabled())
626                 _log.debug("{} async write pending of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
627             writePending = true;
628         }
629 
630         private boolean succeed(Callback callback)
631         {
632             // Succeeding the callback may cause to reenter in onWritePossible()
633             // because typically the callback is the one that controls whether the
634             // content received from the server has been consumed, so succeeding
635             // the callback causes more content to be received from the server,
636             // and hence more to be written to the client by onWritePossible().
637             // A reentrant call to onWritePossible() performs another write,
638             // which may remain pending, which means that the reentrant call
639             // to onWritePossible() returns all the way back to just after the
640             // succeed of the callback. There, we cannot just loop attempting
641             // write, but we need to check whether we are write pending.
642             callback.succeeded();
643             return writePending;
644         }
645 
646         @Override
647         public void onError(Throwable failure)
648         {
649             DeferredContentProvider.Chunk chunk = this.chunk;
650             if (chunk != null)
651                 chunk.callback.failed(failure);
652             else
653                 serverResponse.abort(failure);
654         }
655     }
656 
657     /**
658      * <p>Allows applications to transform upstream and downstream content.</p>
659      * <p>Typical use cases of transformations are URL rewriting of HTML anchors
660      * (where the value of the <code>href</code> attribute of &lt;a&gt; elements
661      * is modified by the proxy), field renaming of JSON documents, etc.</p>
662      * <p>Applications should override {@link #newClientRequestContentTransformer(HttpServletRequest, Request)}
663      * and/or {@link #newServerResponseContentTransformer(HttpServletRequest, HttpServletResponse, Response)}
664      * to provide the transformer implementation.</p>
665      */
666     public interface ContentTransformer
667     {
668         /**
669          * The identity transformer that does not perform any transformation.
670          */
671         public static final ContentTransformer IDENTITY = new IdentityContentTransformer();
672 
673         /**
674          * <p>Transforms the given input byte buffers into (possibly multiple) byte buffers.</p>
675          * <p>The transformation must happen synchronously in the context of a call
676          * to this method (it is not supported to perform the transformation in another
677          * thread spawned during the call to this method).
678          * The transformation may happen or not, depending on the transformer implementation.
679          * For example, a buffering transformer may buffer the input aside, and only
680          * perform the transformation when the whole input is provided (by looking at the
681          * {@code finished} flag).</p>
682          * <p>The input buffer will be cleared and reused after the call to this method.
683          * Implementations that want to buffer aside the input (or part of it) must copy
684          * the input bytes that they want to buffer.</p>
685          * <p>Typical implementations:</p>
686          * <pre>
687          * // Identity transformation (no transformation, the input is copied to the output)
688          * public void transform(ByteBuffer input, boolean finished, List&lt;ByteBuffer&gt; output)
689          * {
690          *     output.add(input);
691          * }
692          *
693          * // Discard transformation (all input is discarded)
694          * public void transform(ByteBuffer input, boolean finished, List&lt;ByteBuffer&gt; output)
695          * {
696          *     // Empty
697          * }
698          *
699          * // Buffering identity transformation (all input is buffered aside until it is finished)
700          * public void transform(ByteBuffer input, boolean finished, List&lt;ByteBuffer&gt; output)
701          * {
702          *     ByteBuffer copy = ByteBuffer.allocate(input.remaining());
703          *     copy.put(input).flip();
704          *     store(copy);
705          *
706          *     if (finished)
707          *     {
708          *         List&lt;ByteBuffer&gt; copies = retrieve();
709          *         output.addAll(copies);
710          *     }
711          * }
712          * </pre>
713          *
714          * @param input the input content to transform (may be of length zero)
715          * @param finished whether the input content is finished or more will come
716          * @param output where to put the transformed output content
717          * @throws IOException in case of transformation failures
718          */
719         public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException;
720     }
721 
722     private static class IdentityContentTransformer implements ContentTransformer
723     {
724         @Override
725         public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
726         {
727             output.add(input);
728         }
729     }
730 
731     public static class GZIPContentTransformer implements ContentTransformer
732     {
733         private final List<ByteBuffer> buffers = new ArrayList<>(2);
734         private final ContentDecoder decoder = new GZIPContentDecoder();
735         private final ContentTransformer transformer;
736         private final ByteArrayOutputStream out;
737         private final GZIPOutputStream gzipOut;
738 
739         public GZIPContentTransformer(ContentTransformer transformer)
740         {
741             try
742             {
743                 this.transformer = transformer;
744                 this.out = new ByteArrayOutputStream();
745                 this.gzipOut = new GZIPOutputStream(out);
746             }
747             catch (IOException x)
748             {
749                 throw new RuntimeIOException(x);
750             }
751         }
752 
753         @Override
754         public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
755         {
756             if (!input.hasRemaining())
757             {
758                 if (finished)
759                     transformer.transform(input, true, buffers);
760             }
761             else
762             {
763                 while (input.hasRemaining())
764                 {
765                     ByteBuffer decoded = decoder.decode(input);
766                     if (decoded.hasRemaining())
767                         transformer.transform(decoded, finished && !input.hasRemaining(), buffers);
768                 }
769             }
770 
771             if (!buffers.isEmpty() || finished)
772             {
773                 ByteBuffer result = gzip(buffers, finished);
774                 buffers.clear();
775                 output.add(result);
776             }
777         }
778 
779         private ByteBuffer gzip(List<ByteBuffer> buffers, boolean finished) throws IOException
780         {
781             for (ByteBuffer buffer : buffers)
782                 write(gzipOut, buffer);
783             if (finished)
784                 gzipOut.close();
785             byte[] gzipBytes = out.toByteArray();
786             out.reset();
787             return ByteBuffer.wrap(gzipBytes);
788         }
789     }
790 }