View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.servlets;
15  
16  import java.io.IOException;
17  import java.util.Queue;
18  import java.util.concurrent.ConcurrentLinkedQueue;
19  import java.util.concurrent.Semaphore;
20  import java.util.concurrent.TimeUnit;
21  
22  import javax.servlet.Filter;
23  import javax.servlet.FilterChain;
24  import javax.servlet.FilterConfig;
25  import javax.servlet.ServletContext;
26  import javax.servlet.ServletException;
27  import javax.servlet.ServletRequest;
28  import javax.servlet.ServletResponse;
29  import javax.servlet.http.HttpServletRequest;
30  import javax.servlet.http.HttpServletResponse;
31  import javax.servlet.http.HttpSession;
32  
33  import org.eclipse.jetty.continuation.Continuation;
34  import org.eclipse.jetty.continuation.ContinuationListener;
35  import org.eclipse.jetty.continuation.ContinuationSupport;
36  
37  /**
38   * Quality of Service Filter.
39   * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
40   * If more requests are received, they are suspended and placed on priority queues.  Priorities are determined by 
41   * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority" 
42   * init parameter (default 10), with higher values having higher priority.
43   * <p>
44   * This filter is ideal to prevent wasting threads waiting for slow/limited 
45   * resources such as a JDBC connection pool.  It avoids the situation where all of a 
46   * containers thread pool may be consumed blocking on such a slow resource.
47   * By limiting the number of active threads, a smaller thread pool may be used as 
48   * the threads are not wasted waiting.  Thus more memory may be available for use by 
49   * the active threads.
50   * <p>
51   * Furthermore, this filter uses a priority when resuming waiting requests. So that if
52   * a container is under load, and there are many requests waiting for resources,
53   * the {@link #getPriority(ServletRequest)} method is used, so that more important 
54   * requests are serviced first.     For example, this filter could be deployed with a 
55   * maxRequest limit slightly smaller than the containers thread pool and a high priority 
56   * allocated to admin users.  Thus regardless of load, admin users would always be
57   * able to access the web application.
58   * <p>
59   * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
60   * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
61   * avoided if the semaphore is shortly available.  If the semaphore cannot be obtained, the request will be suspended
62   * for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
63   * 
64   * 
65   *
66   */
67  public class QoSFilter implements Filter
68  {
69      final static int __DEFAULT_MAX_PRIORITY=10;
70      final static int __DEFAULT_PASSES=10;
71      final static int __DEFAULT_WAIT_MS=50;
72      final static long __DEFAULT_TIMEOUT_MS = -1;
73      
74      final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
75      final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
76      final static String MAX_WAIT_INIT_PARAM="waitMs";
77      final static String SUSPEND_INIT_PARAM="suspendMs";
78      
79      ServletContext _context;
80      long _waitMs;
81      long _suspendMs;
82      Semaphore _passes;
83      Queue<Continuation>[] _queue;
84      ContinuationListener[] _listener;
85      String _suspended="QoSFilter@"+this.hashCode();
86      
87      public void init(FilterConfig filterConfig) 
88      {
89          _context=filterConfig.getServletContext();
90          
91          int max_priority=__DEFAULT_MAX_PRIORITY;
92          if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
93              max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
94          _queue=new Queue[max_priority+1];
95          _listener = new ContinuationListener[max_priority + 1];
96          for (int p=0;p<_queue.length;p++)
97          {
98              _queue[p]=new ConcurrentLinkedQueue<Continuation>();
99  
100             final int priority=p;
101             _listener[p] = new ContinuationListener()
102             {
103                 public void onComplete(Continuation continuation)
104                 {}
105 
106                 public void onTimeout(Continuation continuation)
107                 {
108                     _queue[priority].remove(continuation);
109                 }
110             };
111         }
112         
113         int passes=__DEFAULT_PASSES;
114         if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
115             passes=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
116         _passes=new Semaphore(passes,true);
117         
118         long wait = __DEFAULT_WAIT_MS;
119         if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
120             wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
121         _waitMs=wait;
122         
123         long suspend = __DEFAULT_TIMEOUT_MS;
124         if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
125             suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
126         _suspendMs=suspend;
127     }
128     
129     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
130     throws IOException, ServletException
131     {
132         boolean accepted=false;
133         try
134         {
135             if (request.getAttribute(_suspended)==null)
136             {
137                 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
138                 if (accepted)
139                 {
140                     request.setAttribute(_suspended,Boolean.FALSE);
141                 }
142                 else
143                 {
144                     request.setAttribute(_suspended,Boolean.TRUE);
145                     int priority = getPriority(request);
146                     Continuation continuation = ContinuationSupport.getContinuation(request);
147                     if (_suspendMs>0)
148                         continuation.setTimeout(_suspendMs);
149                     continuation.suspend();
150                     continuation.addContinuationListener(_listener[priority]);
151                     _queue[priority].add(continuation);
152                     return;
153                 }
154             }
155             else
156             {
157                 Boolean suspended=(Boolean)request.getAttribute(_suspended);
158                 
159                 if (suspended.booleanValue())
160                 {
161                     request.setAttribute(_suspended,Boolean.FALSE);
162                     if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
163                     {
164                         _passes.acquire();
165                         accepted=true;
166                     }
167                     else 
168                     {
169                         // Timeout! try 1 more time.
170                         accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
171                     }
172                 }
173                 else
174                 {
175                     // pass through resume of previously accepted request
176                     _passes.acquire();
177                     accepted = true;
178                 }
179             }
180 
181             if (accepted)
182             {
183                 chain.doFilter(request,response);
184             }
185             else
186             {
187                 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
188             }
189         }
190         catch(InterruptedException e)
191         {
192             _context.log("QoS",e);
193             ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
194         }
195         finally
196         {
197             if (accepted)
198             {
199                 for (int p=_queue.length;p-->0;)
200                 {
201                     Continuation continutaion=_queue[p].poll();
202                     if (continutaion!=null && continutaion.isSuspended())
203                     {
204                         continutaion.resume();
205                         break;
206                     }
207                 }
208                 _passes.release();
209             }
210         }
211     }
212 
213     /** 
214      * Get the request Priority.
215      * <p> The default implementation assigns the following priorities:<ul>
216      * <li> 2 - for a authenticated request
217      * <li> 1 - for a request with valid /non new session 
218      * <li> 0 - for all other requests.
219      * </ul>
220      * This method may be specialised to provide application specific priorities.
221      * 
222      * @param request
223      * @return the request priority
224      */
225     protected int getPriority(ServletRequest request)
226     {
227         HttpServletRequest baseRequest = (HttpServletRequest)request;
228         if (baseRequest.getUserPrincipal() != null )
229             return 2;
230         else 
231         {
232             HttpSession session = baseRequest.getSession(false);
233             if (session!=null && !session.isNew()) 
234                 return 1;
235             else
236                 return 0;
237         }
238     }
239 
240 
241     public void destroy(){}
242 
243 }