View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayDeque;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.TimeUnit;
30  import java.util.zip.GZIPOutputStream;
31  
32  import javax.servlet.AsyncContext;
33  import javax.servlet.ReadListener;
34  import javax.servlet.ServletConfig;
35  import javax.servlet.ServletException;
36  import javax.servlet.ServletInputStream;
37  import javax.servlet.ServletOutputStream;
38  import javax.servlet.WriteListener;
39  import javax.servlet.http.HttpServletRequest;
40  import javax.servlet.http.HttpServletResponse;
41  
42  import org.eclipse.jetty.client.ContentDecoder;
43  import org.eclipse.jetty.client.GZIPContentDecoder;
44  import org.eclipse.jetty.client.api.ContentProvider;
45  import org.eclipse.jetty.client.api.Request;
46  import org.eclipse.jetty.client.api.Response;
47  import org.eclipse.jetty.client.api.Result;
48  import org.eclipse.jetty.client.util.DeferredContentProvider;
49  import org.eclipse.jetty.http.HttpHeader;
50  import org.eclipse.jetty.http.HttpVersion;
51  import org.eclipse.jetty.io.RuntimeIOException;
52  import org.eclipse.jetty.util.BufferUtil;
53  import org.eclipse.jetty.util.Callback;
54  import org.eclipse.jetty.util.CountingCallback;
55  import org.eclipse.jetty.util.IteratingCallback;
56  import org.eclipse.jetty.util.component.Destroyable;
57  
58  /**
59   * <p>Servlet 3.1 asynchronous proxy servlet with capability
60   * to intercept and modify request/response content.</p>
61   * <p>Both the request processing and the I/O are asynchronous.</p>
62   *
63   * @see ProxyServlet
64   * @see AsyncProxyServlet
65   * @see ConnectHandler
66   */
67  public class AsyncMiddleManServlet extends AbstractProxyServlet
68  {
    // Request attribute set (to true) just before the proxy request is sent while processing request content.
    private static final String PROXY_REQUEST_COMMITTED = AsyncMiddleManServlet.class.getName() + ".proxyRequestCommitted";
    // Request attribute holding the ContentTransformer applied to client request content.
    private static final String CLIENT_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".clientTransformer";
    // Request attribute holding the ContentTransformer applied to server response content.
    private static final String SERVER_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".serverTransformer";
72  
73      @Override
74      protected void service(HttpServletRequest clientRequest, HttpServletResponse proxyResponse) throws ServletException, IOException
75      {
76          String rewrittenTarget = rewriteTarget(clientRequest);
77          if (_log.isDebugEnabled())
78          {
79              StringBuffer target = clientRequest.getRequestURL();
80              if (clientRequest.getQueryString() != null)
81                  target.append("?").append(clientRequest.getQueryString());
82              _log.debug("{} rewriting: {} -> {}", getRequestId(clientRequest), target, rewrittenTarget);
83          }
84          if (rewrittenTarget == null)
85          {
86              onProxyRewriteFailed(clientRequest, proxyResponse);
87              return;
88          }
89  
90          final Request proxyRequest = getHttpClient().newRequest(rewrittenTarget)
91                  .method(clientRequest.getMethod())
92                  .version(HttpVersion.fromString(clientRequest.getProtocol()));
93  
94          boolean hasContent = hasContent(clientRequest);
95  
96          copyRequestHeaders(clientRequest, proxyRequest);
97  
98          addProxyHeaders(clientRequest, proxyRequest);
99  
100         final AsyncContext asyncContext = clientRequest.startAsync();
101         // We do not timeout the continuation, but the proxy request.
102         asyncContext.setTimeout(0);
103         proxyRequest.timeout(getTimeout(), TimeUnit.MILLISECONDS);
104 
105         // If there is content, the send of the proxy request
106         // is delayed and performed when the content arrives,
107         // to allow optimization of the Content-Length header.
108         if (hasContent)
109             proxyRequest.content(newProxyContentProvider(clientRequest, proxyResponse, proxyRequest));
110         else
111             sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
112     }
113 
114     protected ContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException
115     {
116         ServletInputStream input = clientRequest.getInputStream();
117         DeferredContentProvider provider = new DeferredContentProvider()
118         {
119             @Override
120             public boolean offer(ByteBuffer buffer, Callback callback)
121             {
122                 if (_log.isDebugEnabled())
123                     _log.debug("{} proxying content to upstream: {} bytes", getRequestId(clientRequest), buffer.remaining());
124                 return super.offer(buffer, callback);
125             }
126         };
127         input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider));
128         return provider;
129     }
130 
131     protected ReadListener newProxyReadListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
132     {
133         return new ProxyReader(clientRequest, proxyResponse, proxyRequest, provider);
134     }
135 
136     protected ProxyWriter newProxyWriteListener(HttpServletRequest clientRequest, Response proxyResponse)
137     {
138         return new ProxyWriter(clientRequest, proxyResponse);
139     }
140 
141     @Override
142     protected Response.CompleteListener newProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
143     {
144         return new ProxyResponseListener(clientRequest, proxyResponse);
145     }
146 
147     protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
148     {
149         return ContentTransformer.IDENTITY;
150     }
151 
152     protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
153     {
154         return ContentTransformer.IDENTITY;
155     }
156 
157     private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
158     {
159         try
160         {
161             transformer.transform(input, finished, output);
162         }
163         catch (Throwable x)
164         {
165             _log.info("Exception while transforming " + transformer, x);
166             throw x;
167         }
168     }
169 
170     int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
171     {
172         return input.read(buffer);
173     }
174 
175     void writeProxyResponseContent(ServletOutputStream output, ByteBuffer content) throws IOException
176     {
177         write(output, content);
178     }
179 
180     private static void write(OutputStream output, ByteBuffer content) throws IOException
181     {
182         int length = content.remaining();
183         int offset = 0;
184         byte[] buffer;
185         if (content.hasArray())
186         {
187             offset = content.arrayOffset();
188             buffer = content.array();
189         }
190         else
191         {
192             buffer = new byte[length];
193             content.get(buffer);
194         }
195         output.write(buffer, offset, length);
196     }
197 
198     private void cleanup(HttpServletRequest clientRequest)
199     {
200         ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
201         if (clientTransformer instanceof Destroyable)
202             ((Destroyable)clientTransformer).destroy();
203         ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
204         if (serverTransformer instanceof Destroyable)
205             ((Destroyable)serverTransformer).destroy();
206     }
207 
    /**
     * <p>Convenience extension of {@link AsyncMiddleManServlet} that offers transparent proxy functionalities.</p>
     * <p>Target rewriting and initialization are delegated to a {@code TransparentDelegate}.</p>
     *
     * @see org.eclipse.jetty.proxy.AbstractProxyServlet.TransparentDelegate
     */
    // NOTE(review): the Javadoc above describes an extension of AsyncMiddleManServlet, but the
    // class is declared as extending ProxyServlet, so nothing defined by AsyncMiddleManServlet
    // (content transformers, asynchronous read/write) is inherited here. This looks unintended;
    // before changing the superclass, confirm that TransparentDelegate's constructor accepts an
    // AsyncMiddleManServlet instance.
    public static class Transparent extends ProxyServlet
    {
        // Performs the actual configuration parsing and target rewriting.
        private final TransparentDelegate delegate = new TransparentDelegate(this);

        /**
         * Initializes the servlet and then the transparent delegate with the same configuration.
         *
         * @param config the servlet configuration
         * @throws ServletException if the servlet or the delegate fail to initialize
         */
        @Override
        public void init(ServletConfig config) throws ServletException
        {
            super.init(config);
            delegate.init(config);
        }

        /**
         * Rewrites the target by asking the transparent delegate.
         *
         * @param request the request received from the client
         * @return the rewritten target as computed by the delegate
         */
        @Override
        protected String rewriteTarget(HttpServletRequest request)
        {
            return delegate.rewriteTarget(request);
        }
    }
230 
    /**
     * <p>Asynchronously reads the client request content, passes it through the client
     * content transformer and offers the result to the {@link DeferredContentProvider}
     * of the proxy request.</p>
     * <p>It is both the {@link ReadListener} registered on the client request input
     * stream and the {@link IteratingCallback} that drives the read loop.</p>
     */
    protected class ProxyReader extends IteratingCallback implements ReadListener
    {
        // Read buffer, sized after the HttpClient request buffer size.
        private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
        // Scratch list that receives the transformer output; cleared after each use.
        private final List<ByteBuffer> buffers = new ArrayList<>();
        private final HttpServletRequest clientRequest;
        private final HttpServletResponse proxyResponse;
        private final Request proxyRequest;
        private final DeferredContentProvider provider;
        // Value of clientRequest.getContentLength() at construction time.
        private final int contentLength;
        // Bytes read so far; only updated when contentLength is positive.
        private int length;

        /**
         * @param clientRequest the request received from the client
         * @param proxyResponse the response that will be sent to the client
         * @param proxyRequest the request that will be sent to the server
         * @param provider the content provider that receives the (transformed) request content
         */
        protected ProxyReader(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider)
        {
            this.clientRequest = clientRequest;
            this.proxyResponse = proxyResponse;
            this.proxyRequest = proxyRequest;
            this.provider = provider;
            this.contentLength = clientRequest.getContentLength();
        }

        /**
         * Starts (or resumes) the read loop implemented by {@link #process()}.
         */
        @Override
        public void onDataAvailable() throws IOException
        {
            iterate();
        }

        /**
         * If the provider has not been closed yet, processes a last empty buffer with
         * {@code finished=true}, so that the transformer is told that the content is over.
         */
        @Override
        public void onAllDataRead() throws IOException
        {
            if (!provider.isClosed())
            {
                // Only failures are of interest for this final step.
                process(BufferUtil.EMPTY_BUFFER, new Callback()
                {
                    @Override
                    public void failed(Throwable x)
                    {
                        onError(x);
                    }
                }, true);
            }

            if (_log.isDebugEnabled())
                _log.debug("{} proxying content to upstream completed", getRequestId(clientRequest));
        }

        /**
         * Destroys the transformers and reports the failure of the client request.
         */
        @Override
        public void onError(Throwable t)
        {
            cleanup(clientRequest);
            onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t);
        }

        /**
         * One step of the read loop: reads from the client input while it is ready
         * and not finished, and hands what was read to {@link #process(ByteBuffer, Callback, boolean)}.
         *
         * @return {@code SCHEDULED} after processing a non-empty read (this callback was passed
         * along with the content), {@code SUCCEEDED} when the input is finished or a read
         * returned a negative value, {@code IDLE} when the input is not ready
         */
        @Override
        protected Action process() throws Exception
        {
            ServletInputStream input = clientRequest.getInputStream();
            while (input.isReady() && !input.isFinished())
            {
                int read = readClientRequestContent(input, buffer);

                if (_log.isDebugEnabled())
                    _log.debug("{} asynchronous read {} bytes on {}", getRequestId(clientRequest), read, input);

                if (read < 0)
                    return Action.SUCCEEDED;

                // The running total is only tracked when the content length is positive.
                if (contentLength > 0 && read > 0)
                    length += read;

                ByteBuffer content = read > 0 ? ByteBuffer.wrap(buffer, 0, read) : BufferUtil.EMPTY_BUFFER;
                // The content is considered finished once the declared length has been read.
                boolean finished = length == contentLength;
                process(content, this, finished);

                // For a zero-length read the loop simply continues.
                if (read > 0)
                    return Action.SCHEDULED;
            }

            if (input.isFinished())
            {
                if (_log.isDebugEnabled())
                    _log.debug("{} asynchronous read complete on {}", getRequestId(clientRequest), input);
                return Action.SUCCEEDED;
            }
            else
            {
                if (_log.isDebugEnabled())
                    _log.debug("{} asynchronous read pending on {}", getRequestId(clientRequest), input);
                return Action.IDLE;
            }
        }

        /**
         * Transforms the given content and offers the result to the provider, sending the
         * proxy request the first time there is output (or the content is finished).
         *
         * @param content the content read from the client (possibly empty)
         * @param callback the callback to complete when the content has been processed
         * @param finished whether this is the last content
         * @throws IOException if the transformation fails
         */
        private void process(ByteBuffer content, Callback callback, boolean finished) throws IOException
        {
            // The transformer is created on first use and kept as a request attribute.
            ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
            if (transformer == null)
            {
                transformer = newClientRequestContentTransformer(clientRequest, proxyRequest);
                clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer);
            }

            boolean committed = clientRequest.getAttribute(PROXY_REQUEST_COMMITTED) != null;

            int contentBytes = content.remaining();

            // Skip transformation for empty non-last buffers.
            if (contentBytes == 0 && !finished)
            {
                callback.succeeded();
                return;
            }

            transform(transformer, content, finished, buffers);

            int newContentBytes = 0;
            int size = buffers.size();
            if (size > 0)
            {
                // One callback shared by all output buffers, created with their count.
                CountingCallback counter = new CountingCallback(callback, size);
                for (int i = 0; i < size; ++i)
                {
                    ByteBuffer buffer = buffers.get(i);
                    newContentBytes += buffer.remaining();
                    provider.offer(buffer, counter);
                }
                buffers.clear();
            }

            if (finished)
                provider.close();

            if (_log.isDebugEnabled())
                _log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);

            if (!committed && (size > 0 || finished))
            {
                // Content-Length is set to null before sending (presumably dropping the
                // header because the transformed length may differ - confirm).
                proxyRequest.header(HttpHeader.CONTENT_LENGTH, null);
                clientRequest.setAttribute(PROXY_REQUEST_COMMITTED, true);
                sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
            }

            // No buffer was offered with the callback, so complete it here.
            if (size == 0)
                callback.succeeded();
        }

        /**
         * Routes a failure of the read loop to {@link #onError(Throwable)}.
         */
        @Override
        protected void onCompleteFailure(Throwable x)
        {
            onError(x);
        }
    }
381 
    /**
     * <p>Listener for the server response: copies status and headers to the proxy response,
     * passes the response content through the server content transformer and queues the
     * result on a {@link ProxyWriter} for asynchronous writing to the client.</p>
     * <p>It is also the {@link Callback} invoked when the whole exchange is complete.</p>
     */
    protected class ProxyResponseListener extends Response.Listener.Adapter implements Callback
    {
        // Request attribute under which the ProxyWriter is stored.
        private final String WRITE_LISTENER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".writeListener";

        // Wraps this listener with a count of 2; in this class it is completed once from
        // onComplete() and once from the content path (onSuccess() directly, or the final
        // chunk queued on the writer).
        private final Callback complete = new CountingCallback(this, 2);
        // Scratch list that receives the transformer output; cleared after each use.
        private final List<ByteBuffer> buffers = new ArrayList<>();
        private final HttpServletRequest clientRequest;
        private final HttpServletResponse proxyResponse;
        // Whether onContent() has been invoked at least once.
        private boolean hasContent;
        // Content-Length of the server response as read in onHeaders().
        private long contentLength;
        // Bytes of server content received so far.
        private long length;
        // The server response, recorded in onComplete().
        private Response response;

        /**
         * @param clientRequest the request received from the client
         * @param proxyResponse the response that will be sent to the client
         */
        protected ProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
        {
            this.clientRequest = clientRequest;
            this.proxyResponse = proxyResponse;
        }

        /**
         * Copies the server response status to the proxy response.
         */
        @Override
        public void onBegin(Response serverResponse)
        {
            proxyResponse.setStatus(serverResponse.getStatus());
        }

        /**
         * Records the server Content-Length and lets the servlet process the response headers.
         */
        @Override
        public void onHeaders(Response serverResponse)
        {
            contentLength = serverResponse.getHeaders().getLongField(HttpHeader.CONTENT_LENGTH.asString());
            onServerResponseHeaders(clientRequest, proxyResponse, serverResponse);
        }

        /**
         * Transforms a chunk of server content and queues the result on the writer.
         * Any failure is reported by failing {@code callback}.
         *
         * @param serverResponse the response received from the server
         * @param content the chunk of content received
         * @param callback the callback to complete when the chunk has been written to the client
         */
        @Override
        public void onContent(final Response serverResponse, ByteBuffer content, final Callback callback)
        {
            try
            {
                int contentBytes = content.remaining();
                if (_log.isDebugEnabled())
                    _log.debug("{} received server content: {} bytes", getRequestId(clientRequest), contentBytes);

                hasContent = true;

                // The writer is created on the first chunk; its presence as a request
                // attribute is what "committed" means here.
                ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
                boolean committed = proxyWriter != null;
                if (proxyWriter == null)
                {
                    proxyWriter = newProxyWriteListener(clientRequest, serverResponse);
                    clientRequest.setAttribute(WRITE_LISTENER_ATTRIBUTE, proxyWriter);
                }

                // The transformer is created on first use and kept as a request attribute.
                ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
                if (transformer == null)
                {
                    transformer = newServerResponseContentTransformer(clientRequest, proxyResponse, serverResponse);
                    clientRequest.setAttribute(SERVER_TRANSFORMER, transformer);
                }

                length += contentBytes;

                // With a known content length, the content is finished when it has all been received.
                boolean finished = contentLength >= 0 && length == contentLength;
                transform(transformer, content, finished, buffers);

                int newContentBytes = 0;
                int size = buffers.size();
                if (size > 0)
                {
                    // A single output buffer uses the callback directly; several share a counting one.
                    Callback counter = size == 1 ? callback : new CountingCallback(callback, size);
                    for (int i = 0; i < size; ++i)
                    {
                        ByteBuffer buffer = buffers.get(i);
                        newContentBytes += buffer.remaining();
                        proxyWriter.offer(buffer, counter);
                    }
                    buffers.clear();
                }
                else
                {
                    // No output: queue an empty chunk so that the callback is still completed by the writer.
                    proxyWriter.offer(BufferUtil.EMPTY_BUFFER, callback);
                }
                // After the last content, queue an empty chunk carrying the completion callback.
                if (finished)
                    proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);

                if (_log.isDebugEnabled())
                    _log.debug("{} downstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes);

                if (committed)
                {
                    proxyWriter.onWritePossible();
                }
                else
                {
                    // The content length is reset to -1 before the first write
                    // (presumably because the transformed length may differ - confirm).
                    if (contentLength >= 0)
                        proxyResponse.setContentLength(-1);

                    // Setting the WriteListener triggers an invocation to
                    // onWritePossible(), possibly on a different thread.
                    // We cannot succeed the callback from here, otherwise
                    // we run into a race where the different thread calls
                    // onWritePossible() and succeeding the callback causes
                    // this method to be called again, which also may call
                    // onWritePossible().
                    proxyResponse.getOutputStream().setWriteListener(proxyWriter);
                }
            }
            catch (Throwable x)
            {
                callback.failed(x);
            }
        }

        /**
         * Signals the end of the content: for content of unknown length the transformer is
         * invoked one last time with {@code finished=true}; when there was no content at all
         * the completion callback is succeeded directly.
         */
        @Override
        public void onSuccess(final Response serverResponse)
        {
            try
            {
                if (hasContent)
                {
                    // If we had unknown length content, we need to call the
                    // transformer to signal that the content is finished.
                    if (contentLength < 0)
                    {
                        ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
                        ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);

                        transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);

                        long newContentBytes = 0;
                        int size = buffers.size();
                        if (size > 0)
                        {
                            // The completion callback is attached to the last produced buffers.
                            Callback callback = size == 1 ? complete : new CountingCallback(complete, size);
                            for (int i = 0; i < size; ++i)
                            {
                                ByteBuffer buffer = buffers.get(i);
                                newContentBytes += buffer.remaining();
                                proxyWriter.offer(buffer, callback);
                            }
                            buffers.clear();
                        }
                        else
                        {
                            proxyWriter.offer(BufferUtil.EMPTY_BUFFER, complete);
                        }

                        if (_log.isDebugEnabled())
                            _log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes);

                        proxyWriter.onWritePossible();
                    }
                }
                else
                {
                    complete.succeeded();
                }
            }
            catch (Throwable x)
            {
                complete.failed(x);
            }
        }

        /**
         * Records the server response and completes (or fails) the completion callback
         * according to the result.
         */
        @Override
        public void onComplete(Result result)
        {
            response = result.getResponse();
            if (result.isSucceeded())
                complete.succeeded();
            else
                complete.failed(result.getFailure());
        }

        /**
         * Destroys the transformers and notifies the servlet of the successful proxying.
         */
        @Override
        public void succeeded()
        {
            cleanup(clientRequest);
            onProxyResponseSuccess(clientRequest, proxyResponse, response);
        }

        /**
         * Destroys the transformers and notifies the servlet of the failed proxying.
         */
        @Override
        public void failed(Throwable failure)
        {
            cleanup(clientRequest);
            onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
        }
    }
568 
    /**
     * <p>{@link WriteListener} that writes to the client the chunks of (transformed)
     * server content queued via {@link #offer(ByteBuffer, Callback)}, completing the
     * callback of each chunk after it has been written.</p>
     */
    protected class ProxyWriter implements WriteListener
    {
        // Chunks waiting to be written, in arrival order.
        private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
        private final HttpServletRequest clientRequest;
        private final Response serverResponse;
        // The chunk most recently polled from the queue.
        private DeferredContentProvider.Chunk chunk;
        // True when onWritePossible() left the loop because the output was not ready.
        private boolean writePending;

        /**
         * @param clientRequest the request received from the client
         * @param serverResponse the response received from the server
         */
        protected ProxyWriter(HttpServletRequest clientRequest, Response serverResponse)
        {
            this.clientRequest = clientRequest;
            this.serverResponse = serverResponse;
        }

        /**
         * Queues content to be written to the client.
         *
         * @param content the content to write (may be empty)
         * @param callback the callback to complete when the content has been written
         * @return the result of adding the chunk to the queue
         */
        public boolean offer(ByteBuffer content, Callback callback)
        {
            if (_log.isDebugEnabled())
                _log.debug("{} proxying content to downstream: {} bytes {}", getRequestId(clientRequest), content.remaining(), callback);
            return chunks.offer(new DeferredContentProvider.Chunk(content, callback));
        }

        /**
         * Writes queued chunks for as long as the output is ready, succeeding the callback
         * of each chunk once the output reports ready again after its write.
         * Returns early when the queue is empty or when succeeding a callback
         * left a write pending (see {@link #succeed(Callback)}).
         */
        @Override
        public void onWritePossible() throws IOException
        {
            ServletOutputStream output = clientRequest.getAsyncContext().getResponse().getOutputStream();

            // If we had a pending write, let's succeed it.
            if (writePending)
            {
                if (_log.isDebugEnabled())
                    _log.debug("{} pending async write complete of {} on {}", getRequestId(clientRequest), chunk, output);
                writePending = false;
                if (succeed(chunk.callback))
                    return;
            }

            int length = 0;
            // Local that shadows the field: the chunk written by the previous loop iteration.
            DeferredContentProvider.Chunk chunk = null;
            while (output.isReady())
            {
                // The output is ready again, so the previous chunk has been written.
                if (chunk != null)
                {
                    if (_log.isDebugEnabled())
                        _log.debug("{} async write complete of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
                    if (succeed(chunk.callback))
                        return;
                }

                this.chunk = chunk = chunks.poll();
                if (chunk == null)
                    return;

                // Empty chunks are not written, but their callback is still completed.
                length = chunk.buffer.remaining();
                if (length > 0)
                    writeProxyResponseContent(output, chunk.buffer);
            }

            if (_log.isDebugEnabled())
                _log.debug("{} async write pending of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output);
            writePending = true;
        }

        /**
         * Succeeds the given callback.
         *
         * @param callback the callback to succeed
         * @return whether a write is pending after the callback has been succeeded
         */
        private boolean succeed(Callback callback)
        {
            // Succeeding the callback may cause to reenter in onWritePossible()
            // because typically the callback is the one that controls whether the
            // content received from the server has been consumed, so succeeding
            // the callback causes more content to be received from the server,
            // and hence more to be written to the client by onWritePossible().
            // A reentrant call to onWritePossible() performs another write,
            // which may remain pending, which means that the reentrant call
            // to onWritePossible() returns all the way back to just after the
            // succeed of the callback. There, we cannot just loop attempting
            // write, but we need to check whether we are write pending.
            callback.succeeded();
            return writePending;
        }

        /**
         * Fails the callback of the current chunk if there is one,
         * otherwise aborts the server response.
         */
        @Override
        public void onError(Throwable failure)
        {
            DeferredContentProvider.Chunk chunk = this.chunk;
            if (chunk != null)
                chunk.callback.failed(failure);
            else
                serverResponse.abort(failure);
        }
    }
657 
658     /**
659      * <p>Allows applications to transform upstream and downstream content.</p>
660      * <p>Typical use cases of transformations are URL rewriting of HTML anchors
661      * (where the value of the <code>href</code> attribute of &lt;a&gt; elements
662      * is modified by the proxy), field renaming of JSON documents, etc.</p>
663      * <p>Applications should override {@link #newClientRequestContentTransformer(HttpServletRequest, Request)}
664      * and/or {@link #newServerResponseContentTransformer(HttpServletRequest, HttpServletResponse, Response)}
665      * to provide the transformer implementation.</p>
666      */
667     public interface ContentTransformer
668     {
669         /**
670          * The identity transformer that does not perform any transformation.
671          */
672         public static final ContentTransformer IDENTITY = new IdentityContentTransformer();
673 
674         /**
675          * <p>Transforms the given input byte buffers into (possibly multiple) byte buffers.</p>
676          * <p>The transformation must happen synchronously in the context of a call
677          * to this method (it is not supported to perform the transformation in another
678          * thread spawned during the call to this method).
679          * The transformation may happen or not, depending on the transformer implementation.
680          * For example, a buffering transformer may buffer the input aside, and only
681          * perform the transformation when the whole input is provided (by looking at the
682          * {@code finished} flag).</p>
683          * <p>The input buffer will be cleared and reused after the call to this method.
684          * Implementations that want to buffer aside the input (or part of it) must copy
685          * the input bytes that they want to buffer.</p>
686          * <p>Typical implementations:</p>
687          * <pre>
688          * // Identity transformation (no transformation, the input is copied to the output)
689          * public void transform(ByteBuffer input, boolean finished, List&lt;ByteBuffer&gt; output)
690          * {
691          *     output.add(input);
692          * }
693          *
694          * // Discard transformation (all input is discarded)
695          * public void transform(ByteBuffer input, boolean finished, List&lt;ByteBuffer&gt; output)
696          * {
697          *     // Empty
698          * }
699          *
700          * // Buffering identity transformation (all input is buffered aside until it is finished)
701          * public void transform(ByteBuffer input, boolean finished, List&lt;ByteBuffer&gt; output)
702          * {
703          *     ByteBuffer copy = ByteBuffer.allocate(input.remaining());
704          *     copy.put(input).flip();
705          *     store(copy);
706          *
707          *     if (finished)
708          *     {
709          *         List&lt;ByteBuffer&gt; copies = retrieve();
710          *         output.addAll(copies);
711          *     }
712          * }
713          * </pre>
714          *
715          * @param input the input content to transform (may be of length zero)
716          * @param finished whether the input content is finished or more will come
717          * @param output where to put the transformed output content
718          * @throws IOException in case of transformation failures
719          */
720         public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException;
721     }
722 
723     private static class IdentityContentTransformer implements ContentTransformer
724     {
725         @Override
726         public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
727         {
728             output.add(input);
729         }
730     }
731 
732     public static class GZIPContentTransformer implements ContentTransformer
733     {
734         private final List<ByteBuffer> buffers = new ArrayList<>(2);
735         private final ContentDecoder decoder = new GZIPContentDecoder();
736         private final ContentTransformer transformer;
737         private final ByteArrayOutputStream out;
738         private final GZIPOutputStream gzipOut;
739 
740         public GZIPContentTransformer(ContentTransformer transformer)
741         {
742             try
743             {
744                 this.transformer = transformer;
745                 this.out = new ByteArrayOutputStream();
746                 this.gzipOut = new GZIPOutputStream(out);
747             }
748             catch (IOException x)
749             {
750                 throw new RuntimeIOException(x);
751             }
752         }
753 
754         @Override
755         public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
756         {
757             if (!input.hasRemaining())
758             {
759                 if (finished)
760                     transformer.transform(input, true, buffers);
761             }
762             else
763             {
764                 while (input.hasRemaining())
765                 {
766                     ByteBuffer decoded = decoder.decode(input);
767                     if (decoded.hasRemaining())
768                         transformer.transform(decoded, finished && !input.hasRemaining(), buffers);
769                 }
770             }
771 
772             if (!buffers.isEmpty() || finished)
773             {
774                 ByteBuffer result = gzip(buffers, finished);
775                 buffers.clear();
776                 output.add(result);
777             }
778         }
779 
780         private ByteBuffer gzip(List<ByteBuffer> buffers, boolean finished) throws IOException
781         {
782             for (ByteBuffer buffer : buffers)
783                 write(gzipOut, buffer);
784             if (finished)
785                 gzipOut.close();
786             byte[] gzipBytes = out.toByteArray();
787             out.reset();
788             return ByteBuffer.wrap(gzipBytes);
789         }
790     }
791 }