View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.servlets;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.RandomAccessFile;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.FileChannel.MapMode;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ScheduledThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  
31  import javax.servlet.AsyncContext;
32  import javax.servlet.ServletException;
33  import javax.servlet.ServletOutputStream;
34  import javax.servlet.WriteListener;
35  import javax.servlet.http.HttpServlet;
36  import javax.servlet.http.HttpServletRequest;
37  import javax.servlet.http.HttpServletResponse;
38  
39  import org.eclipse.jetty.server.HttpOutput;
40  
41  /**
42   * A servlet that uses the Servlet 3.1 asynchronous IO API to server
43   * static content at a limited data rate.
44   * <p>
45   * Two implementations are supported: <ul>
46   * <li>The <code>StandardDataStream</code> impl uses only standard
47   * APIs, but produces more garbage due to the byte[] nature of the API.  
48   * <li>the <code>JettyDataStream</code> impl uses a Jetty API to write a ByteBuffer
49   * and thus allow the efficient use of file mapped buffers without any
50   * temporary buffer copies (I did tell the JSR that this was a good idea to 
51   * have in the standard!).
52   * </ul>
53   * <p>
54   * The data rate is controlled by setting init parameters:
55   * <dl>
56   * <dt>buffersize</dt><dd>The amount of data in bytes written per write</dd>
57   * <dt>pause</dt><dd>The period in ms to wait after a write before attempting another</dd>
58   * <dt>pool</dt><dd>The size of the thread pool used to service the writes (defaults to available processors)</dd>
59   * </dl>
60   * Thus if buffersize = 1024 and pause = 100, the data rate will be limited to 10KB per second.
61   */
62  public class DataRateLimitedServlet extends HttpServlet
63  {
64      private static final long serialVersionUID = -4771757707068097025L;
65      private int buffersize=8192;
66      private long pauseNS=TimeUnit.MILLISECONDS.toNanos(100);
67      ScheduledThreadPoolExecutor scheduler;
68      private final ConcurrentHashMap<String, ByteBuffer> cache=new ConcurrentHashMap<>();
69      
70      @Override
71      public void init() throws ServletException
72      {
73          // read the init params
74          String tmp = getInitParameter("buffersize");
75          if (tmp!=null)
76              buffersize=Integer.parseInt(tmp);
77          tmp = getInitParameter("pause");
78          if (tmp!=null)
79              pauseNS=TimeUnit.MILLISECONDS.toNanos(Integer.parseInt(tmp));
80          tmp = getInitParameter("pool");
81          int pool=tmp==null?Runtime.getRuntime().availableProcessors():Integer.parseInt(tmp);
82          
83          // Create and start a shared scheduler.  
84          scheduler=new ScheduledThreadPoolExecutor(pool);
85      }
86  
87      @Override
88      public void destroy()
89      {
90          scheduler.shutdown();
91      }
92      
93      @Override
94      protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
95      {
96          // Get the path of the static resource to serve.
97          String info=request.getPathInfo();
98                  
99          // We don't handle directories
100         if (info.endsWith("/"))
101         {
102             response.sendError(503,"directories not supported");
103             return;
104         }
105 
106         // Set the mime type of the response
107         String content_type=getServletContext().getMimeType(info);
108         response.setContentType(content_type==null?"application/x-data":content_type);
109         
110         // Look for a matching file path
111         String path = request.getPathTranslated();
112         
113         // If we have a file path and this is a jetty response, we can use the JettyStream impl
114         ServletOutputStream out = response.getOutputStream();
115         if (path != null && out instanceof HttpOutput)
116         {
117             // If the file exists
118             File file = new File(path);
119             if (file.exists() && file.canRead())
120             {
121                 // Set the content length
122                 response.setContentLengthLong(file.length());
123                 
124                 // Look for a file mapped buffer in the cache
125                 ByteBuffer mapped=cache.get(path);
126                 
127                 // Handle cache miss
128                 if (mapped==null)
129                 {
130                     // TODO implement LRU cache flush
131                     try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
132                     {
133                         ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY,0,raf.length());
134                         mapped=cache.putIfAbsent(path,buf);
135                         if (mapped==null)
136                             mapped=buf;
137                     }
138                 }
139 
140                 // start async request handling
141                 AsyncContext async=request.startAsync();
142 
143                 // Set a JettyStream as the write listener to write the content asynchronously.
144                 out.setWriteListener(new JettyDataStream(mapped,async,out));    
145                 return;
146             }
147         }
148         
149         // Jetty API was not used, so lets try the standards approach
150         
151         // Can we find the content as an input stream
152         InputStream content = getServletContext().getResourceAsStream(info);
153         if (content==null)
154         {
155             response.sendError(404);
156             return;
157         }
158 
159         // Set a StandardStream as he write listener to write the content asynchronously
160         out.setWriteListener(new StandardDataStream(content,request.startAsync(),out));
161     }
162 
163     /**
164      * A standard API Stream writer
165      */
166     private final class StandardDataStream implements WriteListener, Runnable
167     {
168         private final InputStream content;
169         private final AsyncContext async;
170         private final ServletOutputStream out;
171 
172         private StandardDataStream(InputStream content, AsyncContext async, ServletOutputStream out)
173         {
174             this.content = content;
175             this.async = async;
176             this.out = out;
177         }
178 
179         @Override
180         public void onWritePossible() throws IOException
181         {
182             // If we are able to write
183             if(out.isReady())
184             {
185                 // Allocated a copy buffer for each write, so as to not hold while paused
186                 // TODO put these buffers into a pool
187                 byte[] buffer = new byte[buffersize];
188                 
189                 // read some content into the copy buffer
190                 int len=content.read(buffer);
191                 
192                 // If we are at EOF
193                 if (len<0)
194                 {
195                     // complete the async lifecycle
196                     async.complete();
197                     return;
198                 }
199                 
200                 // write out the copy buffer.  This will be an asynchronous write
201                 // and will always return immediately without blocking.  If a subsequent
202                 // call to out.isReady() returns false, then this onWritePossible method
203                 // will be called back when a write is possible.
204                 out.write(buffer,0,len);
205                 
206                 // Schedule a timer callback to pause writing.  Because isReady() is not called,
207                 // a onWritePossible callback is no scheduled.
208                 scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
209             }
210         }
211         
212         @Override 
213         public void run()
214         {
215             try
216             {
217                 // When the pause timer wakes up, call onWritePossible.  Either isReady() will return
218                 // true and another chunk of content will be written, or it will return false and the 
219                 // onWritePossible() callback will be scheduled when a write is next possible.
220                 onWritePossible();
221             }
222             catch(Exception e)
223             {
224                 onError(e);
225             }
226         }
227 
228         @Override
229         public void onError(Throwable t)
230         {
231             getServletContext().log("Async Error",t);
232             async.complete();
233         }
234     }
235     
236 
237     /**
238      * A Jetty API DataStream
239      *
240      */
241     private final class JettyDataStream implements WriteListener, Runnable
242     {
243         private final ByteBuffer content;
244         private final int limit;
245         private final AsyncContext async;
246         private final HttpOutput out;
247 
248         private JettyDataStream(ByteBuffer content, AsyncContext async, ServletOutputStream out)
249         {
250             // Make a readonly copy of the passed buffer. This uses the same underlying content
251             // without a copy, but gives this instance its own position and limit.
252             this.content = content.asReadOnlyBuffer();
253             // remember the ultimate limit.
254             this.limit=this.content.limit();
255             this.async = async;
256             this.out = (HttpOutput)out;
257         }
258 
259         @Override
260         public void onWritePossible() throws IOException
261         {            
262             // If we are able to write
263             if(out.isReady())
264             {   
265                 // Position our buffers limit to allow only buffersize bytes to be written
266                 int l=content.position()+buffersize;
267                 // respect the ultimate limit
268                 if (l>limit)
269                     l=limit;
270                 content.limit(l);
271 
272                 // if all content has been written
273                 if (!content.hasRemaining())
274                 {              
275                     // complete the async lifecycle
276                     async.complete();
277                     return;
278                 }
279                 
280                 // write our limited buffer.  This will be an asynchronous write
281                 // and will always return immediately without blocking.  If a subsequent
282                 // call to out.isReady() returns false, then this onWritePossible method
283                 // will be called back when a write is possible.
284                 out.write(content);
285 
286                 // Schedule a timer callback to pause writing.  Because isReady() is not called,
287                 // a onWritePossible callback is not scheduled.
288                 scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
289             }
290         }
291         
292         @Override 
293         public void run()
294         {
295             try
296             {
297                 // When the pause timer wakes up, call onWritePossible.  Either isReady() will return
298                 // true and another chunk of content will be written, or it will return false and the 
299                 // onWritePossible() callback will be scheduled when a write is next possible.
300                 onWritePossible();
301             }
302             catch(Exception e)
303             {
304                 onError(e);
305             }
306         }
307 
308         @Override
309         public void onError(Throwable t)
310         {
311             getServletContext().log("Async Error",t);
312             async.complete();
313         }
314     }
315 }