View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.Channels;
27  import java.nio.channels.FileChannel;
28  import java.nio.file.Files;
29  import java.nio.file.Path;
30  import java.nio.file.Paths;
31  import java.nio.file.StandardOpenOption;
32  import java.nio.file.attribute.FileAttribute;
33  import java.util.ArrayList;
34  import java.util.List;
35  
36  import org.eclipse.jetty.util.IO;
37  import org.eclipse.jetty.util.component.Destroyable;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.log.Logger;
40  
41  /**
42   * <p>A specialized transformer for {@link AsyncMiddleManServlet} that performs
43   * the transformation when the whole content has been received.</p>
44   * <p>The content is buffered in memory up to a configurable {@link #getMaxInputBufferSize() maximum size},
45   * after which it is overflown to a file on disk. The overflow file is saved
46   * in the {@link #getOverflowDirectory() overflow directory} as a
47   * {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
48   * with a name starting with the {@link #getInputFilePrefix() input prefix}
49   * and default suffix.</p>
50   * <p>Application must implement the {@link #transform(Source, Sink) transformation method}
51   * to transform the content.</p>
52   * <p>The transformed content is buffered in memory up to a configurable {@link #getMaxOutputBufferSize() maximum size}
53   * after which it is overflown to a file on disk. The overflow file is saved
54   * in the {@link #getOverflowDirectory() overflow directory} as a
55   * {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
56   * with a name starting with the {@link #getOutputFilePrefix()} output prefix}
57   * and default suffix.</p>
58   */
59  public abstract class AfterContentTransformer implements AsyncMiddleManServlet.ContentTransformer, Destroyable
60  {
61      private static final Logger LOG = Log.getLogger(AfterContentTransformer.class);
62  
63      private final List<ByteBuffer> sourceBuffers = new ArrayList<>();
64      private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir"));
65      private String inputFilePrefix = "amms_adct_in_";
66      private String outputFilePrefix = "amms_adct_out_";
67      private long maxInputBufferSize = 1024 * 1024;
68      private long inputBufferSize;
69      private FileChannel inputFile;
70      private long maxOutputBufferSize = maxInputBufferSize;
71      private long outputBufferSize;
72      private FileChannel outputFile;
73  
74      /**
75       * <p>Returns the directory where input and output are overflown to
76       * temporary files if they exceed, respectively, the
77       * {@link #getMaxInputBufferSize() max input size} or the
78       * {@link #getMaxOutputBufferSize() max output size}.</p>
79       * <p>Defaults to the directory pointed by the {@code java.io.tmpdir}
80       * system property.</p>
81       *
82       * @return the overflow directory path
83       * @see #setOverflowDirectory(Path)
84       */
85      public Path getOverflowDirectory()
86      {
87          return overflowDirectory;
88      }
89  
90      /**
91       * @param overflowDirectory the overflow directory path
92       * @see #getOverflowDirectory()
93       */
94      public void setOverflowDirectory(Path overflowDirectory)
95      {
96          this.overflowDirectory = overflowDirectory;
97      }
98  
99      /**
100      * @return the prefix of the input overflow temporary files
101      * @see #setInputFilePrefix(String)
102      */
103     public String getInputFilePrefix()
104     {
105         return inputFilePrefix;
106     }
107 
108     /**
109      * @param inputFilePrefix the prefix of the input overflow temporary files
110      * @see #getInputFilePrefix()
111      */
112     public void setInputFilePrefix(String inputFilePrefix)
113     {
114         this.inputFilePrefix = inputFilePrefix;
115     }
116 
117     /**
118      * <p>Returns the maximum input buffer size, after which the input is overflown to disk.</p>
119      * <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
120      *
121      * @return the max input buffer size
122      * @see #setMaxInputBufferSize(long)
123      */
124     public long getMaxInputBufferSize()
125     {
126         return maxInputBufferSize;
127     }
128 
129     /**
130      * @param maxInputBufferSize the max input buffer size
131      * @see #getMaxInputBufferSize()
132      */
133     public void setMaxInputBufferSize(long maxInputBufferSize)
134     {
135         this.maxInputBufferSize = maxInputBufferSize;
136     }
137 
138     /**
139      * @return the prefix of the output overflow temporary files
140      * @see #setOutputFilePrefix(String)
141      */
142     public String getOutputFilePrefix()
143     {
144         return outputFilePrefix;
145     }
146 
147     /**
148      * @param outputFilePrefix the prefix of the output overflow temporary files
149      * @see #getOutputFilePrefix()
150      */
151     public void setOutputFilePrefix(String outputFilePrefix)
152     {
153         this.outputFilePrefix = outputFilePrefix;
154     }
155 
156     /**
157      * <p>Returns the maximum output buffer size, after which the output is overflown to disk.</p>
158      * <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
159      *
160      * @return the max output buffer size
161      * @see #setMaxOutputBufferSize(long)
162      */
163     public long getMaxOutputBufferSize()
164     {
165         return maxOutputBufferSize;
166     }
167 
168     /**
169      * @param maxOutputBufferSize the max output buffer size
170      */
171     public void setMaxOutputBufferSize(long maxOutputBufferSize)
172     {
173         this.maxOutputBufferSize = maxOutputBufferSize;
174     }
175 
176     @Override
177     public final void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
178     {
179         int remaining = input.remaining();
180         if (remaining > 0)
181         {
182             inputBufferSize += remaining;
183             long max = getMaxInputBufferSize();
184             if (max >= 0 && inputBufferSize > max)
185             {
186                 overflow(input);
187             }
188             else
189             {
190                 ByteBuffer copy = ByteBuffer.allocate(input.remaining());
191                 copy.put(input).flip();
192                 sourceBuffers.add(copy);
193             }
194         }
195 
196         if (finished)
197         {
198             Source source = new Source();
199             Sink sink = new Sink();
200             if (transform(source, sink))
201                 sink.drainTo(output);
202             else
203                 source.drainTo(output);
204         }
205     }
206 
207     /**
208      * <p>Transforms the original content read from the {@code source} into
209      * transformed content written to the {@code sink}.</p>
210      * <p>The transformation must happen synchronously in the context of a call
211      * to this method (it is not supported to perform the transformation in another
212      * thread spawned during the call to this method).</p>
213      * <p>Differently from {@link #transform(ByteBuffer, boolean, List)}, this
214      * method is invoked only when the whole content is available, and offers
215      * a blocking API via the InputStream and OutputStream that can be obtained
216      * from {@link Source} and {@link Sink} respectively.</p>
217      * <p>Implementations may read the source, inspect the input bytes and decide
218      * that no transformation is necessary, and therefore the source must be copied
219      * unchanged to the sink. In such case, the implementation must return false to
220      * indicate that it wishes to just pipe the bytes from the source to the sink.</p>
221      * <p>Typical implementations:</p>
222      * <pre>
223      * // Identity transformation (no transformation, the input is copied to the output)
224      * public boolean transform(Source source, Sink sink)
225      * {
226      *     org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream());
227      *     return true;
228      * }
229      * </pre>
230      *
231      * @param source where the original content is read
232      * @param sink where the transformed content is written
233      * @return true if the transformation happened and the transformed bytes have
234      * been written to the sink, false if no transformation happened and the source
235      * must be copied to the sink.
236      * @throws IOException if the transformation fails
237      */
238     public abstract boolean transform(Source source, Sink sink) throws IOException;
239 
240     private void overflow(ByteBuffer input) throws IOException
241     {
242         if (inputFile == null)
243         {
244             Path path = Files.createTempFile(getOverflowDirectory(), getInputFilePrefix(), null);
245             inputFile = FileChannel.open(path,
246                     StandardOpenOption.CREATE,
247                     StandardOpenOption.READ,
248                     StandardOpenOption.WRITE,
249                     StandardOpenOption.DELETE_ON_CLOSE);
250             int size = sourceBuffers.size();
251             if (size > 0)
252             {
253                 ByteBuffer[] buffers = sourceBuffers.toArray(new ByteBuffer[size]);
254                 sourceBuffers.clear();
255                 IO.write(inputFile,buffers,0,buffers.length);
256             }
257         }
258         inputFile.write(input);
259     }
260 
261     @Override
262     public void destroy()
263     {
264         close(inputFile);
265         close(outputFile);
266     }
267 
268     private void drain(FileChannel file, List<ByteBuffer> output) throws IOException
269     {
270         long position = 0;
271         long length = file.size();
272         file.position(position);
273         while (length > 0)
274         {
275             // At most 1 GiB file maps.
276             long size = Math.min(1024 * 1024 * 1024, length);
277             ByteBuffer buffer = file.map(FileChannel.MapMode.READ_ONLY, position, size);
278             output.add(buffer);
279             position += size;
280             length -= size;
281         }
282     }
283 
284     private void close(Closeable closeable)
285     {
286         try
287         {
288             if (closeable != null)
289                 closeable.close();
290         }
291         catch (IOException x)
292         {
293             LOG.ignore(x);
294         }
295     }
296 
297     /**
298      * <p>The source from where the original content is read to be transformed.</p>
299      * <p>The {@link #getInputStream() input stream} provided by this
300      * class supports the {@link InputStream#reset()} method so that
301      * the stream can be rewound to the beginning.</p>
302      */
303     public class Source
304     {
305         private final InputStream stream;
306 
307         private Source() throws IOException
308         {
309             if (inputFile != null)
310             {
311                 inputFile.force(true);
312                 stream = new ChannelInputStream();
313             }
314             else
315             {
316                 stream = new MemoryInputStream();
317             }
318             stream.reset();
319         }
320 
321         /**
322          * @return an input stream to read the original content from
323          */
324         public InputStream getInputStream()
325         {
326             return stream;
327         }
328 
329         private void drainTo(List<ByteBuffer> output) throws IOException
330         {
331             if (inputFile == null)
332             {
333                 output.addAll(sourceBuffers);
334                 sourceBuffers.clear();
335             }
336             else
337             {
338                 drain(inputFile, output);
339             }
340         }
341     }
342 
343     private class ChannelInputStream extends InputStream
344     {
345         private final InputStream stream = Channels.newInputStream(inputFile);
346 
347         @Override
348         public int read(byte[] b, int off, int len) throws IOException
349         {
350             return stream.read(b, off, len);
351         }
352 
353         @Override
354         public int read() throws IOException
355         {
356             return stream.read();
357         }
358 
359         @Override
360         public void reset() throws IOException
361         {
362             inputFile.position(0);
363         }
364     }
365 
366     private class MemoryInputStream extends InputStream
367     {
368         private final byte[] oneByte = new byte[1];
369         private int index;
370         private ByteBuffer slice;
371 
372         @Override
373         public int read(byte[] b, int off, int len) throws IOException
374         {
375             if (len == 0)
376                 return 0;
377             if (index == sourceBuffers.size())
378                 return -1;
379 
380             if (slice == null)
381                 slice = sourceBuffers.get(index).slice();
382 
383             int size = Math.min(len, slice.remaining());
384             slice.get(b, off, size);
385 
386             if (!slice.hasRemaining())
387             {
388                 ++index;
389                 slice = null;
390             }
391 
392             return size;
393         }
394 
395         @Override
396         public int read() throws IOException
397         {
398             int read = read(oneByte, 0, 1);
399             return read < 0 ? read : oneByte[0] & 0xFF;
400         }
401 
402         @Override
403         public void reset() throws IOException
404         {
405             index = 0;
406             slice = null;
407         }
408     }
409 
410     /**
411      * <p>The target to where the transformed content is written after the transformation.</p>
412      */
413     public class Sink
414     {
415         private final List<ByteBuffer> sinkBuffers = new ArrayList<>();
416         private final OutputStream stream = new SinkOutputStream();
417 
418         /**
419          * @return an output stream to write the transformed content to
420          */
421         public OutputStream getOutputStream()
422         {
423             return stream;
424         }
425 
426         
427         private void overflow(ByteBuffer output) throws IOException
428         {
429             if (outputFile == null)
430             {
431                 Path path = Files.createTempFile(getOverflowDirectory(), getOutputFilePrefix(), null);
432                 outputFile = FileChannel.open(path,
433                         StandardOpenOption.CREATE,
434                         StandardOpenOption.READ,
435                         StandardOpenOption.WRITE,
436                         StandardOpenOption.DELETE_ON_CLOSE);
437                 int size = sinkBuffers.size();
438                 if (size > 0)
439                 {
440                     ByteBuffer[] buffers = sinkBuffers.toArray(new ByteBuffer[size]);
441                     sinkBuffers.clear();
442                     
443                     IO.write(outputFile,buffers,0,buffers.length);
444                 }
445             }
446             outputFile.write(output);
447         }
448 
449         private void drainTo(List<ByteBuffer> output) throws IOException
450         {
451             if (outputFile == null)
452             {
453                 output.addAll(sinkBuffers);
454                 sinkBuffers.clear();
455             }
456             else
457             {
458                 outputFile.force(true);
459                 drain(outputFile, output);
460             }
461         }
462 
463         private class SinkOutputStream extends OutputStream
464         {
465             @Override
466             public void write(byte[] b, int off, int len) throws IOException
467             {
468                 if (len <= 0)
469                     return;
470 
471                 outputBufferSize += len;
472                 long max = getMaxOutputBufferSize();
473                 if (max >= 0 && outputBufferSize > max)
474                 {
475                     overflow(ByteBuffer.wrap(b, off, len));
476                 }
477                 else
478                 {
479                     // The array may be reused by the
480                     // application so we need to copy it.
481                     byte[] copy = new byte[len];
482                     System.arraycopy(b, off, copy, 0, len);
483                     sinkBuffers.add(ByteBuffer.wrap(copy));
484                 }
485             }
486 
487             @Override
488             public void write(int b) throws IOException
489             {
490                 write(new byte[]{(byte)b}, 0, 1);
491             }
492         }
493     }
494 }