1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.servlets;
15
16 import java.io.IOException;
17 import java.util.Queue;
18 import java.util.concurrent.ConcurrentLinkedQueue;
19 import java.util.concurrent.Semaphore;
20 import java.util.concurrent.TimeUnit;
21
22 import javax.servlet.Filter;
23 import javax.servlet.FilterChain;
24 import javax.servlet.FilterConfig;
25 import javax.servlet.ServletContext;
26 import javax.servlet.ServletException;
27 import javax.servlet.ServletRequest;
28 import javax.servlet.ServletResponse;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.servlet.http.HttpSession;
32
33 import org.eclipse.jetty.continuation.Continuation;
34 import org.eclipse.jetty.continuation.ContinuationListener;
35 import org.eclipse.jetty.continuation.ContinuationSupport;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class QoSFilter implements Filter
68 {
69 final static int __DEFAULT_MAX_PRIORITY=10;
70 final static int __DEFAULT_PASSES=10;
71 final static int __DEFAULT_WAIT_MS=50;
72 final static long __DEFAULT_TIMEOUT_MS = -1;
73
74 final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
75 final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
76 final static String MAX_WAIT_INIT_PARAM="waitMs";
77 final static String SUSPEND_INIT_PARAM="suspendMs";
78
79 ServletContext _context;
80 long _waitMs;
81 long _suspendMs;
82 Semaphore _passes;
83 Queue<Continuation>[] _queue;
84 ContinuationListener[] _listener;
85 String _suspended="QoSFilter@"+this.hashCode();
86
87 public void init(FilterConfig filterConfig)
88 {
89 _context=filterConfig.getServletContext();
90
91 int max_priority=__DEFAULT_MAX_PRIORITY;
92 if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
93 max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
94 _queue=new Queue[max_priority+1];
95 _listener = new ContinuationListener[max_priority + 1];
96 for (int p=0;p<_queue.length;p++)
97 {
98 _queue[p]=new ConcurrentLinkedQueue<Continuation>();
99
100 final int priority=p;
101 _listener[p] = new ContinuationListener()
102 {
103 public void onComplete(Continuation continuation)
104 {}
105
106 public void onTimeout(Continuation continuation)
107 {
108 _queue[priority].remove(continuation);
109 }
110 };
111 }
112
113 int passes=__DEFAULT_PASSES;
114 if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
115 passes=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
116 _passes=new Semaphore(passes,true);
117
118 long wait = __DEFAULT_WAIT_MS;
119 if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
120 wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
121 _waitMs=wait;
122
123 long suspend = __DEFAULT_TIMEOUT_MS;
124 if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
125 suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
126 _suspendMs=suspend;
127 }
128
129 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
130 throws IOException, ServletException
131 {
132 boolean accepted=false;
133 try
134 {
135 if (request.getAttribute(_suspended)==null)
136 {
137 accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
138 if (accepted)
139 {
140 request.setAttribute(_suspended,Boolean.FALSE);
141 }
142 else
143 {
144 request.setAttribute(_suspended,Boolean.TRUE);
145 int priority = getPriority(request);
146 Continuation continuation = ContinuationSupport.getContinuation(request);
147 if (_suspendMs>0)
148 continuation.setTimeout(_suspendMs);
149 continuation.suspend();
150 continuation.addContinuationListener(_listener[priority]);
151 _queue[priority].add(continuation);
152 return;
153 }
154 }
155 else
156 {
157 Boolean suspended=(Boolean)request.getAttribute(_suspended);
158
159 if (suspended.booleanValue())
160 {
161 request.setAttribute(_suspended,Boolean.FALSE);
162 if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
163 {
164 _passes.acquire();
165 accepted=true;
166 }
167 else
168 {
169
170 accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
171 }
172 }
173 else
174 {
175
176 _passes.acquire();
177 accepted = true;
178 }
179 }
180
181 if (accepted)
182 {
183 chain.doFilter(request,response);
184 }
185 else
186 {
187 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
188 }
189 }
190 catch(InterruptedException e)
191 {
192 _context.log("QoS",e);
193 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
194 }
195 finally
196 {
197 if (accepted)
198 {
199 for (int p=_queue.length;p-->0;)
200 {
201 Continuation continutaion=_queue[p].poll();
202 if (continutaion!=null && continutaion.isSuspended())
203 {
204 continutaion.resume();
205 break;
206 }
207 }
208 _passes.release();
209 }
210 }
211 }
212
213
214
215
216
217
218
219
220
221
222
223
224
225 protected int getPriority(ServletRequest request)
226 {
227 HttpServletRequest baseRequest = (HttpServletRequest)request;
228 if (baseRequest.getUserPrincipal() != null )
229 return 2;
230 else
231 {
232 HttpSession session = baseRequest.getSession(false);
233 if (session!=null && !session.isNew())
234 return 1;
235 else
236 return 0;
237 }
238 }
239
240
241 public void destroy(){}
242
243 }