1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.servlets;
20
21 import java.io.IOException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26 import javax.servlet.AsyncContext;
27 import javax.servlet.AsyncEvent;
28 import javax.servlet.AsyncListener;
29 import javax.servlet.Filter;
30 import javax.servlet.FilterChain;
31 import javax.servlet.FilterConfig;
32 import javax.servlet.ServletContext;
33 import javax.servlet.ServletException;
34 import javax.servlet.ServletRequest;
35 import javax.servlet.ServletResponse;
36 import javax.servlet.http.HttpServletRequest;
37 import javax.servlet.http.HttpServletResponse;
38 import javax.servlet.http.HttpSession;
39
40 import org.eclipse.jetty.server.handler.ContextHandler;
41 import org.eclipse.jetty.util.annotation.ManagedAttribute;
42 import org.eclipse.jetty.util.annotation.ManagedObject;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @ManagedObject("Quality of Service Filter")
79 public class QoSFilter implements Filter
80 {
81 private static final Logger LOG = Log.getLogger(QoSFilter.class);
82
83 static final int __DEFAULT_MAX_PRIORITY = 10;
84 static final int __DEFAULT_PASSES = 10;
85 static final int __DEFAULT_WAIT_MS = 50;
86 static final long __DEFAULT_TIMEOUT_MS = -1;
87
88 static final String MANAGED_ATTR_INIT_PARAM = "managedAttr";
89 static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
90 static final String MAX_PRIORITY_INIT_PARAM = "maxPriority";
91 static final String MAX_WAIT_INIT_PARAM = "waitMs";
92 static final String SUSPEND_INIT_PARAM = "suspendMs";
93
94 private final String _suspended = "QoSFilter@" + Integer.toHexString(hashCode()) + ".SUSPENDED";
95 private final String _resumed = "QoSFilter@" + Integer.toHexString(hashCode()) + ".RESUMED";
96 private long _waitMs;
97 private long _suspendMs;
98 private int _maxRequests;
99 private Semaphore _passes;
100 private Queue<AsyncContext>[] _queues;
101 private AsyncListener[] _listeners;
102
103 public void init(FilterConfig filterConfig)
104 {
105 int max_priority = __DEFAULT_MAX_PRIORITY;
106 if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM) != null)
107 max_priority = Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
108 _queues = new Queue[max_priority + 1];
109 _listeners = new AsyncListener[_queues.length];
110 for (int p = 0; p < _queues.length; ++p)
111 {
112 _queues[p] = new ConcurrentLinkedQueue<>();
113 _listeners[p] = new QoSAsyncListener(p);
114 }
115
116 int maxRequests = __DEFAULT_PASSES;
117 if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM) != null)
118 maxRequests = Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
119 _passes = new Semaphore(maxRequests, true);
120 _maxRequests = maxRequests;
121
122 long wait = __DEFAULT_WAIT_MS;
123 if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM) != null)
124 wait = Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
125 _waitMs = wait;
126
127 long suspend = __DEFAULT_TIMEOUT_MS;
128 if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM) != null)
129 suspend = Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
130 _suspendMs = suspend;
131
132 ServletContext context = filterConfig.getServletContext();
133 if (context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
134 context.setAttribute(filterConfig.getFilterName(), this);
135 }
136
137 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
138 {
139 boolean accepted = false;
140 try
141 {
142 Boolean suspended = (Boolean)request.getAttribute(_suspended);
143 if (suspended == null)
144 {
145 accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
146 if (accepted)
147 {
148 request.setAttribute(_suspended, Boolean.FALSE);
149 if (LOG.isDebugEnabled())
150 LOG.debug("Accepted {}", request);
151 }
152 else
153 {
154 request.setAttribute(_suspended, Boolean.TRUE);
155 int priority = getPriority(request);
156 AsyncContext asyncContext = request.startAsync();
157 long suspendMs = getSuspendMs();
158 if (suspendMs > 0)
159 asyncContext.setTimeout(suspendMs);
160 asyncContext.addListener(_listeners[priority]);
161 _queues[priority].add(asyncContext);
162 if (LOG.isDebugEnabled())
163 LOG.debug("Suspended {}", request);
164 return;
165 }
166 }
167 else
168 {
169 if (suspended)
170 {
171 request.setAttribute(_suspended, Boolean.FALSE);
172 Boolean resumed = (Boolean)request.getAttribute(_resumed);
173 if (resumed == Boolean.TRUE)
174 {
175 _passes.acquire();
176 accepted = true;
177 if (LOG.isDebugEnabled())
178 LOG.debug("Resumed {}", request);
179 }
180 else
181 {
182
183 accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
184 if (LOG.isDebugEnabled())
185 LOG.debug("Timeout {}", request);
186 }
187 }
188 else
189 {
190
191 _passes.acquire();
192 accepted = true;
193 if (LOG.isDebugEnabled())
194 LOG.debug("Passthrough {}", request);
195 }
196 }
197
198 if (accepted)
199 {
200 chain.doFilter(request, response);
201 }
202 else
203 {
204 if (LOG.isDebugEnabled())
205 LOG.debug("Rejected {}", request);
206 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
207 }
208 }
209 catch (InterruptedException e)
210 {
211 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
212 }
213 finally
214 {
215 if (accepted)
216 {
217 for (int p = _queues.length - 1; p >= 0; --p)
218 {
219 AsyncContext asyncContext = _queues[p].poll();
220 if (asyncContext != null)
221 {
222 ServletRequest candidate = asyncContext.getRequest();
223 Boolean suspended = (Boolean)candidate.getAttribute(_suspended);
224 if (suspended == Boolean.TRUE)
225 {
226 candidate.setAttribute(_resumed, Boolean.TRUE);
227 asyncContext.dispatch();
228 break;
229 }
230 }
231 }
232 _passes.release();
233 }
234 }
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 protected int getPriority(ServletRequest request)
252 {
253 HttpServletRequest baseRequest = (HttpServletRequest)request;
254 if (baseRequest.getUserPrincipal() != null)
255 {
256 return 2;
257 }
258 else
259 {
260 HttpSession session = baseRequest.getSession(false);
261 if (session != null && !session.isNew())
262 return 1;
263 else
264 return 0;
265 }
266 }
267
268 public void destroy()
269 {
270 }
271
272
273
274
275
276
277
278 @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)")
279 public long getWaitMs()
280 {
281 return _waitMs;
282 }
283
284
285
286
287
288
289
290 public void setWaitMs(long value)
291 {
292 _waitMs = value;
293 }
294
295
296
297
298
299
300
301 @ManagedAttribute("amount of time filter will suspend a request for while waiting for the semaphore to become available (in ms)")
302 public long getSuspendMs()
303 {
304 return _suspendMs;
305 }
306
307
308
309
310
311
312
313 public void setSuspendMs(long value)
314 {
315 _suspendMs = value;
316 }
317
318
319
320
321
322
323
324 @ManagedAttribute("maximum number of requests to allow processing of at the same time")
325 public int getMaxRequests()
326 {
327 return _maxRequests;
328 }
329
330
331
332
333
334
335
336 public void setMaxRequests(int value)
337 {
338 _passes = new Semaphore((value - getMaxRequests() + _passes.availablePermits()), true);
339 _maxRequests = value;
340 }
341
342 private class QoSAsyncListener implements AsyncListener
343 {
344 private final int priority;
345
346 public QoSAsyncListener(int priority)
347 {
348 this.priority = priority;
349 }
350
351 @Override
352 public void onStartAsync(AsyncEvent event) throws IOException
353 {
354 }
355
356 @Override
357 public void onComplete(AsyncEvent event) throws IOException
358 {
359 }
360
361 @Override
362 public void onTimeout(AsyncEvent event) throws IOException
363 {
364
365
366 AsyncContext asyncContext = event.getAsyncContext();
367 _queues[priority].remove(asyncContext);
368 asyncContext.dispatch();
369 }
370
371 @Override
372 public void onError(AsyncEvent event) throws IOException
373 {
374 }
375 }
376 }