1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.servlets;
20
21 import java.io.IOException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.servlet.Filter;
28 import javax.servlet.FilterChain;
29 import javax.servlet.FilterConfig;
30 import javax.servlet.ServletContext;
31 import javax.servlet.ServletException;
32 import javax.servlet.ServletRequest;
33 import javax.servlet.ServletResponse;
34 import javax.servlet.http.HttpServletRequest;
35 import javax.servlet.http.HttpServletResponse;
36 import javax.servlet.http.HttpSession;
37
38 import org.eclipse.jetty.continuation.Continuation;
39 import org.eclipse.jetty.continuation.ContinuationListener;
40 import org.eclipse.jetty.continuation.ContinuationSupport;
41 import org.eclipse.jetty.server.handler.ContextHandler;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public class QoSFilter implements Filter
79 {
80 final static int __DEFAULT_MAX_PRIORITY=10;
81 final static int __DEFAULT_PASSES=10;
82 final static int __DEFAULT_WAIT_MS=50;
83 final static long __DEFAULT_TIMEOUT_MS = -1;
84
85 final static String MANAGED_ATTR_INIT_PARAM="managedAttr";
86 final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
87 final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
88 final static String MAX_WAIT_INIT_PARAM="waitMs";
89 final static String SUSPEND_INIT_PARAM="suspendMs";
90
91 ServletContext _context;
92
93 protected long _waitMs;
94 protected long _suspendMs;
95 protected int _maxRequests;
96
97 private Semaphore _passes;
98 private Queue<Continuation>[] _queue;
99 private ContinuationListener[] _listener;
100 private String _suspended="QoSFilter@"+this.hashCode();
101
102
103
104
105
106 public void init(FilterConfig filterConfig)
107 {
108 _context=filterConfig.getServletContext();
109
110 int max_priority=__DEFAULT_MAX_PRIORITY;
111 if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
112 max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
113 _queue=new Queue[max_priority+1];
114 _listener = new ContinuationListener[max_priority + 1];
115 for (int p=0;p<_queue.length;p++)
116 {
117 _queue[p]=new ConcurrentLinkedQueue<Continuation>();
118
119 final int priority=p;
120 _listener[p] = new ContinuationListener()
121 {
122 public void onComplete(Continuation continuation)
123 {}
124
125 public void onTimeout(Continuation continuation)
126 {
127 _queue[priority].remove(continuation);
128 }
129 };
130 }
131
132 int maxRequests=__DEFAULT_PASSES;
133 if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
134 maxRequests=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
135 _passes=new Semaphore(maxRequests,true);
136 _maxRequests = maxRequests;
137
138 long wait = __DEFAULT_WAIT_MS;
139 if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
140 wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
141 _waitMs=wait;
142
143 long suspend = __DEFAULT_TIMEOUT_MS;
144 if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
145 suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
146 _suspendMs=suspend;
147
148 if (_context!=null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
149 _context.setAttribute(filterConfig.getFilterName(),this);
150 }
151
152
153
154
155
156 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
157 throws IOException, ServletException
158 {
159 boolean accepted=false;
160 try
161 {
162 if (request.getAttribute(_suspended)==null)
163 {
164 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
165 if (accepted)
166 {
167 request.setAttribute(_suspended,Boolean.FALSE);
168 }
169 else
170 {
171 request.setAttribute(_suspended,Boolean.TRUE);
172 int priority = getPriority(request);
173 Continuation continuation = ContinuationSupport.getContinuation(request);
174 if (_suspendMs>0)
175 continuation.setTimeout(_suspendMs);
176 continuation.suspend();
177 continuation.addContinuationListener(_listener[priority]);
178 _queue[priority].add(continuation);
179 return;
180 }
181 }
182 else
183 {
184 Boolean suspended=(Boolean)request.getAttribute(_suspended);
185
186 if (suspended.booleanValue())
187 {
188 request.setAttribute(_suspended,Boolean.FALSE);
189 if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
190 {
191 _passes.acquire();
192 accepted=true;
193 }
194 else
195 {
196
197 accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
198 }
199 }
200 else
201 {
202
203 _passes.acquire();
204 accepted = true;
205 }
206 }
207
208 if (accepted)
209 {
210 chain.doFilter(request,response);
211 }
212 else
213 {
214 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
215 }
216 }
217 catch(InterruptedException e)
218 {
219 _context.log("QoS",e);
220 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
221 }
222 finally
223 {
224 if (accepted)
225 {
226 for (int p=_queue.length;p-->0;)
227 {
228 Continuation continutaion=_queue[p].poll();
229 if (continutaion!=null && continutaion.isSuspended())
230 {
231 continutaion.resume();
232 break;
233 }
234 }
235 _passes.release();
236 }
237 }
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252 protected int getPriority(ServletRequest request)
253 {
254 HttpServletRequest baseRequest = (HttpServletRequest)request;
255 if (baseRequest.getUserPrincipal() != null )
256 return 2;
257 else
258 {
259 HttpSession session = baseRequest.getSession(false);
260 if (session!=null && !session.isNew())
261 return 1;
262 else
263 return 0;
264 }
265 }
266
267
268
269
270
271
272 public void destroy(){}
273
274
275
276
277
278
279
280
281 public long getWaitMs()
282 {
283 return _waitMs;
284 }
285
286
287
288
289
290
291
292
293 public void setWaitMs(long value)
294 {
295 _waitMs = value;
296 }
297
298
299
300
301
302
303
304
305 public long getSuspendMs()
306 {
307 return _suspendMs;
308 }
309
310
311
312
313
314
315
316
317 public void setSuspendMs(long value)
318 {
319 _suspendMs = value;
320 }
321
322
323
324
325
326
327
328
329 public int getMaxRequests()
330 {
331 return _maxRequests;
332 }
333
334
335
336
337
338
339
340
341 public void setMaxRequests(int value)
342 {
343 _passes = new Semaphore((value-_maxRequests+_passes.availablePermits()), true);
344 _maxRequests = value;
345 }
346
347 }