1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.servlets;
20
21 import java.io.IOException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.servlet.Filter;
28 import javax.servlet.FilterChain;
29 import javax.servlet.FilterConfig;
30 import javax.servlet.ServletContext;
31 import javax.servlet.ServletException;
32 import javax.servlet.ServletRequest;
33 import javax.servlet.ServletResponse;
34 import javax.servlet.http.HttpServletRequest;
35 import javax.servlet.http.HttpServletResponse;
36 import javax.servlet.http.HttpSession;
37
38 import org.eclipse.jetty.continuation.Continuation;
39 import org.eclipse.jetty.continuation.ContinuationListener;
40 import org.eclipse.jetty.continuation.ContinuationSupport;
41 import org.eclipse.jetty.server.handler.ContextHandler;
42 import org.eclipse.jetty.util.annotation.ManagedAttribute;
43 import org.eclipse.jetty.util.annotation.ManagedObject;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 @ManagedObject("Quality of Service Filter")
81 public class QoSFilter implements Filter
82 {
83 final static int __DEFAULT_MAX_PRIORITY=10;
84 final static int __DEFAULT_PASSES=10;
85 final static int __DEFAULT_WAIT_MS=50;
86 final static long __DEFAULT_TIMEOUT_MS = -1;
87
88 final static String MANAGED_ATTR_INIT_PARAM="managedAttr";
89 final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
90 final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
91 final static String MAX_WAIT_INIT_PARAM="waitMs";
92 final static String SUSPEND_INIT_PARAM="suspendMs";
93
94 ServletContext _context;
95
96 protected long _waitMs;
97 protected long _suspendMs;
98 protected int _maxRequests;
99
100 private Semaphore _passes;
101 private Queue<Continuation>[] _queue;
102 private ContinuationListener[] _listener;
103 private String _suspended="QoSFilter@"+this.hashCode();
104
105
106
107
108
109 public void init(FilterConfig filterConfig)
110 {
111 _context=filterConfig.getServletContext();
112
113 int max_priority=__DEFAULT_MAX_PRIORITY;
114 if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
115 max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
116 _queue=new Queue[max_priority+1];
117 _listener = new ContinuationListener[max_priority + 1];
118 for (int p=0;p<_queue.length;p++)
119 {
120 _queue[p]=new ConcurrentLinkedQueue<Continuation>();
121
122 final int priority=p;
123 _listener[p] = new ContinuationListener()
124 {
125 public void onComplete(Continuation continuation)
126 {}
127
128 public void onTimeout(Continuation continuation)
129 {
130 _queue[priority].remove(continuation);
131 }
132 };
133 }
134
135 int maxRequests=__DEFAULT_PASSES;
136 if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
137 maxRequests=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
138 _passes=new Semaphore(maxRequests,true);
139 _maxRequests = maxRequests;
140
141 long wait = __DEFAULT_WAIT_MS;
142 if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
143 wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
144 _waitMs=wait;
145
146 long suspend = __DEFAULT_TIMEOUT_MS;
147 if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
148 suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
149 _suspendMs=suspend;
150
151 if (_context!=null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
152 _context.setAttribute(filterConfig.getFilterName(),this);
153 }
154
155
156
157
158
159 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
160 throws IOException, ServletException
161 {
162 boolean accepted=false;
163 try
164 {
165 if (request.getAttribute(_suspended)==null)
166 {
167 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
168 if (accepted)
169 {
170 request.setAttribute(_suspended,Boolean.FALSE);
171 }
172 else
173 {
174 request.setAttribute(_suspended,Boolean.TRUE);
175 int priority = getPriority(request);
176 Continuation continuation = ContinuationSupport.getContinuation(request);
177 if (_suspendMs>0)
178 continuation.setTimeout(_suspendMs);
179 continuation.suspend();
180 continuation.addContinuationListener(_listener[priority]);
181 _queue[priority].add(continuation);
182 return;
183 }
184 }
185 else
186 {
187 Boolean suspended=(Boolean)request.getAttribute(_suspended);
188
189 if (suspended.booleanValue())
190 {
191 request.setAttribute(_suspended,Boolean.FALSE);
192 if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
193 {
194 _passes.acquire();
195 accepted=true;
196 }
197 else
198 {
199
200 accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
201 }
202 }
203 else
204 {
205
206 _passes.acquire();
207 accepted = true;
208 }
209 }
210
211 if (accepted)
212 {
213 chain.doFilter(request,response);
214 }
215 else
216 {
217 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
218 }
219 }
220 catch(InterruptedException e)
221 {
222 _context.log("QoS",e);
223 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
224 }
225 finally
226 {
227 if (accepted)
228 {
229 for (int p=_queue.length;p-->0;)
230 {
231 Continuation continutaion=_queue[p].poll();
232 if (continutaion!=null && continutaion.isSuspended())
233 {
234 continutaion.resume();
235 break;
236 }
237 }
238 _passes.release();
239 }
240 }
241 }
242
243
244
245
246
247
248
249
250
251
252
253
254
255 protected int getPriority(ServletRequest request)
256 {
257 HttpServletRequest baseRequest = (HttpServletRequest)request;
258 if (baseRequest.getUserPrincipal() != null )
259 return 2;
260 else
261 {
262 HttpSession session = baseRequest.getSession(false);
263 if (session!=null && !session.isNew())
264 return 1;
265 else
266 return 0;
267 }
268 }
269
270
271
272
273
274
275 public void destroy(){}
276
277
278
279
280
281
282
283
284 @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)")
285 public long getWaitMs()
286 {
287 return _waitMs;
288 }
289
290
291
292
293
294
295
296
297 public void setWaitMs(long value)
298 {
299 _waitMs = value;
300 }
301
302
303
304
305
306
307
308
309 @ManagedAttribute("amount of time filter will suspend a request for while waiting for the semaphore to become available (in ms)")
310 public long getSuspendMs()
311 {
312 return _suspendMs;
313 }
314
315
316
317
318
319
320
321
322 public void setSuspendMs(long value)
323 {
324 _suspendMs = value;
325 }
326
327
328
329
330
331
332
333
334 @ManagedAttribute("maximum number of requests to allow processing of at the same time")
335 public int getMaxRequests()
336 {
337 return _maxRequests;
338 }
339
340
341
342
343
344
345
346
347 public void setMaxRequests(int value)
348 {
349 _passes = new Semaphore((value-_maxRequests+_passes.availablePermits()), true);
350 _maxRequests = value;
351 }
352
353 }