1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.continuation.test;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.Socket;
19 import java.util.Timer;
20 import java.util.TimerTask;
21
22 import javax.servlet.ServletException;
23 import javax.servlet.http.HttpServlet;
24 import javax.servlet.http.HttpServletRequest;
25 import javax.servlet.http.HttpServletResponse;
26
27 import junit.framework.TestCase;
28
29 import org.eclipse.jetty.continuation.Continuation;
30 import org.eclipse.jetty.continuation.ContinuationListener;
31 import org.eclipse.jetty.continuation.ContinuationSupport;
32
33
34
35 public abstract class ContinuationBase extends TestCase
36 {
37 protected SuspendServlet _servlet=new SuspendServlet();
38 protected int _port;
39
40 protected void doNormal(String type) throws Exception
41 {
42 String response=process(null,null);
43 assertContains(type,response);
44 assertContains("NORMAL",response);
45 assertNotContains("history: onTimeout",response);
46 assertNotContains("history: onComplete",response);
47 }
48
49 protected void doSleep() throws Exception
50 {
51 String response=process("sleep=200",null);
52 assertContains("SLEPT",response);
53 assertNotContains("history: onTimeout",response);
54 assertNotContains("history: onComplete",response);
55 }
56
57 protected void doSuspend() throws Exception
58 {
59 String response=process("suspend=200",null);
60 assertContains("TIMEOUT",response);
61 assertContains("history: onTimeout",response);
62 assertContains("history: onComplete",response);
63 }
64
65 protected void doSuspendWaitResume() throws Exception
66 {
67 String response=process("suspend=200&resume=10",null);
68 assertContains("RESUMED",response);
69 assertNotContains("history: onTimeout",response);
70 assertContains("history: onComplete",response);
71 }
72
73 protected void doSuspendResume() throws Exception
74 {
75 String response=process("suspend=200&resume=0",null);
76 assertContains("RESUMED",response);
77 assertNotContains("history: onTimeout",response);
78 assertContains("history: onComplete",response);
79 }
80
81 protected void doSuspendWaitComplete() throws Exception
82 {
83 String response=process("suspend=200&complete=50",null);
84 assertContains("COMPLETED",response);
85 assertContains("history: initial",response);
86 assertNotContains("history: onTimeout",response);
87 assertContains("history: onComplete",response);
88 assertNotContains("history: !initial",response);
89 }
90
91 protected void doSuspendComplete() throws Exception
92 {
93 String response=process("suspend=200&complete=0",null);
94 assertContains("COMPLETED",response);
95 assertContains("history: initial",response);
96 assertNotContains("history: onTimeout",response);
97 assertContains("history: onComplete",response);
98 assertNotContains("history: !initial",response);
99 }
100
101 protected void doSuspendWaitResumeSuspendWaitResume() throws Exception
102 {
103 String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
104 assertEquals(2,count(response,"history: suspend"));
105 assertEquals(2,count(response,"history: resume"));
106 assertEquals(0,count(response,"history: onTimeout"));
107 assertEquals(1,count(response,"history: onComplete"));
108 assertContains("RESUMED",response);
109 }
110
111 protected void doSuspendWaitResumeSuspendComplete() throws Exception
112 {
113 String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
114 assertEquals(2,count(response,"history: suspend"));
115 assertEquals(1,count(response,"history: resume"));
116 assertEquals(0,count(response,"history: onTimeout"));
117 assertEquals(1,count(response,"history: onComplete"));
118 assertContains("COMPLETED",response);
119 }
120
121 protected void doSuspendWaitResumeSuspend() throws Exception
122 {
123 String response=process("suspend=1000&resume=10&suspend2=10",null);
124 assertEquals(2,count(response,"history: suspend"));
125 assertEquals(1,count(response,"history: resume"));
126 assertEquals(1,count(response,"history: onTimeout"));
127 assertEquals(1,count(response,"history: onComplete"));
128 assertContains("TIMEOUT",response);
129 }
130
131 protected void doSuspendTimeoutSuspendResume() throws Exception
132 {
133 String response=process("suspend=10&suspend2=1000&resume2=10",null);
134 assertEquals(2,count(response,"history: suspend"));
135 assertEquals(1,count(response,"history: resume"));
136 assertEquals(1,count(response,"history: onTimeout"));
137 assertEquals(1,count(response,"history: onComplete"));
138 assertContains("RESUMED",response);
139 }
140
141 protected void doSuspendTimeoutSuspendComplete() throws Exception
142 {
143 String response=process("suspend=10&suspend2=1000&complete2=10",null);
144 assertEquals(2,count(response,"history: suspend"));
145 assertEquals(0,count(response,"history: resume"));
146 assertEquals(1,count(response,"history: onTimeout"));
147 assertEquals(1,count(response,"history: onComplete"));
148 assertContains("COMPLETED",response);
149 }
150
151 protected void doSuspendTimeoutSuspend() throws Exception
152 {
153 String response=process("suspend=10&suspend2=10",null);
154 assertEquals(2,count(response,"history: suspend"));
155 assertEquals(0,count(response,"history: resume"));
156 assertEquals(2,count(response,"history: onTimeout"));
157 assertEquals(1,count(response,"history: onComplete"));
158 assertContains("TIMEOUT",response);
159 }
160
161 protected void doSuspendThrowResume() throws Exception
162 {
163 String response=process("suspend=200&resume=10&undispatch=true",null);
164 assertContains("RESUMED",response);
165 assertNotContains("history: onTimeout",response);
166 assertContains("history: onComplete",response);
167 }
168
169 protected void doSuspendResumeThrow() throws Exception
170 {
171 String response=process("suspend=200&resume=0&undispatch=true",null);
172 assertContains("RESUMED",response);
173 assertNotContains("history: onTimeout",response);
174 assertContains("history: onComplete",response);
175 }
176
177 protected void doSuspendThrowComplete() throws Exception
178 {
179 String response=process("suspend=200&complete=10&undispatch=true",null);
180 assertContains("COMPLETED",response);
181 assertNotContains("history: onTimeout",response);
182 assertContains("history: onComplete",response);
183 }
184
185 protected void doSuspendCompleteThrow() throws Exception
186 {
187 String response=process("suspend=200&complete=0&undispatch=true",null);
188 assertContains("COMPLETED",response);
189 assertNotContains("history: onTimeout",response);
190 assertContains("history: onComplete",response);
191 }
192
193
194 private int count(String responses,String substring)
195 {
196 int count=0;
197 int i=responses.indexOf(substring);
198 while (i>=0)
199 {
200 count++;
201 i=responses.indexOf(substring,i+substring.length());
202 }
203
204 return count;
205 }
206
207 protected void assertContains(String content,String response)
208 {
209 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
210 if (response.indexOf(content,15)<0)
211 {
212 System.err.println("'"+content+"' NOT IN:\n"+response+"\n--");
213 assertTrue(false);
214 }
215 }
216
217 protected void assertNotContains(String content,String response)
218 {
219 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
220 if (response.indexOf(content,15)>=0)
221 {
222 System.err.println("'"+content+"' IS IN:\n"+response+"'\n--");
223 assertTrue(false);
224 }
225 }
226
227 public synchronized String process(String query,String content) throws Exception
228 {
229 String request = "GET /";
230
231 if (query!=null)
232 request+="?"+query;
233 request+=" HTTP/1.1\r\n"+
234 "Host: localhost\r\n"+
235 "Connection: close\r\n";
236 if (content!=null)
237 request+="Content-Length: "+content.length()+"\r\n";
238 request+="\r\n" + content;
239
240 int port=_port;
241 String response=null;
242 try
243 {
244 Socket socket = new Socket("localhost",port);
245 socket.setSoTimeout(10000);
246 socket.getOutputStream().write(request.getBytes("UTF-8"));
247
248 response = toString(socket.getInputStream());
249 }
250 catch(Exception e)
251 {
252 System.err.println("failed on port "+port);
253 e.printStackTrace();
254 throw e;
255 }
256 return response;
257 }
258
259
260 protected abstract String toString(InputStream in) throws IOException;
261
262
263 private static class SuspendServlet extends HttpServlet
264 {
265 private Timer _timer=new Timer();
266
267 public SuspendServlet()
268 {}
269
270
271 protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
272 {
273 final Continuation continuation = ContinuationSupport.getContinuation(request);
274
275 response.addHeader("history",continuation.getClass().toString());
276
277 int read_before=0;
278 long sleep_for=-1;
279 long suspend_for=-1;
280 long suspend2_for=-1;
281 long resume_after=-1;
282 long resume2_after=-1;
283 long complete_after=-1;
284 long complete2_after=-1;
285 boolean undispatch=false;
286
287 if (request.getParameter("read")!=null)
288 read_before=Integer.parseInt(request.getParameter("read"));
289 if (request.getParameter("sleep")!=null)
290 sleep_for=Integer.parseInt(request.getParameter("sleep"));
291 if (request.getParameter("suspend")!=null)
292 suspend_for=Integer.parseInt(request.getParameter("suspend"));
293 if (request.getParameter("suspend2")!=null)
294 suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
295 if (request.getParameter("resume")!=null)
296 resume_after=Integer.parseInt(request.getParameter("resume"));
297 if (request.getParameter("resume2")!=null)
298 resume2_after=Integer.parseInt(request.getParameter("resume2"));
299 if (request.getParameter("complete")!=null)
300 complete_after=Integer.parseInt(request.getParameter("complete"));
301 if (request.getParameter("complete2")!=null)
302 complete2_after=Integer.parseInt(request.getParameter("complete2"));
303 if (request.getParameter("undispatch")!=null)
304 undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));
305
306 if (continuation.isInitial())
307 {
308 ((HttpServletResponse)response).addHeader("history","initial");
309 if (read_before>0)
310 {
311 byte[] buf=new byte[read_before];
312 request.getInputStream().read(buf);
313 }
314 else if (read_before<0)
315 {
316 InputStream in = request.getInputStream();
317 int b=in.read();
318 while(b!=-1)
319 b=in.read();
320 }
321
322 if (suspend_for>=0)
323 {
324 if (suspend_for>0)
325 continuation.setTimeout(suspend_for);
326 continuation.addContinuationListener(__listener);
327 ((HttpServletResponse)response).addHeader("history","suspend");
328 continuation.suspend(response);
329
330 if (complete_after>0)
331 {
332 TimerTask complete = new TimerTask()
333 {
334 @Override
335 public void run()
336 {
337 try
338 {
339 response.setStatus(200);
340 response.getOutputStream().println("COMPLETED\n");
341 continuation.complete();
342 }
343 catch(Exception e)
344 {
345 e.printStackTrace();
346 }
347 }
348 };
349 synchronized (_timer)
350 {
351 _timer.schedule(complete,complete_after);
352 }
353 }
354 else if (complete_after==0)
355 {
356 response.setStatus(200);
357 response.getOutputStream().println("COMPLETED\n");
358 continuation.complete();
359 }
360 else if (resume_after>0)
361 {
362 TimerTask resume = new TimerTask()
363 {
364 @Override
365 public void run()
366 {
367 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
368 continuation.resume();
369 }
370 };
371 synchronized (_timer)
372 {
373 _timer.schedule(resume,resume_after);
374 }
375 }
376 else if (resume_after==0)
377 {
378 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
379 continuation.resume();
380 }
381
382 if (undispatch)
383 continuation.undispatch();
384 }
385 else if (sleep_for>=0)
386 {
387 try
388 {
389 Thread.sleep(sleep_for);
390 }
391 catch (InterruptedException e)
392 {
393 e.printStackTrace();
394 }
395 response.setStatus(200);
396 response.getOutputStream().println("SLEPT\n");
397 }
398 else
399 {
400 response.setStatus(200);
401 response.getOutputStream().println("NORMAL\n");
402 }
403 }
404 else
405 {
406 ((HttpServletResponse)response).addHeader("history","!initial");
407 if (suspend2_for>=0 && request.getAttribute("2nd")==null)
408 {
409 request.setAttribute("2nd","cycle");
410
411 if (suspend2_for>0)
412 continuation.setTimeout(suspend2_for);
413
414 ((HttpServletResponse)response).addHeader("history","suspend");
415 continuation.suspend(response);
416
417 if (complete2_after>0)
418 {
419 TimerTask complete = new TimerTask()
420 {
421 @Override
422 public void run()
423 {
424 try
425 {
426 response.setStatus(200);
427 response.getOutputStream().println("COMPLETED\n");
428 continuation.complete();
429 }
430 catch(Exception e)
431 {
432 e.printStackTrace();
433 }
434 }
435 };
436 synchronized (_timer)
437 {
438 _timer.schedule(complete,complete2_after);
439 }
440 }
441 else if (complete2_after==0)
442 {
443 response.setStatus(200);
444 response.getOutputStream().println("COMPLETED\n");
445 continuation.complete();
446 }
447 else if (resume2_after>0)
448 {
449 TimerTask resume = new TimerTask()
450 {
451 @Override
452 public void run()
453 {
454 ((HttpServletResponse)response).addHeader("history","resume");
455 continuation.resume();
456 }
457 };
458 synchronized (_timer)
459 {
460 _timer.schedule(resume,resume2_after);
461 }
462 }
463 else if (resume2_after==0)
464 {
465 ((HttpServletResponse)response).addHeader("history","resume");
466 continuation.resume();
467 }
468 if (undispatch)
469 continuation.undispatch();
470 return;
471 }
472 else if (continuation.isExpired())
473 {
474 response.setStatus(200);
475 response.getOutputStream().println("TIMEOUT\n");
476 }
477 else if (continuation.isResumed())
478 {
479 response.setStatus(200);
480 response.getOutputStream().println("RESUMED\n");
481 }
482 else
483 {
484 response.setStatus(200);
485 response.getOutputStream().println("unknown???\n");
486 }
487 }
488 }
489 }
490
491
492 private static ContinuationListener __listener = new ContinuationListener()
493 {
494 public void onComplete(Continuation continuation)
495 {
496 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onComplete");
497 }
498
499 public void onTimeout(Continuation continuation)
500 {
501 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onTimeout");
502 continuation.resume();
503 }
504
505 };
506 }