1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.continuation.test;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.Socket;
19 import java.util.Timer;
20 import java.util.TimerTask;
21
22 import javax.servlet.ServletException;
23 import javax.servlet.http.HttpServlet;
24 import javax.servlet.http.HttpServletRequest;
25 import javax.servlet.http.HttpServletResponse;
26
27 import junit.framework.TestCase;
28
29 import org.eclipse.jetty.continuation.Continuation;
30 import org.eclipse.jetty.continuation.ContinuationListener;
31 import org.eclipse.jetty.continuation.ContinuationSupport;
32
33
34
35 public abstract class ContinuationBase extends TestCase
36 {
/** Servlet exercised by the tests; its behaviour is scripted through request parameters. */
protected SuspendServlet _servlet=new SuspendServlet();
/**
 * Port on localhost that {@link #process(String, String)} connects to.
 * Not assigned in this class - presumably set by subclasses once their server is running.
 */
protected int _port;
39
40 protected void doNormal(String type) throws Exception
41 {
42 String response=process(null,null);
43 assertContains(type,response);
44 assertContains("NORMAL",response);
45 assertNotContains("history: onTimeout",response);
46 assertNotContains("history: onComplete",response);
47 }
48
49 protected void doSleep() throws Exception
50 {
51 String response=process("sleep=200",null);
52 assertContains("SLEPT",response);
53 assertNotContains("history: onTimeout",response);
54 assertNotContains("history: onComplete",response);
55 }
56
57 protected void doSuspend() throws Exception
58 {
59 String response=process("suspend=200",null);
60 assertContains("TIMEOUT",response);
61 assertContains("history: onTimeout",response);
62 assertContains("history: onComplete",response);
63 }
64
65 protected void doSuspendWaitResume() throws Exception
66 {
67 String response=process("suspend=200&resume=10",null);
68 assertContains("RESUMED",response);
69 assertNotContains("history: onTimeout",response);
70 assertContains("history: onComplete",response);
71 }
72
73 protected void doSuspendResume() throws Exception
74 {
75 String response=process("suspend=200&resume=0",null);
76 assertContains("RESUMED",response);
77 assertNotContains("history: onTimeout",response);
78 assertContains("history: onComplete",response);
79 }
80
81 protected void doSuspendWaitComplete() throws Exception
82 {
83 String response=process("suspend=200&complete=50",null);
84 assertContains("COMPLETED",response);
85 assertContains("history: initial",response);
86 assertNotContains("history: onTimeout",response);
87 assertContains("history: onComplete",response);
88 assertNotContains("history: !initial",response);
89 }
90
91 protected void doSuspendComplete() throws Exception
92 {
93 String response=process("suspend=200&complete=0",null);
94 assertContains("COMPLETED",response);
95 assertContains("history: initial",response);
96 assertNotContains("history: onTimeout",response);
97 assertContains("history: onComplete",response);
98 assertNotContains("history: !initial",response);
99 }
100
101 protected void doSuspendWaitResumeSuspendWaitResume() throws Exception
102 {
103 String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
104 assertEquals(2,count(response,"history: suspend"));
105 assertEquals(2,count(response,"history: resume"));
106 assertEquals(0,count(response,"history: onTimeout"));
107 assertEquals(1,count(response,"history: onComplete"));
108 assertContains("RESUMED",response);
109 }
110
111 protected void doSuspendWaitResumeSuspendComplete() throws Exception
112 {
113 String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
114 assertEquals(2,count(response,"history: suspend"));
115 assertEquals(1,count(response,"history: resume"));
116 assertEquals(0,count(response,"history: onTimeout"));
117 assertEquals(1,count(response,"history: onComplete"));
118 assertContains("COMPLETED",response);
119 }
120
121 protected void doSuspendWaitResumeSuspend() throws Exception
122 {
123 String response=process("suspend=1000&resume=10&suspend2=10",null);
124 assertEquals(2,count(response,"history: suspend"));
125 assertEquals(1,count(response,"history: resume"));
126 assertEquals(1,count(response,"history: onTimeout"));
127 assertEquals(1,count(response,"history: onComplete"));
128 assertContains("TIMEOUT",response);
129 }
130
131 protected void doSuspendTimeoutSuspendResume() throws Exception
132 {
133 String response=process("suspend=10&suspend2=1000&resume2=10",null);
134 assertEquals(2,count(response,"history: suspend"));
135 assertEquals(1,count(response,"history: resume"));
136 assertEquals(1,count(response,"history: onTimeout"));
137 assertEquals(1,count(response,"history: onComplete"));
138 assertContains("RESUMED",response);
139 }
140
141 protected void doSuspendTimeoutSuspendComplete() throws Exception
142 {
143 String response=process("suspend=10&suspend2=1000&complete2=10",null);
144 assertEquals(2,count(response,"history: suspend"));
145 assertEquals(0,count(response,"history: resume"));
146 assertEquals(1,count(response,"history: onTimeout"));
147 assertEquals(1,count(response,"history: onComplete"));
148 assertContains("COMPLETED",response);
149 }
150
151 protected void doSuspendTimeoutSuspend() throws Exception
152 {
153 String response=process("suspend=10&suspend2=10",null);
154 assertEquals(2,count(response,"history: suspend"));
155 assertEquals(0,count(response,"history: resume"));
156 assertEquals(2,count(response,"history: onTimeout"));
157 assertEquals(1,count(response,"history: onComplete"));
158 assertContains("TIMEOUT",response);
159 }
160
161 protected void doSuspendThrowResume() throws Exception
162 {
163 String response=process("suspend=200&resume=10&undispatch=true",null);
164 assertContains("RESUMED",response);
165 assertNotContains("history: onTimeout",response);
166 assertContains("history: onComplete",response);
167 }
168
169 protected void doSuspendResumeThrow() throws Exception
170 {
171 String response=process("suspend=200&resume=0&undispatch=true",null);
172 assertContains("RESUMED",response);
173 assertNotContains("history: onTimeout",response);
174 assertContains("history: onComplete",response);
175 }
176
177 protected void doSuspendThrowComplete() throws Exception
178 {
179 String response=process("suspend=200&complete=10&undispatch=true",null);
180 assertContains("COMPLETED",response);
181 assertNotContains("history: onTimeout",response);
182 assertContains("history: onComplete",response);
183 }
184
185 protected void doSuspendCompleteThrow() throws Exception
186 {
187 String response=process("suspend=200&complete=0&undispatch=true",null);
188 assertContains("COMPLETED",response);
189 assertNotContains("history: onTimeout",response);
190 assertContains("history: onComplete",response);
191 }
192
193
194 private int count(String responses,String substring)
195 {
196 int count=0;
197 int i=responses.indexOf(substring);
198 while (i>=0)
199 {
200 count++;
201 i=responses.indexOf(substring,i+substring.length());
202 }
203
204 return count;
205 }
206
207 protected void assertContains(String content,String response)
208 {
209 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
210 if (response.indexOf(content,15)<0)
211 {
212 System.err.println("'"+content+"' NOT IN:\n"+response+"\n--");
213 assertTrue(false);
214 }
215 }
216
217 protected void assertNotContains(String content,String response)
218 {
219 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
220 if (response.indexOf(content,15)>=0)
221 {
222 System.err.println("'"+content+"' IS IN:\n"+response+"'\n--");
223 assertTrue(false);
224 }
225 }
226
227 public synchronized String process(String query,String content) throws Exception
228 {
229 String request = "GET /";
230
231 if (query!=null)
232 request+="?"+query;
233 request+=" HTTP/1.1\r\n"+
234 "Host: localhost\r\n"+
235 "Connection: close\r\n";
236 if (content==null)
237 request+="\r\n";
238 else
239 {
240 request+="Content-Length: "+content.length()+"\r\n";
241 request+="\r\n" + content;
242 }
243
244 int port=_port;
245 String response=null;
246 try
247 {
248 Socket socket = new Socket("localhost",port);
249 socket.setSoTimeout(10000);
250 socket.getOutputStream().write(request.getBytes("UTF-8"));
251
252 response = toString(socket.getInputStream());
253 }
254 catch(Exception e)
255 {
256 System.err.println("failed on port "+port);
257 e.printStackTrace();
258 throw e;
259 }
260 return response;
261 }
262
263
/**
 * Turns the bytes of an HTTP response into a String. Called by
 * {@code process} with the input stream of the request socket; the
 * implementation is supplied by subclasses.
 *
 * @param in stream to read the response from
 * @return the response text
 * @throws IOException if reading fails
 */
protected abstract String toString(InputStream in) throws IOException;
265
266
267 private static class SuspendServlet extends HttpServlet
268 {
269 private Timer _timer=new Timer();
270
271 public SuspendServlet()
272 {}
273
274
275 protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
276 {
277 final Continuation continuation = ContinuationSupport.getContinuation(request);
278
279 response.addHeader("history",continuation.getClass().toString());
280
281 int read_before=0;
282 long sleep_for=-1;
283 long suspend_for=-1;
284 long suspend2_for=-1;
285 long resume_after=-1;
286 long resume2_after=-1;
287 long complete_after=-1;
288 long complete2_after=-1;
289 boolean undispatch=false;
290
291 if (request.getParameter("read")!=null)
292 read_before=Integer.parseInt(request.getParameter("read"));
293 if (request.getParameter("sleep")!=null)
294 sleep_for=Integer.parseInt(request.getParameter("sleep"));
295 if (request.getParameter("suspend")!=null)
296 suspend_for=Integer.parseInt(request.getParameter("suspend"));
297 if (request.getParameter("suspend2")!=null)
298 suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
299 if (request.getParameter("resume")!=null)
300 resume_after=Integer.parseInt(request.getParameter("resume"));
301 if (request.getParameter("resume2")!=null)
302 resume2_after=Integer.parseInt(request.getParameter("resume2"));
303 if (request.getParameter("complete")!=null)
304 complete_after=Integer.parseInt(request.getParameter("complete"));
305 if (request.getParameter("complete2")!=null)
306 complete2_after=Integer.parseInt(request.getParameter("complete2"));
307 if (request.getParameter("undispatch")!=null)
308 undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));
309
310 if (continuation.isInitial())
311 {
312 ((HttpServletResponse)response).addHeader("history","initial");
313 if (read_before>0)
314 {
315 byte[] buf=new byte[read_before];
316 request.getInputStream().read(buf);
317 }
318 else if (read_before<0)
319 {
320 InputStream in = request.getInputStream();
321 int b=in.read();
322 while(b!=-1)
323 b=in.read();
324 }
325
326 if (suspend_for>=0)
327 {
328 if (suspend_for>0)
329 continuation.setTimeout(suspend_for);
330 continuation.addContinuationListener(__listener);
331 ((HttpServletResponse)response).addHeader("history","suspend");
332 continuation.suspend(response);
333
334 if (complete_after>0)
335 {
336 TimerTask complete = new TimerTask()
337 {
338 @Override
339 public void run()
340 {
341 try
342 {
343 response.setStatus(200);
344 response.getOutputStream().println("COMPLETED\n");
345 continuation.complete();
346 }
347 catch(Exception e)
348 {
349 e.printStackTrace();
350 }
351 }
352 };
353 synchronized (_timer)
354 {
355 _timer.schedule(complete,complete_after);
356 }
357 }
358 else if (complete_after==0)
359 {
360 response.setStatus(200);
361 response.getOutputStream().println("COMPLETED\n");
362 continuation.complete();
363 }
364 else if (resume_after>0)
365 {
366 TimerTask resume = new TimerTask()
367 {
368 @Override
369 public void run()
370 {
371 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
372 continuation.resume();
373 }
374 };
375 synchronized (_timer)
376 {
377 _timer.schedule(resume,resume_after);
378 }
379 }
380 else if (resume_after==0)
381 {
382 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
383 continuation.resume();
384 }
385
386 if (undispatch)
387 continuation.undispatch();
388 }
389 else if (sleep_for>=0)
390 {
391 try
392 {
393 Thread.sleep(sleep_for);
394 }
395 catch (InterruptedException e)
396 {
397 e.printStackTrace();
398 }
399 response.setStatus(200);
400 response.getOutputStream().println("SLEPT\n");
401 }
402 else
403 {
404 response.setStatus(200);
405 response.getOutputStream().println("NORMAL\n");
406 }
407 }
408 else
409 {
410 ((HttpServletResponse)response).addHeader("history","!initial");
411 if (suspend2_for>=0 && request.getAttribute("2nd")==null)
412 {
413 request.setAttribute("2nd","cycle");
414
415 if (suspend2_for>0)
416 continuation.setTimeout(suspend2_for);
417
418 ((HttpServletResponse)response).addHeader("history","suspend");
419 continuation.suspend(response);
420
421 if (complete2_after>0)
422 {
423 TimerTask complete = new TimerTask()
424 {
425 @Override
426 public void run()
427 {
428 try
429 {
430 response.setStatus(200);
431 response.getOutputStream().println("COMPLETED\n");
432 continuation.complete();
433 }
434 catch(Exception e)
435 {
436 e.printStackTrace();
437 }
438 }
439 };
440 synchronized (_timer)
441 {
442 _timer.schedule(complete,complete2_after);
443 }
444 }
445 else if (complete2_after==0)
446 {
447 response.setStatus(200);
448 response.getOutputStream().println("COMPLETED\n");
449 continuation.complete();
450 }
451 else if (resume2_after>0)
452 {
453 TimerTask resume = new TimerTask()
454 {
455 @Override
456 public void run()
457 {
458 ((HttpServletResponse)response).addHeader("history","resume");
459 continuation.resume();
460 }
461 };
462 synchronized (_timer)
463 {
464 _timer.schedule(resume,resume2_after);
465 }
466 }
467 else if (resume2_after==0)
468 {
469 ((HttpServletResponse)response).addHeader("history","resume");
470 continuation.resume();
471 }
472 if (undispatch)
473 continuation.undispatch();
474 return;
475 }
476 else if (continuation.isExpired())
477 {
478 response.setStatus(200);
479 response.getOutputStream().println("TIMEOUT\n");
480 }
481 else if (continuation.isResumed())
482 {
483 response.setStatus(200);
484 response.getOutputStream().println("RESUMED\n");
485 }
486 else
487 {
488 response.setStatus(200);
489 response.getOutputStream().println("unknown???\n");
490 }
491 }
492 }
493 }
494
495
496 private static ContinuationListener __listener = new ContinuationListener()
497 {
498 public void onComplete(Continuation continuation)
499 {
500 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onComplete");
501 }
502
503 public void onTimeout(Continuation continuation)
504 {
505 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onTimeout");
506 continuation.resume();
507 }
508
509 };
510 }