1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.continuation.test;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.net.Socket;
24 import java.util.Timer;
25 import java.util.TimerTask;
26
27 import javax.servlet.ServletException;
28 import javax.servlet.http.HttpServlet;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31
32 import junit.framework.TestCase;
33
34 import org.eclipse.jetty.continuation.Continuation;
35 import org.eclipse.jetty.continuation.ContinuationListener;
36 import org.eclipse.jetty.continuation.ContinuationSupport;
37
38
39
40 public abstract class ContinuationBase extends TestCase
41 {
42 protected SuspendServlet _servlet=new SuspendServlet();
43 protected int _port;
44
45 protected void doNormal(String type) throws Exception
46 {
47 String response=process(null,null);
48 assertContains(type,response);
49 assertContains("NORMAL",response);
50 assertNotContains("history: onTimeout",response);
51 assertNotContains("history: onComplete",response);
52 }
53
54 protected void doSleep() throws Exception
55 {
56 String response=process("sleep=200",null);
57 assertContains("SLEPT",response);
58 assertNotContains("history: onTimeout",response);
59 assertNotContains("history: onComplete",response);
60 }
61
62 protected void doSuspend() throws Exception
63 {
64 String response=process("suspend=200",null);
65 assertContains("TIMEOUT",response);
66 assertContains("history: onTimeout",response);
67 assertContains("history: onComplete",response);
68 }
69
70 protected void doSuspendWaitResume() throws Exception
71 {
72 String response=process("suspend=200&resume=10",null);
73 assertContains("RESUMED",response);
74 assertNotContains("history: onTimeout",response);
75 assertContains("history: onComplete",response);
76 }
77
78 protected void doSuspendResume() throws Exception
79 {
80 String response=process("suspend=200&resume=0",null);
81 assertContains("RESUMED",response);
82 assertNotContains("history: onTimeout",response);
83 assertContains("history: onComplete",response);
84 }
85
86 protected void doSuspendWaitComplete() throws Exception
87 {
88 String response=process("suspend=200&complete=50",null);
89 assertContains("COMPLETED",response);
90 assertContains("history: initial",response);
91 assertNotContains("history: onTimeout",response);
92 assertContains("history: onComplete",response);
93 assertNotContains("history: !initial",response);
94 }
95
96 protected void doSuspendComplete() throws Exception
97 {
98 String response=process("suspend=200&complete=0",null);
99 assertContains("COMPLETED",response);
100 assertContains("history: initial",response);
101 assertNotContains("history: onTimeout",response);
102 assertContains("history: onComplete",response);
103 assertNotContains("history: !initial",response);
104 }
105
106 protected void doSuspendWaitResumeSuspendWaitResume() throws Exception
107 {
108 String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
109 assertEquals(2,count(response,"history: suspend"));
110 assertEquals(2,count(response,"history: resume"));
111 assertEquals(0,count(response,"history: onTimeout"));
112 assertEquals(1,count(response,"history: onComplete"));
113 assertContains("RESUMED",response);
114 }
115
116 protected void doSuspendWaitResumeSuspendComplete() throws Exception
117 {
118 String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
119 assertEquals(2,count(response,"history: suspend"));
120 assertEquals(1,count(response,"history: resume"));
121 assertEquals(0,count(response,"history: onTimeout"));
122 assertEquals(1,count(response,"history: onComplete"));
123 assertContains("COMPLETED",response);
124 }
125
126 protected void doSuspendWaitResumeSuspend() throws Exception
127 {
128 String response=process("suspend=1000&resume=10&suspend2=10",null);
129 assertEquals(2,count(response,"history: suspend"));
130 assertEquals(1,count(response,"history: resume"));
131 assertEquals(1,count(response,"history: onTimeout"));
132 assertEquals(1,count(response,"history: onComplete"));
133 assertContains("TIMEOUT",response);
134 }
135
136 protected void doSuspendTimeoutSuspendResume() throws Exception
137 {
138 String response=process("suspend=10&suspend2=1000&resume2=10",null);
139 assertEquals(2,count(response,"history: suspend"));
140 assertEquals(1,count(response,"history: resume"));
141 assertEquals(1,count(response,"history: onTimeout"));
142 assertEquals(1,count(response,"history: onComplete"));
143 assertContains("RESUMED",response);
144 }
145
146 protected void doSuspendTimeoutSuspendComplete() throws Exception
147 {
148 String response=process("suspend=10&suspend2=1000&complete2=10",null);
149 assertEquals(2,count(response,"history: suspend"));
150 assertEquals(0,count(response,"history: resume"));
151 assertEquals(1,count(response,"history: onTimeout"));
152 assertEquals(1,count(response,"history: onComplete"));
153 assertContains("COMPLETED",response);
154 }
155
156 protected void doSuspendTimeoutSuspend() throws Exception
157 {
158 String response=process("suspend=10&suspend2=10",null);
159 assertEquals(2,count(response,"history: suspend"));
160 assertEquals(0,count(response,"history: resume"));
161 assertEquals(2,count(response,"history: onTimeout"));
162 assertEquals(1,count(response,"history: onComplete"));
163 assertContains("TIMEOUT",response);
164 }
165
166 protected void doSuspendThrowResume() throws Exception
167 {
168 String response=process("suspend=200&resume=10&undispatch=true",null);
169 assertContains("RESUMED",response);
170 assertNotContains("history: onTimeout",response);
171 assertContains("history: onComplete",response);
172 }
173
174 protected void doSuspendResumeThrow() throws Exception
175 {
176 String response=process("suspend=200&resume=0&undispatch=true",null);
177 assertContains("RESUMED",response);
178 assertNotContains("history: onTimeout",response);
179 assertContains("history: onComplete",response);
180 }
181
182 protected void doSuspendThrowComplete() throws Exception
183 {
184 String response=process("suspend=200&complete=10&undispatch=true",null);
185 assertContains("COMPLETED",response);
186 assertNotContains("history: onTimeout",response);
187 assertContains("history: onComplete",response);
188 }
189
190 protected void doSuspendCompleteThrow() throws Exception
191 {
192 String response=process("suspend=200&complete=0&undispatch=true",null);
193 assertContains("COMPLETED",response);
194 assertNotContains("history: onTimeout",response);
195 assertContains("history: onComplete",response);
196 }
197
198
199 private int count(String responses,String substring)
200 {
201 int count=0;
202 int i=responses.indexOf(substring);
203 while (i>=0)
204 {
205 count++;
206 i=responses.indexOf(substring,i+substring.length());
207 }
208
209 return count;
210 }
211
212 protected void assertContains(String content,String response)
213 {
214 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
215 if (response.indexOf(content,15)<0)
216 {
217 System.err.println("'"+content+"' NOT IN:\n"+response+"\n--");
218 assertTrue(false);
219 }
220 }
221
222 protected void assertNotContains(String content,String response)
223 {
224 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
225 if (response.indexOf(content,15)>=0)
226 {
227 System.err.println("'"+content+"' IS IN:\n"+response+"'\n--");
228 assertTrue(false);
229 }
230 }
231
232 public synchronized String process(String query,String content) throws Exception
233 {
234 String request = "GET /";
235
236 if (query!=null)
237 request+="?"+query;
238 request+=" HTTP/1.1\r\n"+
239 "Host: localhost\r\n"+
240 "Connection: close\r\n";
241 if (content==null)
242 request+="\r\n";
243 else
244 {
245 request+="Content-Length: "+content.length()+"\r\n";
246 request+="\r\n" + content;
247 }
248
249 int port=_port;
250 String response=null;
251 try
252 {
253 Socket socket = new Socket("localhost",port);
254 socket.setSoTimeout(10000);
255 socket.getOutputStream().write(request.getBytes("UTF-8"));
256
257 response = toString(socket.getInputStream());
258 }
259 catch(Exception e)
260 {
261 System.err.println("failed on port "+port);
262 e.printStackTrace();
263 throw e;
264 }
265 return response;
266 }
267
268
269 protected abstract String toString(InputStream in) throws IOException;
270
271
272 private static class SuspendServlet extends HttpServlet
273 {
274 private Timer _timer=new Timer();
275
276 public SuspendServlet()
277 {}
278
279
280 protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
281 {
282 final Continuation continuation = ContinuationSupport.getContinuation(request);
283
284 response.addHeader("history",continuation.getClass().toString());
285
286 int read_before=0;
287 long sleep_for=-1;
288 long suspend_for=-1;
289 long suspend2_for=-1;
290 long resume_after=-1;
291 long resume2_after=-1;
292 long complete_after=-1;
293 long complete2_after=-1;
294 boolean undispatch=false;
295
296 if (request.getParameter("read")!=null)
297 read_before=Integer.parseInt(request.getParameter("read"));
298 if (request.getParameter("sleep")!=null)
299 sleep_for=Integer.parseInt(request.getParameter("sleep"));
300 if (request.getParameter("suspend")!=null)
301 suspend_for=Integer.parseInt(request.getParameter("suspend"));
302 if (request.getParameter("suspend2")!=null)
303 suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
304 if (request.getParameter("resume")!=null)
305 resume_after=Integer.parseInt(request.getParameter("resume"));
306 if (request.getParameter("resume2")!=null)
307 resume2_after=Integer.parseInt(request.getParameter("resume2"));
308 if (request.getParameter("complete")!=null)
309 complete_after=Integer.parseInt(request.getParameter("complete"));
310 if (request.getParameter("complete2")!=null)
311 complete2_after=Integer.parseInt(request.getParameter("complete2"));
312 if (request.getParameter("undispatch")!=null)
313 undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));
314
315 if (continuation.isInitial())
316 {
317 ((HttpServletResponse)response).addHeader("history","initial");
318 if (read_before>0)
319 {
320 byte[] buf=new byte[read_before];
321 request.getInputStream().read(buf);
322 }
323 else if (read_before<0)
324 {
325 InputStream in = request.getInputStream();
326 int b=in.read();
327 while(b!=-1)
328 b=in.read();
329 }
330
331 if (suspend_for>=0)
332 {
333 if (suspend_for>0)
334 continuation.setTimeout(suspend_for);
335 continuation.addContinuationListener(__listener);
336 ((HttpServletResponse)response).addHeader("history","suspend");
337 continuation.suspend(response);
338
339 if (complete_after>0)
340 {
341 TimerTask complete = new TimerTask()
342 {
343 @Override
344 public void run()
345 {
346 try
347 {
348 response.setStatus(200);
349 response.getOutputStream().println("COMPLETED\n");
350 continuation.complete();
351 }
352 catch(Exception e)
353 {
354 e.printStackTrace();
355 }
356 }
357 };
358 synchronized (_timer)
359 {
360 _timer.schedule(complete,complete_after);
361 }
362 }
363 else if (complete_after==0)
364 {
365 response.setStatus(200);
366 response.getOutputStream().println("COMPLETED\n");
367 continuation.complete();
368 }
369 else if (resume_after>0)
370 {
371 TimerTask resume = new TimerTask()
372 {
373 @Override
374 public void run()
375 {
376 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
377 continuation.resume();
378 }
379 };
380 synchronized (_timer)
381 {
382 _timer.schedule(resume,resume_after);
383 }
384 }
385 else if (resume_after==0)
386 {
387 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
388 continuation.resume();
389 }
390
391 if (undispatch)
392 continuation.undispatch();
393 }
394 else if (sleep_for>=0)
395 {
396 try
397 {
398 Thread.sleep(sleep_for);
399 }
400 catch (InterruptedException e)
401 {
402 e.printStackTrace();
403 }
404 response.setStatus(200);
405 response.getOutputStream().println("SLEPT\n");
406 }
407 else
408 {
409 response.setStatus(200);
410 response.getOutputStream().println("NORMAL\n");
411 }
412 }
413 else
414 {
415 ((HttpServletResponse)response).addHeader("history","!initial");
416 if (suspend2_for>=0 && request.getAttribute("2nd")==null)
417 {
418 request.setAttribute("2nd","cycle");
419
420 if (suspend2_for>0)
421 continuation.setTimeout(suspend2_for);
422
423 ((HttpServletResponse)response).addHeader("history","suspend");
424 continuation.suspend(response);
425
426 if (complete2_after>0)
427 {
428 TimerTask complete = new TimerTask()
429 {
430 @Override
431 public void run()
432 {
433 try
434 {
435 response.setStatus(200);
436 response.getOutputStream().println("COMPLETED\n");
437 continuation.complete();
438 }
439 catch(Exception e)
440 {
441 e.printStackTrace();
442 }
443 }
444 };
445 synchronized (_timer)
446 {
447 _timer.schedule(complete,complete2_after);
448 }
449 }
450 else if (complete2_after==0)
451 {
452 response.setStatus(200);
453 response.getOutputStream().println("COMPLETED\n");
454 continuation.complete();
455 }
456 else if (resume2_after>0)
457 {
458 TimerTask resume = new TimerTask()
459 {
460 @Override
461 public void run()
462 {
463 ((HttpServletResponse)response).addHeader("history","resume");
464 continuation.resume();
465 }
466 };
467 synchronized (_timer)
468 {
469 _timer.schedule(resume,resume2_after);
470 }
471 }
472 else if (resume2_after==0)
473 {
474 ((HttpServletResponse)response).addHeader("history","resume");
475 continuation.resume();
476 }
477 if (undispatch)
478 continuation.undispatch();
479 return;
480 }
481 else if (continuation.isExpired())
482 {
483 response.setStatus(200);
484 response.getOutputStream().println("TIMEOUT\n");
485 }
486 else if (continuation.isResumed())
487 {
488 response.setStatus(200);
489 response.getOutputStream().println("RESUMED\n");
490 }
491 else
492 {
493 response.setStatus(200);
494 response.getOutputStream().println("unknown???\n");
495 }
496 }
497 }
498 }
499
500
501 private static ContinuationListener __listener = new ContinuationListener()
502 {
503 public void onComplete(Continuation continuation)
504 {
505 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onComplete");
506 }
507
508 public void onTimeout(Continuation continuation)
509 {
510 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onTimeout");
511 continuation.resume();
512 }
513
514 };
515 }