1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.servlets;
15
16 import java.io.IOException;
17 import java.util.Queue;
18 import java.util.concurrent.Semaphore;
19 import java.util.concurrent.TimeUnit;
20
21 import javax.servlet.Filter;
22 import javax.servlet.FilterChain;
23 import javax.servlet.FilterConfig;
24 import javax.servlet.ServletContext;
25 import javax.servlet.ServletException;
26 import javax.servlet.ServletRequest;
27 import javax.servlet.ServletResponse;
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.servlet.http.HttpSession;
31
32 import org.eclipse.jetty.continuation.Continuation;
33 import org.eclipse.jetty.continuation.ContinuationSupport;
34 import org.eclipse.jetty.util.ArrayQueue;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class QoSFilter implements Filter
67 {
68 final static int __DEFAULT_MAX_PRIORITY=10;
69 final static int __DEFAULT_PASSES=10;
70 final static int __DEFAULT_WAIT_MS=50;
71 final static long __DEFAULT_TIMEOUT_MS = -1;
72
73 final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
74 final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
75 final static String MAX_WAIT_INIT_PARAM="waitMs";
76 final static String SUSPEND_INIT_PARAM="suspendMs";
77
78 ServletContext _context;
79 long _waitMs;
80 long _suspendMs;
81 Semaphore _passes;
82 Queue<Continuation>[] _queue;
83 String _suspended="QoSFilter@"+this.hashCode();
84
85 public void init(FilterConfig filterConfig)
86 {
87 _context=filterConfig.getServletContext();
88
89 int max_priority=__DEFAULT_MAX_PRIORITY;
90 if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
91 max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
92 _queue=new Queue[max_priority+1];
93 for (int p=0;p<_queue.length;p++)
94 _queue[p]=new ArrayQueue<Continuation>();
95
96 int passes=__DEFAULT_PASSES;
97 if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
98 passes=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
99 _passes=new Semaphore(passes,true);
100
101 long wait = __DEFAULT_WAIT_MS;
102 if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
103 wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
104 _waitMs=wait;
105
106 long suspend = __DEFAULT_TIMEOUT_MS;
107 if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
108 suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
109 _suspendMs=suspend;
110 }
111
112 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
113 throws IOException, ServletException
114 {
115 boolean accepted=false;
116 try
117 {
118 if (request.getAttribute(_suspended)==null)
119 {
120 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
121 if (accepted)
122 {
123 request.setAttribute(_suspended,Boolean.FALSE);
124 }
125 else
126 {
127 request.setAttribute(_suspended,Boolean.TRUE);
128 Continuation continuation = ContinuationSupport.getContinuation(request,response);
129 if (_suspendMs>0)
130 continuation.setTimeout(_suspendMs);
131 continuation.suspend();
132
133 int priority = getPriority(request);
134 _queue[priority].add(continuation);
135 return;
136 }
137 }
138 else
139 {
140 Boolean suspended=(Boolean)request.getAttribute(_suspended);
141
142 if (suspended.booleanValue())
143 {
144 request.setAttribute(_suspended,Boolean.FALSE);
145 if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
146 {
147 _passes.acquire();
148 accepted=true;
149 }
150 else
151 {
152
153 accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
154 }
155 }
156 else
157 {
158
159 _passes.acquire();
160 accepted = true;
161 }
162 }
163
164
165 if (accepted)
166 {
167 chain.doFilter(request,response);
168 }
169 else
170 {
171 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
172 }
173
174
175
176 }
177 catch(InterruptedException e)
178 {
179 _context.log("QoS",e);
180 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
181 }
182 finally
183 {
184 if (accepted)
185 {
186 for (int p=_queue.length;p-->0;)
187 {
188 Continuation continutaion=_queue[p].poll();
189 if (continutaion!=null)
190 {
191 continutaion.resume();
192 break;
193 }
194 }
195 _passes.release();
196 }
197 }
198 }
199
200
201
202
203
204
205
206
207
208
209
210
211
212 protected int getPriority(ServletRequest request)
213 {
214 HttpServletRequest baseRequest = (HttpServletRequest)request;
215 if (baseRequest.getUserPrincipal() != null )
216 return 2;
217 else
218 {
219 HttpSession session = baseRequest.getSession(false);
220 if (session!=null && !session.isNew())
221 return 1;
222 else
223 return 0;
224 }
225 }
226
227
228 public void destroy(){}
229
230 }