View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.servlets;
20  
21  import java.io.IOException;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Semaphore;
25  import java.util.concurrent.TimeUnit;
26  
27  import javax.servlet.AsyncContext;
28  import javax.servlet.AsyncEvent;
29  import javax.servlet.AsyncListener;
30  import javax.servlet.Filter;
31  import javax.servlet.FilterChain;
32  import javax.servlet.FilterConfig;
33  import javax.servlet.ServletContext;
34  import javax.servlet.ServletException;
35  import javax.servlet.ServletRequest;
36  import javax.servlet.ServletResponse;
37  import javax.servlet.http.HttpServletRequest;
38  import javax.servlet.http.HttpServletResponse;
39  import javax.servlet.http.HttpSession;
40  
41  import org.eclipse.jetty.server.handler.ContextHandler;
42  import org.eclipse.jetty.util.annotation.ManagedAttribute;
43  import org.eclipse.jetty.util.annotation.ManagedObject;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  
47  /**
48   * Quality of Service Filter.
49   * <p>
50   * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
51   * If more requests are received, they are suspended and placed on priority queues.  Priorities are determined by
52   * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority"
53   * init parameter (default 10), with higher values having higher priority.
54   * <p>
55   * This filter is ideal to prevent wasting threads waiting for slow/limited
56   * resources such as a JDBC connection pool.  It avoids the situation where all of a
57   * containers thread pool may be consumed blocking on such a slow resource.
58   * By limiting the number of active threads, a smaller thread pool may be used as
59   * the threads are not wasted waiting.  Thus more memory may be available for use by
60   * the active threads.
61   * <p>
62   * Furthermore, this filter uses a priority when resuming waiting requests. So that if
63   * a container is under load, and there are many requests waiting for resources,
64   * the {@link #getPriority(ServletRequest)} method is used, so that more important
65   * requests are serviced first.     For example, this filter could be deployed with a
66   * maxRequest limit slightly smaller than the containers thread pool and a high priority
67   * allocated to admin users.  Thus regardless of load, admin users would always be
68   * able to access the web application.
69   * <p>
70   * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
71   * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
72   * avoided if the semaphore is shortly available.  If the semaphore cannot be obtained, the request will be suspended
73   * for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
74   * <p>
75   * If the "managedAttr" init parameter is set to true, then this servlet is set as a {@link ServletContext} attribute with the
76   * filter name as the attribute name.  This allows context external mechanism (eg JMX via {@link ContextHandler#MANAGED_ATTRIBUTES}) to
77   * manage the configuration of the filter.
78   */
79  @ManagedObject("Quality of Service Filter")
80  public class QoSFilter implements Filter
81  {
82      private static final Logger LOG = Log.getLogger(QoSFilter.class);
83  
84      static final int __DEFAULT_MAX_PRIORITY = 10;
85      static final int __DEFAULT_PASSES = 10;
86      static final int __DEFAULT_WAIT_MS = 50;
87      static final long __DEFAULT_TIMEOUT_MS = -1;
88  
89      static final String MANAGED_ATTR_INIT_PARAM = "managedAttr";
90      static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
91      static final String MAX_PRIORITY_INIT_PARAM = "maxPriority";
92      static final String MAX_WAIT_INIT_PARAM = "waitMs";
93      static final String SUSPEND_INIT_PARAM = "suspendMs";
94  
95      private final String _suspended = "QoSFilter@" + Integer.toHexString(hashCode()) + ".SUSPENDED";
96      private final String _resumed = "QoSFilter@" + Integer.toHexString(hashCode()) + ".RESUMED";
97      private long _waitMs;
98      private long _suspendMs;
99      private int _maxRequests;
100     private Semaphore _passes;
101     private Queue<AsyncContext>[] _queues;
102     private AsyncListener[] _listeners;
103 
104     public void init(FilterConfig filterConfig)
105     {
106         int max_priority = __DEFAULT_MAX_PRIORITY;
107         if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM) != null)
108             max_priority = Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
109         _queues = new Queue[max_priority + 1];
110         _listeners = new AsyncListener[_queues.length];
111         for (int p = 0; p < _queues.length; ++p)
112         {
113             _queues[p] = new ConcurrentLinkedQueue<>();
114             _listeners[p] = new QoSAsyncListener(p);
115         }
116 
117         int maxRequests = __DEFAULT_PASSES;
118         if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM) != null)
119             maxRequests = Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
120         _passes = new Semaphore(maxRequests, true);
121         _maxRequests = maxRequests;
122 
123         long wait = __DEFAULT_WAIT_MS;
124         if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM) != null)
125             wait = Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
126         _waitMs = wait;
127 
128         long suspend = __DEFAULT_TIMEOUT_MS;
129         if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM) != null)
130             suspend = Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
131         _suspendMs = suspend;
132 
133         ServletContext context = filterConfig.getServletContext();
134         if (context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
135             context.setAttribute(filterConfig.getFilterName(), this);
136     }
137 
138     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
139     {
140         boolean accepted = false;
141         try
142         {
143             Boolean suspended = (Boolean)request.getAttribute(_suspended);
144             if (suspended == null)
145             {
146                 accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
147                 if (accepted)
148                 {
149                     request.setAttribute(_suspended, Boolean.FALSE);
150                     if (LOG.isDebugEnabled())
151                         LOG.debug("Accepted {}", request);
152                 }
153                 else
154                 {
155                     request.setAttribute(_suspended, Boolean.TRUE);
156                     int priority = getPriority(request);
157                     AsyncContext asyncContext = request.startAsync();
158                     long suspendMs = getSuspendMs();
159                     if (suspendMs > 0)
160                         asyncContext.setTimeout(suspendMs);
161                     asyncContext.addListener(_listeners[priority]);
162                     _queues[priority].add(asyncContext);
163                     if (LOG.isDebugEnabled())
164                         LOG.debug("Suspended {}", request);
165                     return;
166                 }
167             }
168             else
169             {
170                 if (suspended)
171                 {
172                     request.setAttribute(_suspended, Boolean.FALSE);
173                     Boolean resumed = (Boolean)request.getAttribute(_resumed);
174                     if (resumed == Boolean.TRUE)
175                     {
176                         _passes.acquire();
177                         accepted = true;
178                         if (LOG.isDebugEnabled())
179                             LOG.debug("Resumed {}", request);
180                     }
181                     else
182                     {
183                         // Timeout! try 1 more time.
184                         accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
185                         if (LOG.isDebugEnabled())
186                             LOG.debug("Timeout {}", request);
187                     }
188                 }
189                 else
190                 {
191                     // Pass through resume of previously accepted request.
192                     _passes.acquire();
193                     accepted = true;
194                     if (LOG.isDebugEnabled())
195                         LOG.debug("Passthrough {}", request);
196                 }
197             }
198 
199             if (accepted)
200             {
201                 chain.doFilter(request, response);
202             }
203             else
204             {
205                 if (LOG.isDebugEnabled())
206                     LOG.debug("Rejected {}", request);
207                 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
208             }
209         }
210         catch (InterruptedException e)
211         {
212             ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
213         }
214         finally
215         {
216             if (accepted)
217             {
218                 for (int p = _queues.length - 1; p >= 0; --p)
219                 {
220                     AsyncContext asyncContext = _queues[p].poll();
221                     if (asyncContext != null)
222                     {
223                         ServletRequest candidate = asyncContext.getRequest();
224                         Boolean suspended = (Boolean)candidate.getAttribute(_suspended);
225                         if (suspended == Boolean.TRUE)
226                         {
227                             candidate.setAttribute(_resumed, Boolean.TRUE);
228                             asyncContext.dispatch();
229                             break;
230                         }
231                     }
232                 }
233                 _passes.release();
234             }
235         }
236     }
237 
238     /**
239      * Computes the request priority.
240      * <p>
241      * The default implementation assigns the following priorities:
242      * <ul>
243      * <li> 2 - for an authenticated request
244      * <li> 1 - for a request with valid / non new session
245      * <li> 0 - for all other requests.
246      * </ul>
247      * This method may be overridden to provide application specific priorities.
248      *
249      * @param request the incoming request
250      * @return the computed request priority
251      */
252     protected int getPriority(ServletRequest request)
253     {
254         HttpServletRequest baseRequest = (HttpServletRequest)request;
255         if (baseRequest.getUserPrincipal() != null)
256         {
257             return 2;
258         }
259         else
260         {
261             HttpSession session = baseRequest.getSession(false);
262             if (session != null && !session.isNew())
263                 return 1;
264             else
265                 return 0;
266         }
267     }
268 
269     public void destroy()
270     {
271     }
272 
273     /**
274      * Get the (short) amount of time (in milliseconds) that the filter would wait
275      * for the semaphore to become available before suspending a request.
276      *
277      * @return wait time (in milliseconds)
278      */
279     @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)")
280     public long getWaitMs()
281     {
282         return _waitMs;
283     }
284 
285     /**
286      * Set the (short) amount of time (in milliseconds) that the filter would wait
287      * for the semaphore to become available before suspending a request.
288      *
289      * @param value wait time (in milliseconds)
290      */
291     public void setWaitMs(long value)
292     {
293         _waitMs = value;
294     }
295 
296     /**
297      * Get the amount of time (in milliseconds) that the filter would suspend
298      * a request for while waiting for the semaphore to become available.
299      *
300      * @return suspend time (in milliseconds)
301      */
302     @ManagedAttribute("amount of time filter will suspend a request for while waiting for the semaphore to become available (in ms)")
303     public long getSuspendMs()
304     {
305         return _suspendMs;
306     }
307 
308     /**
309      * Set the amount of time (in milliseconds) that the filter would suspend
310      * a request for while waiting for the semaphore to become available.
311      *
312      * @param value suspend time (in milliseconds)
313      */
314     public void setSuspendMs(long value)
315     {
316         _suspendMs = value;
317     }
318 
319     /**
320      * Get the maximum number of requests allowed to be processed
321      * at the same time.
322      *
323      * @return maximum number of requests
324      */
325     @ManagedAttribute("maximum number of requests to allow processing of at the same time")
326     public int getMaxRequests()
327     {
328         return _maxRequests;
329     }
330 
331     /**
332      * Set the maximum number of requests allowed to be processed
333      * at the same time.
334      *
335      * @param value the number of requests
336      */
337     public void setMaxRequests(int value)
338     {
339         _passes = new Semaphore((value - getMaxRequests() + _passes.availablePermits()), true);
340         _maxRequests = value;
341     }
342 
343     private class QoSAsyncListener implements AsyncListener
344     {
345         private final int priority;
346 
347         public QoSAsyncListener(int priority)
348         {
349             this.priority = priority;
350         }
351 
352         @Override
353         public void onStartAsync(AsyncEvent event) throws IOException
354         {
355         }
356 
357         @Override
358         public void onComplete(AsyncEvent event) throws IOException
359         {
360         }
361 
362         @Override
363         public void onTimeout(AsyncEvent event) throws IOException
364         {
365             // Remove before it's redispatched, so it won't be
366             // redispatched again at the end of the filtering.
367             AsyncContext asyncContext = event.getAsyncContext();
368             _queues[priority].remove(asyncContext);
369             asyncContext.dispatch();
370         }
371 
372         @Override
373         public void onError(AsyncEvent event) throws IOException
374         {
375         }
376     }
377 }