1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.continuation.test;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.Socket;
19 import java.util.Timer;
20 import java.util.TimerTask;
21
22 import javax.servlet.ServletException;
23 import javax.servlet.http.HttpServlet;
24 import javax.servlet.http.HttpServletRequest;
25 import javax.servlet.http.HttpServletResponse;
26
27 import org.eclipse.jetty.continuation.Continuation;
28 import org.eclipse.jetty.continuation.ContinuationListener;
29 import org.eclipse.jetty.continuation.ContinuationSupport;
30
31 import junit.framework.TestCase;
32
33
34
35 public abstract class ContinuationBase extends TestCase
36 {
37 protected SuspendServlet _servlet=new SuspendServlet();
38 protected int _port;
39
40 protected void doit(String type) throws Exception
41 {
42 String response;
43
44 response=process(null,null);
45 assertContains(type,response);
46 assertContains("NORMAL",response);
47 assertNotContains("history: onTimeout",response);
48 assertNotContains("history: onComplete",response);
49
50 response=process("sleep=200",null);
51 assertContains("SLEPT",response);
52 assertNotContains("history: onTimeout",response);
53 assertNotContains("history: onComplete",response);
54
55 response=process("suspend=200",null);
56 assertContains("TIMEOUT",response);
57 assertContains("history: onTimeout",response);
58 assertContains("history: onComplete",response);
59
60 response=process("suspend=200&resume=10",null);
61 assertContains("RESUMED",response);
62 assertNotContains("history: onTimeout",response);
63 assertContains("history: onComplete",response);
64
65 response=process("suspend=200&resume=0",null);
66 assertContains("RESUMED",response);
67 assertNotContains("history: onTimeout",response);
68 assertContains("history: onComplete",response);
69
70 response=process("suspend=200&complete=10",null);
71 assertContains("COMPLETED",response);
72 assertNotContains("history: onTimeout",response);
73 assertContains("history: onComplete",response);
74
75 response=process("suspend=200&complete=0",null);
76 assertContains("COMPLETED",response);
77 assertNotContains("history: onTimeout",response);
78 assertContains("history: onComplete",response);
79
80
81 response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
82 assertEquals(2,count(response,"history: suspend"));
83 assertEquals(2,count(response,"history: resume"));
84 assertEquals(0,count(response,"history: onTimeout"));
85 assertEquals(1,count(response,"history: onComplete"));
86 assertContains("RESUMED",response);
87
88 response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
89 assertEquals(2,count(response,"history: suspend"));
90 assertEquals(2,count(response,"history: resume"));
91 assertEquals(0,count(response,"history: onTimeout"));
92 assertEquals(1,count(response,"history: onComplete"));
93 assertContains("RESUMED",response);
94
95 response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
96 assertEquals(2,count(response,"history: suspend"));
97 assertEquals(1,count(response,"history: resume"));
98 assertEquals(0,count(response,"history: onTimeout"));
99 assertEquals(1,count(response,"history: onComplete"));
100 assertContains("COMPLETED",response);
101
102 response=process("suspend=1000&resume=10&suspend2=10",null);
103 assertEquals(2,count(response,"history: suspend"));
104 assertEquals(1,count(response,"history: resume"));
105 assertEquals(1,count(response,"history: onTimeout"));
106 assertEquals(1,count(response,"history: onComplete"));
107 assertContains("TIMEOUT",response);
108
109
110
111 response=process("suspend=10&suspend2=1000&resume2=10",null);
112 assertEquals(2,count(response,"history: suspend"));
113 assertEquals(1,count(response,"history: resume"));
114 assertEquals(1,count(response,"history: onTimeout"));
115 assertEquals(1,count(response,"history: onComplete"));
116 assertContains("RESUMED",response);
117
118 response=process("suspend=10&suspend2=1000&resume2=10",null);
119 assertEquals(2,count(response,"history: suspend"));
120 assertEquals(1,count(response,"history: resume"));
121 assertEquals(1,count(response,"history: onTimeout"));
122 assertEquals(1,count(response,"history: onComplete"));
123 assertContains("RESUMED",response);
124
125 response=process("suspend=10&suspend2=1000&complete2=10",null);
126 assertEquals(2,count(response,"history: suspend"));
127 assertEquals(0,count(response,"history: resume"));
128 assertEquals(1,count(response,"history: onTimeout"));
129 assertEquals(1,count(response,"history: onComplete"));
130 assertContains("COMPLETED",response);
131
132 response=process("suspend=10&suspend2=10",null);
133 assertEquals(2,count(response,"history: suspend"));
134 assertEquals(0,count(response,"history: resume"));
135 assertEquals(2,count(response,"history: onTimeout"));
136 assertEquals(1,count(response,"history: onComplete"));
137 assertContains("TIMEOUT",response);
138
139
140 response=process("suspend=200&resume=10&undispatch=true",null);
141 assertContains("RESUMED",response);
142 assertNotContains("history: onTimeout",response);
143 assertContains("history: onComplete",response);
144
145 response=process("suspend=200&resume=0&undispatch=true",null);
146 assertContains("RESUMED",response);
147 assertNotContains("history: onTimeout",response);
148 assertContains("history: onComplete",response);
149
150 response=process("suspend=200&complete=10&undispatch=true",null);
151 assertContains("COMPLETED",response);
152 assertNotContains("history: onTimeout",response);
153 assertContains("history: onComplete",response);
154
155 response=process("suspend=200&complete=0&undispatch=true",null);
156 assertContains("COMPLETED",response);
157 assertNotContains("history: onTimeout",response);
158 assertContains("history: onComplete",response);
159 }
160
161
162 private int count(String responses,String substring)
163 {
164 int count=0;
165 int i=responses.indexOf(substring);
166 while (i>=0)
167 {
168 count++;
169 i=responses.indexOf(substring,i+substring.length());
170 }
171
172 return count;
173 }
174
175 protected void assertContains(String content,String response)
176 {
177 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
178 if (response.indexOf(content,15)<0)
179 {
180 System.err.println("'"+content+"' NOT IN:\n"+response+"\n--");
181 assertTrue(false);
182 }
183 }
184
185 protected void assertNotContains(String content,String response)
186 {
187 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
188 if (response.indexOf(content,15)>=0)
189 {
190 System.err.println("'"+content+"' IS IN:\n"+response+"'\n--");
191 assertTrue(false);
192 }
193 }
194
195 public synchronized String process(String query,String content) throws Exception
196 {
197 String request = "GET /";
198
199 if (query!=null)
200 request+="?"+query;
201 request+=" HTTP/1.1\r\n"+
202 "Host: localhost\r\n"+
203 "Connection: close\r\n";
204 if (content!=null)
205 request+="Content-Length: "+content.length()+"\r\n";
206 request+="\r\n" + content;
207
208 Socket socket = new Socket("localhost",_port);
209 socket.getOutputStream().write(request.getBytes("UTF-8"));
210
211 String response = toString(socket.getInputStream());
212 return response;
213 }
214
215
216 protected abstract String toString(InputStream in) throws IOException;
217
218
219 private static class SuspendServlet extends HttpServlet
220 {
221 private Timer _timer=new Timer();
222
223 public SuspendServlet()
224 {}
225
226
227 protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
228 {
229 final Continuation continuation = ContinuationSupport.getContinuation(request,response);
230
231 response.addHeader("history",continuation.getClass().toString());
232
233 int read_before=0;
234 long sleep_for=-1;
235 long suspend_for=-1;
236 long suspend2_for=-1;
237 long resume_after=-1;
238 long resume2_after=-1;
239 long complete_after=-1;
240 long complete2_after=-1;
241 boolean undispatch=false;
242
243 if (request.getParameter("read")!=null)
244 read_before=Integer.parseInt(request.getParameter("read"));
245 if (request.getParameter("sleep")!=null)
246 sleep_for=Integer.parseInt(request.getParameter("sleep"));
247 if (request.getParameter("suspend")!=null)
248 suspend_for=Integer.parseInt(request.getParameter("suspend"));
249 if (request.getParameter("suspend2")!=null)
250 suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
251 if (request.getParameter("resume")!=null)
252 resume_after=Integer.parseInt(request.getParameter("resume"));
253 if (request.getParameter("resume2")!=null)
254 resume2_after=Integer.parseInt(request.getParameter("resume2"));
255 if (request.getParameter("complete")!=null)
256 complete_after=Integer.parseInt(request.getParameter("complete"));
257 if (request.getParameter("complete2")!=null)
258 complete2_after=Integer.parseInt(request.getParameter("complete2"));
259 if (request.getParameter("undispatch")!=null)
260 undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));
261
262 if (continuation.isInitial())
263 {
264 if (read_before>0)
265 {
266 byte[] buf=new byte[read_before];
267 request.getInputStream().read(buf);
268 }
269 else if (read_before<0)
270 {
271 InputStream in = request.getInputStream();
272 int b=in.read();
273 while(b!=-1)
274 b=in.read();
275 }
276
277 if (suspend_for>=0)
278 {
279 if (suspend_for>0)
280 continuation.setTimeout(suspend_for);
281 continuation.addContinuationListener(__listener);
282 ((HttpServletResponse)response).addHeader("history","suspend");
283 continuation.suspend(response);
284
285 if (complete_after>0)
286 {
287 TimerTask complete = new TimerTask()
288 {
289 public void run()
290 {
291 try
292 {
293 response.setStatus(200);
294 response.getOutputStream().println("COMPLETED\n");
295 continuation.complete();
296 }
297 catch(Exception e)
298 {
299 e.printStackTrace();
300 }
301 }
302 };
303 synchronized (_timer)
304 {
305 _timer.schedule(complete,complete_after);
306 }
307 }
308 else if (complete_after==0)
309 {
310 response.setStatus(200);
311 response.getOutputStream().println("COMPLETED\n");
312 continuation.complete();
313 }
314 else if (resume_after>0)
315 {
316 TimerTask resume = new TimerTask()
317 {
318 public void run()
319 {
320 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
321 continuation.resume();
322 }
323 };
324 synchronized (_timer)
325 {
326 _timer.schedule(resume,resume_after);
327 }
328 }
329 else if (resume_after==0)
330 {
331 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
332 continuation.resume();
333 }
334
335 if (undispatch)
336 continuation.undispatch();
337 }
338 else if (sleep_for>=0)
339 {
340 try
341 {
342 Thread.sleep(sleep_for);
343 }
344 catch (InterruptedException e)
345 {
346 e.printStackTrace();
347 }
348 response.setStatus(200);
349 response.getOutputStream().println("SLEPT\n");
350 }
351 else
352 {
353 response.setStatus(200);
354 response.getOutputStream().println("NORMAL\n");
355 }
356 }
357 else if (suspend2_for>=0 && request.getAttribute("2nd")==null)
358 {
359 request.setAttribute("2nd","cycle");
360
361 if (suspend2_for>0)
362 continuation.setTimeout(suspend2_for);
363
364 ((HttpServletResponse)response).addHeader("history","suspend");
365 continuation.suspend(response);
366
367 if (complete2_after>0)
368 {
369 TimerTask complete = new TimerTask()
370 {
371 public void run()
372 {
373 try
374 {
375 response.setStatus(200);
376 response.getOutputStream().println("COMPLETED\n");
377 continuation.complete();
378 }
379 catch(Exception e)
380 {
381 e.printStackTrace();
382 }
383 }
384 };
385 synchronized (_timer)
386 {
387 _timer.schedule(complete,complete2_after);
388 }
389 }
390 else if (complete2_after==0)
391 {
392 response.setStatus(200);
393 response.getOutputStream().println("COMPLETED\n");
394 continuation.complete();
395 }
396 else if (resume2_after>0)
397 {
398 TimerTask resume = new TimerTask()
399 {
400 public void run()
401 {
402 ((HttpServletResponse)response).addHeader("history","resume");
403 continuation.resume();
404 }
405 };
406 synchronized (_timer)
407 {
408 _timer.schedule(resume,resume2_after);
409 }
410 }
411 else if (resume2_after==0)
412 {
413 ((HttpServletResponse)response).addHeader("history","resume");
414 continuation.resume();
415 }
416 if (undispatch)
417 continuation.undispatch();
418 return;
419 }
420 else if (continuation.isExpired())
421 {
422 response.setStatus(200);
423 response.getOutputStream().println("TIMEOUT\n");
424 }
425 else if (continuation.isResumed())
426 {
427 response.setStatus(200);
428 response.getOutputStream().println("RESUMED\n");
429 }
430 else
431 {
432 response.setStatus(200);
433 response.getOutputStream().println("unknown???\n");
434 }
435 }
436 }
437
438
439 private static ContinuationListener __listener =
440 new ContinuationListener()
441 {
442 public void onComplete(Continuation continuation)
443 {
444 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onComplete");
445 }
446
447 public void onTimeout(Continuation continuation)
448 {
449 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onTimeout");
450 continuation.resume();
451 }
452
453 };
454 }