View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.servlets;
15  
16  import java.io.IOException;
17  import java.util.Queue;
18  import java.util.concurrent.Semaphore;
19  import java.util.concurrent.TimeUnit;
20  
21  import javax.servlet.Filter;
22  import javax.servlet.FilterChain;
23  import javax.servlet.FilterConfig;
24  import javax.servlet.ServletContext;
25  import javax.servlet.ServletException;
26  import javax.servlet.ServletRequest;
27  import javax.servlet.ServletResponse;
28  import javax.servlet.http.HttpServletRequest;
29  import javax.servlet.http.HttpServletResponse;
30  import javax.servlet.http.HttpSession;
31  
32  import org.eclipse.jetty.continuation.Continuation;
33  import org.eclipse.jetty.continuation.ContinuationSupport;
34  import org.eclipse.jetty.util.ArrayQueue;
35  
36  /**
37   * Quality of Service Filter.
38   * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
39   * If more requests are received, they are suspended and placed on priority queues.  Priorities are determined by 
40   * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority" 
41   * init parameter (default 10), with higher values having higher priority.
42   * <p>
43   * This filter is ideal to prevent wasting threads waiting for slow/limited 
44   * resources such as a JDBC connection pool.  It avoids the situation where all of a 
45   * containers thread pool may be consumed blocking on such a slow resource.
46   * By limiting the number of active threads, a smaller thread pool may be used as 
47   * the threads are not wasted waiting.  Thus more memory may be available for use by 
48   * the active threads.
49   * <p>
50   * Furthermore, this filter uses a priority when resuming waiting requests. So that if
51   * a container is under load, and there are many requests waiting for resources,
52   * the {@link #getPriority(ServletRequest)} method is used, so that more important 
53   * requests are serviced first.     For example, this filter could be deployed with a 
54   * maxRequest limit slightly smaller than the containers thread pool and a high priority 
55   * allocated to admin users.  Thus regardless of load, admin users would always be
56   * able to access the web application.
57   * <p>
58   * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
59   * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
60   * avoided if the semaphore is shortly available.  If the semaphore cannot be obtained, the request will be suspended
61   * for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
62   * 
63   * 
64   *
65   */
66  public class QoSFilter implements Filter
67  {
68      final static int __DEFAULT_MAX_PRIORITY=10;
69      final static int __DEFAULT_PASSES=10;
70      final static int __DEFAULT_WAIT_MS=50;
71      final static long __DEFAULT_TIMEOUT_MS = -1;
72      
73      final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
74      final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
75      final static String MAX_WAIT_INIT_PARAM="waitMs";
76      final static String SUSPEND_INIT_PARAM="suspendMs";
77      
78      ServletContext _context;
79      long _waitMs;
80      long _suspendMs;
81      Semaphore _passes;
82      Queue<Continuation>[] _queue;
83      String _suspended="QoSFilter@"+this.hashCode();
84      
85      public void init(FilterConfig filterConfig) 
86      {
87          _context=filterConfig.getServletContext();
88          
89          int max_priority=__DEFAULT_MAX_PRIORITY;
90          if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
91              max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
92          _queue=new Queue[max_priority+1];
93          for (int p=0;p<_queue.length;p++)
94              _queue[p]=new ArrayQueue<Continuation>();
95          
96          int passes=__DEFAULT_PASSES;
97          if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
98              passes=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
99          _passes=new Semaphore(passes,true);
100         
101         long wait = __DEFAULT_WAIT_MS;
102         if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
103             wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
104         _waitMs=wait;
105         
106         long suspend = __DEFAULT_TIMEOUT_MS;
107         if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
108             suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
109         _suspendMs=suspend;
110     }
111     
112     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
113     throws IOException, ServletException
114     {
115         boolean accepted=false;
116         try
117         {
118             if (request.getAttribute(_suspended)==null)
119             {
120                 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
121                 if (accepted)
122                 {
123                     request.setAttribute(_suspended,Boolean.FALSE);
124                 }
125                 else
126                 {
127                     request.setAttribute(_suspended,Boolean.TRUE);
128                     Continuation continuation = ContinuationSupport.getContinuation(request,response);
129                     if (_suspendMs>0)
130                         continuation.setTimeout(_suspendMs);
131                     continuation.suspend();
132                     
133                     int priority = getPriority(request);
134                     _queue[priority].add(continuation);
135                     return;
136                 }
137             }
138             else
139             {
140                 Boolean suspended=(Boolean)request.getAttribute(_suspended);
141                 
142                 if (suspended.booleanValue())
143                 {
144                     request.setAttribute(_suspended,Boolean.FALSE);
145                     if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
146                     {
147                         _passes.acquire();
148                         accepted=true;
149                     }
150                     else 
151                     {
152                         // Timeout! try 1 more time.
153                         accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
154                     }
155                 }
156                 else
157                 {
158                     // pass through resume of previously accepted request
159                     _passes.acquire();
160                     accepted = true;
161                 }
162             }
163             
164 
165             if (accepted)
166             {
167                 chain.doFilter(request,response);
168             }
169             else
170             {
171                 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
172             }
173             
174             
175             
176         }
177         catch(InterruptedException e)
178         {
179             _context.log("QoS",e);
180             ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
181         }
182         finally
183         {
184             if (accepted)
185             {
186                 for (int p=_queue.length;p-->0;)
187                 {
188                     Continuation continutaion=_queue[p].poll();
189                     if (continutaion!=null)
190                     {
191                         continutaion.resume();
192                         break;
193                     }
194                 }
195                 _passes.release();
196             }
197         }
198     }
199 
200     /** 
201      * Get the request Priority.
202      * <p> The default implementation assigns the following priorities:<ul>
203      * <li> 2 - for a authenticated request
204      * <li> 1 - for a request with valid /non new session 
205      * <li> 0 - for all other requests.
206      * </ul>
207      * This method may be specialised to provide application specific priorities.
208      * 
209      * @param request
210      * @return
211      */
212     protected int getPriority(ServletRequest request)
213     {
214         HttpServletRequest baseRequest = (HttpServletRequest)request;
215         if (baseRequest.getUserPrincipal() != null )
216             return 2;
217         else 
218         {
219             HttpSession session = baseRequest.getSession(false);
220             if (session!=null && !session.isNew()) 
221                 return 1;
222             else
223                 return 0;
224         }
225     }
226 
227 
228     public void destroy(){}
229 
230 }