1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.servlets;
20
21 import java.io.IOException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.servlet.AsyncContext;
28 import javax.servlet.AsyncEvent;
29 import javax.servlet.AsyncListener;
30 import javax.servlet.Filter;
31 import javax.servlet.FilterChain;
32 import javax.servlet.FilterConfig;
33 import javax.servlet.ServletContext;
34 import javax.servlet.ServletException;
35 import javax.servlet.ServletRequest;
36 import javax.servlet.ServletResponse;
37 import javax.servlet.http.HttpServletRequest;
38 import javax.servlet.http.HttpServletResponse;
39 import javax.servlet.http.HttpSession;
40
41 import org.eclipse.jetty.server.handler.ContextHandler;
42 import org.eclipse.jetty.util.annotation.ManagedAttribute;
43 import org.eclipse.jetty.util.annotation.ManagedObject;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 @ManagedObject("Quality of Service Filter")
80 public class QoSFilter implements Filter
81 {
82 private static final Logger LOG = Log.getLogger(QoSFilter.class);
83
84 static final int __DEFAULT_MAX_PRIORITY = 10;
85 static final int __DEFAULT_PASSES = 10;
86 static final int __DEFAULT_WAIT_MS = 50;
87 static final long __DEFAULT_TIMEOUT_MS = -1;
88
89 static final String MANAGED_ATTR_INIT_PARAM = "managedAttr";
90 static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
91 static final String MAX_PRIORITY_INIT_PARAM = "maxPriority";
92 static final String MAX_WAIT_INIT_PARAM = "waitMs";
93 static final String SUSPEND_INIT_PARAM = "suspendMs";
94
95 private final String _suspended = "QoSFilter@" + Integer.toHexString(hashCode()) + ".SUSPENDED";
96 private final String _resumed = "QoSFilter@" + Integer.toHexString(hashCode()) + ".RESUMED";
97 private long _waitMs;
98 private long _suspendMs;
99 private int _maxRequests;
100 private Semaphore _passes;
101 private Queue<AsyncContext>[] _queues;
102 private AsyncListener[] _listeners;
103
104 public void init(FilterConfig filterConfig)
105 {
106 int max_priority = __DEFAULT_MAX_PRIORITY;
107 if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM) != null)
108 max_priority = Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
109 _queues = new Queue[max_priority + 1];
110 _listeners = new AsyncListener[_queues.length];
111 for (int p = 0; p < _queues.length; ++p)
112 {
113 _queues[p] = new ConcurrentLinkedQueue<>();
114 _listeners[p] = new QoSAsyncListener(p);
115 }
116
117 int maxRequests = __DEFAULT_PASSES;
118 if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM) != null)
119 maxRequests = Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
120 _passes = new Semaphore(maxRequests, true);
121 _maxRequests = maxRequests;
122
123 long wait = __DEFAULT_WAIT_MS;
124 if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM) != null)
125 wait = Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
126 _waitMs = wait;
127
128 long suspend = __DEFAULT_TIMEOUT_MS;
129 if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM) != null)
130 suspend = Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
131 _suspendMs = suspend;
132
133 ServletContext context = filterConfig.getServletContext();
134 if (context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
135 context.setAttribute(filterConfig.getFilterName(), this);
136 }
137
138 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
139 {
140 boolean accepted = false;
141 try
142 {
143 Boolean suspended = (Boolean)request.getAttribute(_suspended);
144 if (suspended == null)
145 {
146 accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
147 if (accepted)
148 {
149 request.setAttribute(_suspended, Boolean.FALSE);
150 if (LOG.isDebugEnabled())
151 LOG.debug("Accepted {}", request);
152 }
153 else
154 {
155 request.setAttribute(_suspended, Boolean.TRUE);
156 int priority = getPriority(request);
157 AsyncContext asyncContext = request.startAsync();
158 long suspendMs = getSuspendMs();
159 if (suspendMs > 0)
160 asyncContext.setTimeout(suspendMs);
161 asyncContext.addListener(_listeners[priority]);
162 _queues[priority].add(asyncContext);
163 if (LOG.isDebugEnabled())
164 LOG.debug("Suspended {}", request);
165 return;
166 }
167 }
168 else
169 {
170 if (suspended)
171 {
172 request.setAttribute(_suspended, Boolean.FALSE);
173 Boolean resumed = (Boolean)request.getAttribute(_resumed);
174 if (resumed == Boolean.TRUE)
175 {
176 _passes.acquire();
177 accepted = true;
178 if (LOG.isDebugEnabled())
179 LOG.debug("Resumed {}", request);
180 }
181 else
182 {
183
184 accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
185 if (LOG.isDebugEnabled())
186 LOG.debug("Timeout {}", request);
187 }
188 }
189 else
190 {
191
192 _passes.acquire();
193 accepted = true;
194 if (LOG.isDebugEnabled())
195 LOG.debug("Passthrough {}", request);
196 }
197 }
198
199 if (accepted)
200 {
201 chain.doFilter(request, response);
202 }
203 else
204 {
205 if (LOG.isDebugEnabled())
206 LOG.debug("Rejected {}", request);
207 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
208 }
209 }
210 catch (InterruptedException e)
211 {
212 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
213 }
214 finally
215 {
216 if (accepted)
217 {
218 for (int p = _queues.length - 1; p >= 0; --p)
219 {
220 AsyncContext asyncContext = _queues[p].poll();
221 if (asyncContext != null)
222 {
223 ServletRequest candidate = asyncContext.getRequest();
224 Boolean suspended = (Boolean)candidate.getAttribute(_suspended);
225 if (suspended == Boolean.TRUE)
226 {
227 candidate.setAttribute(_resumed, Boolean.TRUE);
228 asyncContext.dispatch();
229 break;
230 }
231 }
232 }
233 _passes.release();
234 }
235 }
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 protected int getPriority(ServletRequest request)
253 {
254 HttpServletRequest baseRequest = (HttpServletRequest)request;
255 if (baseRequest.getUserPrincipal() != null)
256 {
257 return 2;
258 }
259 else
260 {
261 HttpSession session = baseRequest.getSession(false);
262 if (session != null && !session.isNew())
263 return 1;
264 else
265 return 0;
266 }
267 }
268
269 public void destroy()
270 {
271 }
272
273
274
275
276
277
278
279 @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)")
280 public long getWaitMs()
281 {
282 return _waitMs;
283 }
284
285
286
287
288
289
290
291 public void setWaitMs(long value)
292 {
293 _waitMs = value;
294 }
295
296
297
298
299
300
301
302 @ManagedAttribute("amount of time filter will suspend a request for while waiting for the semaphore to become available (in ms)")
303 public long getSuspendMs()
304 {
305 return _suspendMs;
306 }
307
308
309
310
311
312
313
314 public void setSuspendMs(long value)
315 {
316 _suspendMs = value;
317 }
318
319
320
321
322
323
324
325 @ManagedAttribute("maximum number of requests to allow processing of at the same time")
326 public int getMaxRequests()
327 {
328 return _maxRequests;
329 }
330
331
332
333
334
335
336
337 public void setMaxRequests(int value)
338 {
339 _passes = new Semaphore((value - getMaxRequests() + _passes.availablePermits()), true);
340 _maxRequests = value;
341 }
342
343 private class QoSAsyncListener implements AsyncListener
344 {
345 private final int priority;
346
347 public QoSAsyncListener(int priority)
348 {
349 this.priority = priority;
350 }
351
352 @Override
353 public void onStartAsync(AsyncEvent event) throws IOException
354 {
355 }
356
357 @Override
358 public void onComplete(AsyncEvent event) throws IOException
359 {
360 }
361
362 @Override
363 public void onTimeout(AsyncEvent event) throws IOException
364 {
365
366
367 AsyncContext asyncContext = event.getAsyncContext();
368 _queues[priority].remove(asyncContext);
369 asyncContext.dispatch();
370 }
371
372 @Override
373 public void onError(AsyncEvent event) throws IOException
374 {
375 }
376 }
377 }