View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.servlets;
20  
21  import java.io.IOException;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Semaphore;
25  import java.util.concurrent.TimeUnit;
26  import javax.servlet.AsyncContext;
27  import javax.servlet.AsyncEvent;
28  import javax.servlet.AsyncListener;
29  import javax.servlet.Filter;
30  import javax.servlet.FilterChain;
31  import javax.servlet.FilterConfig;
32  import javax.servlet.ServletContext;
33  import javax.servlet.ServletException;
34  import javax.servlet.ServletRequest;
35  import javax.servlet.ServletResponse;
36  import javax.servlet.http.HttpServletRequest;
37  import javax.servlet.http.HttpServletResponse;
38  import javax.servlet.http.HttpSession;
39  
40  import org.eclipse.jetty.server.handler.ContextHandler;
41  import org.eclipse.jetty.util.annotation.ManagedAttribute;
42  import org.eclipse.jetty.util.annotation.ManagedObject;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  
46  /**
47   * Quality of Service Filter.
48   * <p/>
49   * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
50   * If more requests are received, they are suspended and placed on priority queues.  Priorities are determined by
51   * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority"
52   * init parameter (default 10), with higher values having higher priority.
53   * <p/>
54   * This filter is ideal to prevent wasting threads waiting for slow/limited
55   * resources such as a JDBC connection pool.  It avoids the situation where all of a
56   * containers thread pool may be consumed blocking on such a slow resource.
57   * By limiting the number of active threads, a smaller thread pool may be used as
58   * the threads are not wasted waiting.  Thus more memory may be available for use by
59   * the active threads.
60   * <p/>
61   * Furthermore, this filter uses a priority when resuming waiting requests. So that if
62   * a container is under load, and there are many requests waiting for resources,
63   * the {@link #getPriority(ServletRequest)} method is used, so that more important
64   * requests are serviced first.     For example, this filter could be deployed with a
65   * maxRequest limit slightly smaller than the containers thread pool and a high priority
66   * allocated to admin users.  Thus regardless of load, admin users would always be
67   * able to access the web application.
68   * <p/>
69   * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
70   * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
71   * avoided if the semaphore is shortly available.  If the semaphore cannot be obtained, the request will be suspended
72   * for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
73   * <p/>
74   * If the "managedAttr" init parameter is set to true, then this servlet is set as a {@link ServletContext} attribute with the
75   * filter name as the attribute name.  This allows context external mechanism (eg JMX via {@link ContextHandler#MANAGED_ATTRIBUTES}) to
76   * manage the configuration of the filter.
77   */
78  @ManagedObject("Quality of Service Filter")
79  public class QoSFilter implements Filter
80  {
81      private static final Logger LOG = Log.getLogger(QoSFilter.class);
82  
83      static final int __DEFAULT_MAX_PRIORITY = 10;
84      static final int __DEFAULT_PASSES = 10;
85      static final int __DEFAULT_WAIT_MS = 50;
86      static final long __DEFAULT_TIMEOUT_MS = -1;
87  
88      static final String MANAGED_ATTR_INIT_PARAM = "managedAttr";
89      static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
90      static final String MAX_PRIORITY_INIT_PARAM = "maxPriority";
91      static final String MAX_WAIT_INIT_PARAM = "waitMs";
92      static final String SUSPEND_INIT_PARAM = "suspendMs";
93  
94      private final String _suspended = "QoSFilter@" + Integer.toHexString(hashCode()) + ".SUSPENDED";
95      private final String _resumed = "QoSFilter@" + Integer.toHexString(hashCode()) + ".RESUMED";
96      private long _waitMs;
97      private long _suspendMs;
98      private int _maxRequests;
99      private Semaphore _passes;
100     private Queue<AsyncContext>[] _queues;
101     private AsyncListener[] _listeners;
102 
103     public void init(FilterConfig filterConfig)
104     {
105         int max_priority = __DEFAULT_MAX_PRIORITY;
106         if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM) != null)
107             max_priority = Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
108         _queues = new Queue[max_priority + 1];
109         _listeners = new AsyncListener[_queues.length];
110         for (int p = 0; p < _queues.length; ++p)
111         {
112             _queues[p] = new ConcurrentLinkedQueue<>();
113             _listeners[p] = new QoSAsyncListener(p);
114         }
115 
116         int maxRequests = __DEFAULT_PASSES;
117         if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM) != null)
118             maxRequests = Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
119         _passes = new Semaphore(maxRequests, true);
120         _maxRequests = maxRequests;
121 
122         long wait = __DEFAULT_WAIT_MS;
123         if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM) != null)
124             wait = Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
125         _waitMs = wait;
126 
127         long suspend = __DEFAULT_TIMEOUT_MS;
128         if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM) != null)
129             suspend = Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
130         _suspendMs = suspend;
131 
132         ServletContext context = filterConfig.getServletContext();
133         if (context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
134             context.setAttribute(filterConfig.getFilterName(), this);
135     }
136 
137     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
138     {
139         boolean accepted = false;
140         try
141         {
142             Boolean suspended = (Boolean)request.getAttribute(_suspended);
143             if (suspended == null)
144             {
145                 accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
146                 if (accepted)
147                 {
148                     request.setAttribute(_suspended, Boolean.FALSE);
149                     if (LOG.isDebugEnabled())
150                         LOG.debug("Accepted {}", request);
151                 }
152                 else
153                 {
154                     request.setAttribute(_suspended, Boolean.TRUE);
155                     int priority = getPriority(request);
156                     AsyncContext asyncContext = request.startAsync();
157                     long suspendMs = getSuspendMs();
158                     if (suspendMs > 0)
159                         asyncContext.setTimeout(suspendMs);
160                     asyncContext.addListener(_listeners[priority]);
161                     _queues[priority].add(asyncContext);
162                     if (LOG.isDebugEnabled())
163                         LOG.debug("Suspended {}", request);
164                     return;
165                 }
166             }
167             else
168             {
169                 if (suspended)
170                 {
171                     request.setAttribute(_suspended, Boolean.FALSE);
172                     Boolean resumed = (Boolean)request.getAttribute(_resumed);
173                     if (resumed == Boolean.TRUE)
174                     {
175                         _passes.acquire();
176                         accepted = true;
177                         if (LOG.isDebugEnabled())
178                             LOG.debug("Resumed {}", request);
179                     }
180                     else
181                     {
182                         // Timeout! try 1 more time.
183                         accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
184                         if (LOG.isDebugEnabled())
185                             LOG.debug("Timeout {}", request);
186                     }
187                 }
188                 else
189                 {
190                     // Pass through resume of previously accepted request.
191                     _passes.acquire();
192                     accepted = true;
193                     if (LOG.isDebugEnabled())
194                         LOG.debug("Passthrough {}", request);
195                 }
196             }
197 
198             if (accepted)
199             {
200                 chain.doFilter(request, response);
201             }
202             else
203             {
204                 if (LOG.isDebugEnabled())
205                     LOG.debug("Rejected {}", request);
206                 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
207             }
208         }
209         catch (InterruptedException e)
210         {
211             ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
212         }
213         finally
214         {
215             if (accepted)
216             {
217                 for (int p = _queues.length - 1; p >= 0; --p)
218                 {
219                     AsyncContext asyncContext = _queues[p].poll();
220                     if (asyncContext != null)
221                     {
222                         ServletRequest candidate = asyncContext.getRequest();
223                         Boolean suspended = (Boolean)candidate.getAttribute(_suspended);
224                         if (suspended == Boolean.TRUE)
225                         {
226                             candidate.setAttribute(_resumed, Boolean.TRUE);
227                             asyncContext.dispatch();
228                             break;
229                         }
230                     }
231                 }
232                 _passes.release();
233             }
234         }
235     }
236 
237     /**
238      * Computes the request priority.
239      * <p/>
240      * The default implementation assigns the following priorities:
241      * <ul>
242      * <li> 2 - for an authenticated request
243      * <li> 1 - for a request with valid / non new session
244      * <li> 0 - for all other requests.
245      * </ul>
246      * This method may be overridden to provide application specific priorities.
247      *
248      * @param request the incoming request
249      * @return the computed request priority
250      */
251     protected int getPriority(ServletRequest request)
252     {
253         HttpServletRequest baseRequest = (HttpServletRequest)request;
254         if (baseRequest.getUserPrincipal() != null)
255         {
256             return 2;
257         }
258         else
259         {
260             HttpSession session = baseRequest.getSession(false);
261             if (session != null && !session.isNew())
262                 return 1;
263             else
264                 return 0;
265         }
266     }
267 
268     public void destroy()
269     {
270     }
271 
272     /**
273      * Get the (short) amount of time (in milliseconds) that the filter would wait
274      * for the semaphore to become available before suspending a request.
275      *
276      * @return wait time (in milliseconds)
277      */
278     @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)")
279     public long getWaitMs()
280     {
281         return _waitMs;
282     }
283 
284     /**
285      * Set the (short) amount of time (in milliseconds) that the filter would wait
286      * for the semaphore to become available before suspending a request.
287      *
288      * @param value wait time (in milliseconds)
289      */
290     public void setWaitMs(long value)
291     {
292         _waitMs = value;
293     }
294 
295     /**
296      * Get the amount of time (in milliseconds) that the filter would suspend
297      * a request for while waiting for the semaphore to become available.
298      *
299      * @return suspend time (in milliseconds)
300      */
301     @ManagedAttribute("amount of time filter will suspend a request for while waiting for the semaphore to become available (in ms)")
302     public long getSuspendMs()
303     {
304         return _suspendMs;
305     }
306 
307     /**
308      * Set the amount of time (in milliseconds) that the filter would suspend
309      * a request for while waiting for the semaphore to become available.
310      *
311      * @param value suspend time (in milliseconds)
312      */
313     public void setSuspendMs(long value)
314     {
315         _suspendMs = value;
316     }
317 
318     /**
319      * Get the maximum number of requests allowed to be processed
320      * at the same time.
321      *
322      * @return maximum number of requests
323      */
324     @ManagedAttribute("maximum number of requests to allow processing of at the same time")
325     public int getMaxRequests()
326     {
327         return _maxRequests;
328     }
329 
330     /**
331      * Set the maximum number of requests allowed to be processed
332      * at the same time.
333      *
334      * @param value the number of requests
335      */
336     public void setMaxRequests(int value)
337     {
338         _passes = new Semaphore((value - getMaxRequests() + _passes.availablePermits()), true);
339         _maxRequests = value;
340     }
341 
342     private class QoSAsyncListener implements AsyncListener
343     {
344         private final int priority;
345 
346         public QoSAsyncListener(int priority)
347         {
348             this.priority = priority;
349         }
350 
351         @Override
352         public void onStartAsync(AsyncEvent event) throws IOException
353         {
354         }
355 
356         @Override
357         public void onComplete(AsyncEvent event) throws IOException
358         {
359         }
360 
361         @Override
362         public void onTimeout(AsyncEvent event) throws IOException
363         {
364             // Remove before it's redispatched, so it won't be
365             // redispatched again at the end of the filtering.
366             AsyncContext asyncContext = event.getAsyncContext();
367             _queues[priority].remove(asyncContext);
368             asyncContext.dispatch();
369         }
370 
371         @Override
372         public void onError(AsyncEvent event) throws IOException
373         {
374         }
375     }
376 }