View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.servlets;
20  
21  import java.io.IOException;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Semaphore;
25  import java.util.concurrent.TimeUnit;
26  
27  import javax.servlet.Filter;
28  import javax.servlet.FilterChain;
29  import javax.servlet.FilterConfig;
30  import javax.servlet.ServletContext;
31  import javax.servlet.ServletException;
32  import javax.servlet.ServletRequest;
33  import javax.servlet.ServletResponse;
34  import javax.servlet.http.HttpServletRequest;
35  import javax.servlet.http.HttpServletResponse;
36  import javax.servlet.http.HttpSession;
37  
38  import org.eclipse.jetty.continuation.Continuation;
39  import org.eclipse.jetty.continuation.ContinuationListener;
40  import org.eclipse.jetty.continuation.ContinuationSupport;
41  import org.eclipse.jetty.server.handler.ContextHandler;
42  
43  /**
44   * Quality of Service Filter.
45   * 
46   * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
47   * If more requests are received, they are suspended and placed on priority queues.  Priorities are determined by 
48   * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority" 
49   * init parameter (default 10), with higher values having higher priority.
50   * </p><p>
51   * This filter is ideal to prevent wasting threads waiting for slow/limited 
52   * resources such as a JDBC connection pool.  It avoids the situation where all of a 
53   * containers thread pool may be consumed blocking on such a slow resource.
54   * By limiting the number of active threads, a smaller thread pool may be used as 
55   * the threads are not wasted waiting.  Thus more memory may be available for use by 
56   * the active threads.
57   * </p><p>
58   * Furthermore, this filter uses a priority when resuming waiting requests. So that if
59   * a container is under load, and there are many requests waiting for resources,
60   * the {@link #getPriority(ServletRequest)} method is used, so that more important 
61   * requests are serviced first.     For example, this filter could be deployed with a 
62   * maxRequest limit slightly smaller than the containers thread pool and a high priority 
63   * allocated to admin users.  Thus regardless of load, admin users would always be
64   * able to access the web application.
65   * </p><p>
66   * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
67   * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
68   * avoided if the semaphore is shortly available.  If the semaphore cannot be obtained, the request will be suspended
69   * for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
70   * </p><p>
71   * If the "managedAttr" init parameter is set to true, then this servlet is set as a {@link ServletContext} attribute with the 
72   * filter name as the attribute name.  This allows context external mechanism (eg JMX via {@link ContextHandler#MANAGED_ATTRIBUTES}) to
73   * manage the configuration of the filter.
74   * </p>
75   * 
76   *
77   */
78  public class QoSFilter implements Filter
79  {
80      final static int __DEFAULT_MAX_PRIORITY=10;
81      final static int __DEFAULT_PASSES=10;
82      final static int __DEFAULT_WAIT_MS=50;
83      final static long __DEFAULT_TIMEOUT_MS = -1;
84      
85      final static String MANAGED_ATTR_INIT_PARAM="managedAttr";
86      final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
87      final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
88      final static String MAX_WAIT_INIT_PARAM="waitMs";
89      final static String SUSPEND_INIT_PARAM="suspendMs";
90      
91      ServletContext _context;
92  
93      protected long _waitMs;
94      protected long _suspendMs;
95      protected int _maxRequests;
96      
97      private Semaphore _passes;
98      private Queue<Continuation>[] _queue;
99      private ContinuationListener[] _listener;
100     private String _suspended="QoSFilter@"+this.hashCode();
101     
102     /* ------------------------------------------------------------ */
103     /**
104      * @see javax.servlet.Filter#init(javax.servlet.FilterConfig)
105      */
106     public void init(FilterConfig filterConfig) 
107     {
108         _context=filterConfig.getServletContext();
109 
110         int max_priority=__DEFAULT_MAX_PRIORITY;
111         if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
112             max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
113         _queue=new Queue[max_priority+1];
114         _listener = new ContinuationListener[max_priority + 1];
115         for (int p=0;p<_queue.length;p++)
116         {
117             _queue[p]=new ConcurrentLinkedQueue<Continuation>();
118 
119             final int priority=p;
120             _listener[p] = new ContinuationListener()
121             {
122                 public void onComplete(Continuation continuation)
123                 {}
124 
125                 public void onTimeout(Continuation continuation)
126                 {
127                     _queue[priority].remove(continuation);
128                 }
129             };
130         }
131         
132         int maxRequests=__DEFAULT_PASSES;
133         if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
134             maxRequests=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
135         _passes=new Semaphore(maxRequests,true);
136         _maxRequests = maxRequests;
137         
138         long wait = __DEFAULT_WAIT_MS;
139         if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
140             wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
141         _waitMs=wait;
142         
143         long suspend = __DEFAULT_TIMEOUT_MS;
144         if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
145             suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
146         _suspendMs=suspend;
147 
148         if (_context!=null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
149             _context.setAttribute(filterConfig.getFilterName(),this);
150     }
151     
152     /* ------------------------------------------------------------ */
153     /**
154      * @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest, javax.servlet.ServletResponse, javax.servlet.FilterChain)
155      */
156     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
157     throws IOException, ServletException
158     {
159         boolean accepted=false;
160         try
161         {
162             if (request.getAttribute(_suspended)==null)
163             {
164                 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
165                 if (accepted)
166                 {
167                     request.setAttribute(_suspended,Boolean.FALSE);
168                 }
169                 else
170                 {
171                     request.setAttribute(_suspended,Boolean.TRUE);
172                     int priority = getPriority(request);
173                     Continuation continuation = ContinuationSupport.getContinuation(request);
174                     if (_suspendMs>0)
175                         continuation.setTimeout(_suspendMs);
176                     continuation.suspend();
177                     continuation.addContinuationListener(_listener[priority]);
178                     _queue[priority].add(continuation);
179                     return;
180                 }
181             }
182             else
183             {
184                 Boolean suspended=(Boolean)request.getAttribute(_suspended);
185                 
186                 if (suspended.booleanValue())
187                 {
188                     request.setAttribute(_suspended,Boolean.FALSE);
189                     if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
190                     {
191                         _passes.acquire();
192                         accepted=true;
193                     }
194                     else 
195                     {
196                         // Timeout! try 1 more time.
197                         accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
198                     }
199                 }
200                 else
201                 {
202                     // pass through resume of previously accepted request
203                     _passes.acquire();
204                     accepted = true;
205                 }
206             }
207 
208             if (accepted)
209             {
210                 chain.doFilter(request,response);
211             }
212             else
213             {
214                 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
215             }
216         }
217         catch(InterruptedException e)
218         {
219             _context.log("QoS",e);
220             ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
221         }
222         finally
223         {
224             if (accepted)
225             {
226                 for (int p=_queue.length;p-->0;)
227                 {
228                     Continuation continutaion=_queue[p].poll();
229                     if (continutaion!=null && continutaion.isSuspended())
230                     {
231                         continutaion.resume();
232                         break;
233                     }
234                 }
235                 _passes.release();
236             }
237         }
238     }
239 
240     /** 
241      * Get the request Priority.
242      * <p> The default implementation assigns the following priorities:<ul>
243      * <li> 2 - for a authenticated request
244      * <li> 1 - for a request with valid /non new session 
245      * <li> 0 - for all other requests.
246      * </ul>
247      * This method may be specialised to provide application specific priorities.
248      * 
249      * @param request
250      * @return the request priority
251      */
252     protected int getPriority(ServletRequest request)
253     {
254         HttpServletRequest baseRequest = (HttpServletRequest)request;
255         if (baseRequest.getUserPrincipal() != null )
256             return 2;
257         else 
258         {
259             HttpSession session = baseRequest.getSession(false);
260             if (session!=null && !session.isNew()) 
261                 return 1;
262             else
263                 return 0;
264         }
265     }
266 
267 
268     /* ------------------------------------------------------------ */
269     /**
270      * @see javax.servlet.Filter#destroy()
271      */
272     public void destroy(){}
273 
274     /* ------------------------------------------------------------ */
275     /** 
276      * Get the (short) amount of time (in milliseconds) that the filter would wait
277      * for the semaphore to become available before suspending a request.
278      * 
279      * @return wait time (in milliseconds)
280      */
281     public long getWaitMs()
282     {
283         return _waitMs;
284     }
285 
286     /* ------------------------------------------------------------ */
287     /**
288      * Set the (short) amount of time (in milliseconds) that the filter would wait
289      * for the semaphore to become available before suspending a request.
290      * 
291      * @param value wait time (in milliseconds)
292      */
293     public void setWaitMs(long value)
294     {
295         _waitMs = value;
296     }
297 
298     /* ------------------------------------------------------------ */
299     /**
300      * Get the amount of time (in milliseconds) that the filter would suspend
301      * a request for while waiting for the semaphore to become available.
302      * 
303      * @return suspend time (in milliseconds)
304      */
305     public long getSuspendMs()
306     {
307         return _suspendMs;
308     }
309 
310     /* ------------------------------------------------------------ */
311     /**
312      * Set the amount of time (in milliseconds) that the filter would suspend
313      * a request for while waiting for the semaphore to become available.
314      * 
315      * @param value suspend time (in milliseconds)
316      */
317     public void setSuspendMs(long value)
318     {
319         _suspendMs = value;
320     }
321 
322     /* ------------------------------------------------------------ */
323     /**
324      * Get the maximum number of requests allowed to be processed
325      * at the same time.
326      * 
327      * @return maximum number of requests
328      */
329     public int getMaxRequests()
330     {
331         return _maxRequests;
332     }
333 
334     /* ------------------------------------------------------------ */
335     /**
336      * Set the maximum number of requests allowed to be processed
337      * at the same time.
338      * 
339      * @param value the number of requests
340      */
341     public void setMaxRequests(int value)
342     {
343         _passes = new Semaphore((value-_maxRequests+_passes.availablePermits()), true);
344         _maxRequests = value;
345     }
346 
347 }