View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.servlets;
20  
21  import java.io.IOException;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Semaphore;
25  import java.util.concurrent.TimeUnit;
26  
27  import javax.servlet.Filter;
28  import javax.servlet.FilterChain;
29  import javax.servlet.FilterConfig;
30  import javax.servlet.ServletContext;
31  import javax.servlet.ServletException;
32  import javax.servlet.ServletRequest;
33  import javax.servlet.ServletResponse;
34  import javax.servlet.http.HttpServletRequest;
35  import javax.servlet.http.HttpServletResponse;
36  import javax.servlet.http.HttpSession;
37  
38  import org.eclipse.jetty.continuation.Continuation;
39  import org.eclipse.jetty.continuation.ContinuationListener;
40  import org.eclipse.jetty.continuation.ContinuationSupport;
41  import org.eclipse.jetty.server.handler.ContextHandler;
42  import org.eclipse.jetty.util.annotation.ManagedAttribute;
43  import org.eclipse.jetty.util.annotation.ManagedObject;
44  
45  /**
46   * Quality of Service Filter.
47   *
48   * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
49   * If more requests are received, they are suspended and placed on priority queues.  Priorities are determined by
50   * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority"
51   * init parameter (default 10), with higher values having higher priority.
52   * </p><p>
53   * This filter is ideal to prevent wasting threads waiting for slow/limited
54   * resources such as a JDBC connection pool.  It avoids the situation where all of a
55   * containers thread pool may be consumed blocking on such a slow resource.
56   * By limiting the number of active threads, a smaller thread pool may be used as
57   * the threads are not wasted waiting.  Thus more memory may be available for use by
58   * the active threads.
59   * </p><p>
60   * Furthermore, this filter uses a priority when resuming waiting requests. So that if
61   * a container is under load, and there are many requests waiting for resources,
62   * the {@link #getPriority(ServletRequest)} method is used, so that more important
63   * requests are serviced first.     For example, this filter could be deployed with a
64   * maxRequest limit slightly smaller than the containers thread pool and a high priority
65   * allocated to admin users.  Thus regardless of load, admin users would always be
66   * able to access the web application.
67   * </p><p>
68   * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
69   * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
70   * avoided if the semaphore is shortly available.  If the semaphore cannot be obtained, the request will be suspended
71   * for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
72   * </p><p>
73   * If the "managedAttr" init parameter is set to true, then this servlet is set as a {@link ServletContext} attribute with the
74   * filter name as the attribute name.  This allows context external mechanism (eg JMX via {@link ContextHandler#MANAGED_ATTRIBUTES}) to
75   * manage the configuration of the filter.
76   * </p>
77   *
78   *
79   */
80  @ManagedObject("Quality of Service Filter")
81  public class QoSFilter implements Filter
82  {
83      final static int __DEFAULT_MAX_PRIORITY=10;
84      final static int __DEFAULT_PASSES=10;
85      final static int __DEFAULT_WAIT_MS=50;
86      final static long __DEFAULT_TIMEOUT_MS = -1;
87  
88      final static String MANAGED_ATTR_INIT_PARAM="managedAttr";
89      final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
90      final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
91      final static String MAX_WAIT_INIT_PARAM="waitMs";
92      final static String SUSPEND_INIT_PARAM="suspendMs";
93  
94      ServletContext _context;
95  
96      protected long _waitMs;
97      protected long _suspendMs;
98      protected int _maxRequests;
99  
100     private Semaphore _passes;
101     private Queue<Continuation>[] _queue;
102     private ContinuationListener[] _listener;
103     private String _suspended="QoSFilter@"+this.hashCode();
104 
105     /* ------------------------------------------------------------ */
106     /**
107      * @see javax.servlet.Filter#init(javax.servlet.FilterConfig)
108      */
109     public void init(FilterConfig filterConfig)
110     {
111         _context=filterConfig.getServletContext();
112 
113         int max_priority=__DEFAULT_MAX_PRIORITY;
114         if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
115             max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
116         _queue=new Queue[max_priority+1];
117         _listener = new ContinuationListener[max_priority + 1];
118         for (int p=0;p<_queue.length;p++)
119         {
120             _queue[p]=new ConcurrentLinkedQueue<Continuation>();
121 
122             final int priority=p;
123             _listener[p] = new ContinuationListener()
124             {
125                 public void onComplete(Continuation continuation)
126                 {}
127 
128                 public void onTimeout(Continuation continuation)
129                 {
130                     _queue[priority].remove(continuation);
131                 }
132             };
133         }
134 
135         int maxRequests=__DEFAULT_PASSES;
136         if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
137             maxRequests=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
138         _passes=new Semaphore(maxRequests,true);
139         _maxRequests = maxRequests;
140 
141         long wait = __DEFAULT_WAIT_MS;
142         if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
143             wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
144         _waitMs=wait;
145 
146         long suspend = __DEFAULT_TIMEOUT_MS;
147         if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
148             suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
149         _suspendMs=suspend;
150 
151         if (_context!=null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
152             _context.setAttribute(filterConfig.getFilterName(),this);
153     }
154 
155     /* ------------------------------------------------------------ */
156     /**
157      * @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest, javax.servlet.ServletResponse, javax.servlet.FilterChain)
158      */
159     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
160     throws IOException, ServletException
161     {
162         boolean accepted=false;
163         try
164         {
165             if (request.getAttribute(_suspended)==null)
166             {
167                 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
168                 if (accepted)
169                 {
170                     request.setAttribute(_suspended,Boolean.FALSE);
171                 }
172                 else
173                 {
174                     request.setAttribute(_suspended,Boolean.TRUE);
175                     int priority = getPriority(request);
176                     Continuation continuation = ContinuationSupport.getContinuation(request);
177                     if (_suspendMs>0)
178                         continuation.setTimeout(_suspendMs);
179                     continuation.suspend();
180                     continuation.addContinuationListener(_listener[priority]);
181                     _queue[priority].add(continuation);
182                     return;
183                 }
184             }
185             else
186             {
187                 Boolean suspended=(Boolean)request.getAttribute(_suspended);
188 
189                 if (suspended.booleanValue())
190                 {
191                     request.setAttribute(_suspended,Boolean.FALSE);
192                     if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
193                     {
194                         _passes.acquire();
195                         accepted=true;
196                     }
197                     else
198                     {
199                         // Timeout! try 1 more time.
200                         accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
201                     }
202                 }
203                 else
204                 {
205                     // pass through resume of previously accepted request
206                     _passes.acquire();
207                     accepted = true;
208                 }
209             }
210 
211             if (accepted)
212             {
213                 chain.doFilter(request,response);
214             }
215             else
216             {
217                 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
218             }
219         }
220         catch(InterruptedException e)
221         {
222             _context.log("QoS",e);
223             ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
224         }
225         finally
226         {
227             if (accepted)
228             {
229                 for (int p=_queue.length;p-->0;)
230                 {
231                     Continuation continutaion=_queue[p].poll();
232                     if (continutaion!=null && continutaion.isSuspended())
233                     {
234                         continutaion.resume();
235                         break;
236                     }
237                 }
238                 _passes.release();
239             }
240         }
241     }
242 
243     /**
244      * Get the request Priority.
245      * <p> The default implementation assigns the following priorities:<ul>
246      * <li> 2 - for a authenticated request
247      * <li> 1 - for a request with valid /non new session
248      * <li> 0 - for all other requests.
249      * </ul>
250      * This method may be specialised to provide application specific priorities.
251      *
252      * @param request
253      * @return the request priority
254      */
255     protected int getPriority(ServletRequest request)
256     {
257         HttpServletRequest baseRequest = (HttpServletRequest)request;
258         if (baseRequest.getUserPrincipal() != null )
259             return 2;
260         else
261         {
262             HttpSession session = baseRequest.getSession(false);
263             if (session!=null && !session.isNew())
264                 return 1;
265             else
266                 return 0;
267         }
268     }
269 
270 
271     /* ------------------------------------------------------------ */
272     /**
273      * @see javax.servlet.Filter#destroy()
274      */
275     public void destroy(){}
276 
277     /* ------------------------------------------------------------ */
278     /**
279      * Get the (short) amount of time (in milliseconds) that the filter would wait
280      * for the semaphore to become available before suspending a request.
281      *
282      * @return wait time (in milliseconds)
283      */
284     @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)")
285     public long getWaitMs()
286     {
287         return _waitMs;
288     }
289 
290     /* ------------------------------------------------------------ */
291     /**
292      * Set the (short) amount of time (in milliseconds) that the filter would wait
293      * for the semaphore to become available before suspending a request.
294      *
295      * @param value wait time (in milliseconds)
296      */
297     public void setWaitMs(long value)
298     {
299         _waitMs = value;
300     }
301 
302     /* ------------------------------------------------------------ */
303     /**
304      * Get the amount of time (in milliseconds) that the filter would suspend
305      * a request for while waiting for the semaphore to become available.
306      *
307      * @return suspend time (in milliseconds)
308      */
309     @ManagedAttribute("amount of time filter will suspend a request for while waiting for the semaphore to become available (in ms)")
310     public long getSuspendMs()
311     {
312         return _suspendMs;
313     }
314 
315     /* ------------------------------------------------------------ */
316     /**
317      * Set the amount of time (in milliseconds) that the filter would suspend
318      * a request for while waiting for the semaphore to become available.
319      *
320      * @param value suspend time (in milliseconds)
321      */
322     public void setSuspendMs(long value)
323     {
324         _suspendMs = value;
325     }
326 
327     /* ------------------------------------------------------------ */
328     /**
329      * Get the maximum number of requests allowed to be processed
330      * at the same time.
331      *
332      * @return maximum number of requests
333      */
334     @ManagedAttribute("maximum number of requests to allow processing of at the same time")
335     public int getMaxRequests()
336     {
337         return _maxRequests;
338     }
339 
340     /* ------------------------------------------------------------ */
341     /**
342      * Set the maximum number of requests allowed to be processed
343      * at the same time.
344      *
345      * @param value the number of requests
346      */
347     public void setMaxRequests(int value)
348     {
349         _passes = new Semaphore((value-_maxRequests+_passes.availablePermits()), true);
350         _maxRequests = value;
351     }
352 
353 }