1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.continuation.test;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.net.Socket;
19 import java.util.Timer;
20 import java.util.TimerTask;
21
22 import javax.servlet.ServletException;
23 import javax.servlet.http.HttpServlet;
24 import javax.servlet.http.HttpServletRequest;
25 import javax.servlet.http.HttpServletResponse;
26
27 import junit.framework.TestCase;
28
29 import org.eclipse.jetty.continuation.Continuation;
30 import org.eclipse.jetty.continuation.ContinuationListener;
31 import org.eclipse.jetty.continuation.ContinuationSupport;
32
33
34
35 public abstract class ContinuationBase extends TestCase
36 {
37 protected SuspendServlet _servlet=new SuspendServlet();
38 protected int _port;
39
40 protected void doNormal(String type) throws Exception
41 {
42 String response=process(null,null);
43 assertContains(type,response);
44 assertContains("NORMAL",response);
45 assertNotContains("history: onTimeout",response);
46 assertNotContains("history: onComplete",response);
47 }
48
49 protected void doSleep() throws Exception
50 {
51 String response=process("sleep=200",null);
52 assertContains("SLEPT",response);
53 assertNotContains("history: onTimeout",response);
54 assertNotContains("history: onComplete",response);
55 }
56
57 protected void doSuspend() throws Exception
58 {
59 String response=process("suspend=200",null);
60 assertContains("TIMEOUT",response);
61 assertContains("history: onTimeout",response);
62 assertContains("history: onComplete",response);
63 }
64
65 protected void doSuspendWaitResume() throws Exception
66 {
67 String response=process("suspend=200&resume=10",null);
68 assertContains("RESUMED",response);
69 assertNotContains("history: onTimeout",response);
70 assertContains("history: onComplete",response);
71 }
72
73 protected void doSuspendResume() throws Exception
74 {
75 String response=process("suspend=200&resume=0",null);
76 assertContains("RESUMED",response);
77 assertNotContains("history: onTimeout",response);
78 assertContains("history: onComplete",response);
79 }
80
81 protected void doSuspendWaitComplete() throws Exception
82 {
83 String response=process("suspend=200&complete=10",null);
84 assertContains("COMPLETED",response);
85 assertNotContains("history: onTimeout",response);
86 assertContains("history: onComplete",response);
87 }
88
89 protected void doSuspendComplete() throws Exception
90 {
91 String response=process("suspend=200&complete=0",null);
92 assertContains("COMPLETED",response);
93 assertNotContains("history: onTimeout",response);
94 assertContains("history: onComplete",response);
95 }
96
97 protected void doSuspendWaitResumeSuspendWaitResume() throws Exception
98 {
99 String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
100 assertEquals(2,count(response,"history: suspend"));
101 assertEquals(2,count(response,"history: resume"));
102 assertEquals(0,count(response,"history: onTimeout"));
103 assertEquals(1,count(response,"history: onComplete"));
104 assertContains("RESUMED",response);
105 }
106
107 protected void doSuspendWaitResumeSuspendComplete() throws Exception
108 {
109 String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
110 assertEquals(2,count(response,"history: suspend"));
111 assertEquals(1,count(response,"history: resume"));
112 assertEquals(0,count(response,"history: onTimeout"));
113 assertEquals(1,count(response,"history: onComplete"));
114 assertContains("COMPLETED",response);
115 }
116
117 protected void doSuspendWaitResumeSuspend() throws Exception
118 {
119 String response=process("suspend=1000&resume=10&suspend2=10",null);
120 assertEquals(2,count(response,"history: suspend"));
121 assertEquals(1,count(response,"history: resume"));
122 assertEquals(1,count(response,"history: onTimeout"));
123 assertEquals(1,count(response,"history: onComplete"));
124 assertContains("TIMEOUT",response);
125 }
126
127 protected void doSuspendTimeoutSuspendResume() throws Exception
128 {
129 String response=process("suspend=10&suspend2=1000&resume2=10",null);
130 assertEquals(2,count(response,"history: suspend"));
131 assertEquals(1,count(response,"history: resume"));
132 assertEquals(1,count(response,"history: onTimeout"));
133 assertEquals(1,count(response,"history: onComplete"));
134 assertContains("RESUMED",response);
135 }
136
137 protected void doSuspendTimeoutSuspendComplete() throws Exception
138 {
139 String response=process("suspend=10&suspend2=1000&complete2=10",null);
140 assertEquals(2,count(response,"history: suspend"));
141 assertEquals(0,count(response,"history: resume"));
142 assertEquals(1,count(response,"history: onTimeout"));
143 assertEquals(1,count(response,"history: onComplete"));
144 assertContains("COMPLETED",response);
145 }
146
147 protected void doSuspendTimeoutSuspend() throws Exception
148 {
149 String response=process("suspend=10&suspend2=10",null);
150 assertEquals(2,count(response,"history: suspend"));
151 assertEquals(0,count(response,"history: resume"));
152 assertEquals(2,count(response,"history: onTimeout"));
153 assertEquals(1,count(response,"history: onComplete"));
154 assertContains("TIMEOUT",response);
155 }
156
157 protected void doSuspendThrowResume() throws Exception
158 {
159 String response=process("suspend=200&resume=10&undispatch=true",null);
160 assertContains("RESUMED",response);
161 assertNotContains("history: onTimeout",response);
162 assertContains("history: onComplete",response);
163 }
164
165 protected void doSuspendResumeThrow() throws Exception
166 {
167 String response=process("suspend=200&resume=0&undispatch=true",null);
168 assertContains("RESUMED",response);
169 assertNotContains("history: onTimeout",response);
170 assertContains("history: onComplete",response);
171 }
172
173 protected void doSuspendThrowComplete() throws Exception
174 {
175 String response=process("suspend=200&complete=10&undispatch=true",null);
176 assertContains("COMPLETED",response);
177 assertNotContains("history: onTimeout",response);
178 assertContains("history: onComplete",response);
179 }
180
181 protected void doSuspendCompleteThrow() throws Exception
182 {
183 String response=process("suspend=200&complete=0&undispatch=true",null);
184 assertContains("COMPLETED",response);
185 assertNotContains("history: onTimeout",response);
186 assertContains("history: onComplete",response);
187 }
188
189
190 private int count(String responses,String substring)
191 {
192 int count=0;
193 int i=responses.indexOf(substring);
194 while (i>=0)
195 {
196 count++;
197 i=responses.indexOf(substring,i+substring.length());
198 }
199
200 return count;
201 }
202
203 protected void assertContains(String content,String response)
204 {
205 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
206 if (response.indexOf(content,15)<0)
207 {
208 System.err.println("'"+content+"' NOT IN:\n"+response+"\n--");
209 assertTrue(false);
210 }
211 }
212
213 protected void assertNotContains(String content,String response)
214 {
215 assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
216 if (response.indexOf(content,15)>=0)
217 {
218 System.err.println("'"+content+"' IS IN:\n"+response+"'\n--");
219 assertTrue(false);
220 }
221 }
222
223 public synchronized String process(String query,String content) throws Exception
224 {
225 String request = "GET /";
226
227 if (query!=null)
228 request+="?"+query;
229 request+=" HTTP/1.1\r\n"+
230 "Host: localhost\r\n"+
231 "Connection: close\r\n";
232 if (content!=null)
233 request+="Content-Length: "+content.length()+"\r\n";
234 request+="\r\n" + content;
235
236 Socket socket = new Socket("localhost",_port);
237 socket.getOutputStream().write(request.getBytes("UTF-8"));
238
239 String response = toString(socket.getInputStream());
240 return response;
241 }
242
243
244 protected abstract String toString(InputStream in) throws IOException;
245
246
247 private static class SuspendServlet extends HttpServlet
248 {
249 private Timer _timer=new Timer();
250
251 public SuspendServlet()
252 {}
253
254
255 protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
256 {
257 final Continuation continuation = ContinuationSupport.getContinuation(request);
258
259 response.addHeader("history",continuation.getClass().toString());
260
261 int read_before=0;
262 long sleep_for=-1;
263 long suspend_for=-1;
264 long suspend2_for=-1;
265 long resume_after=-1;
266 long resume2_after=-1;
267 long complete_after=-1;
268 long complete2_after=-1;
269 boolean undispatch=false;
270
271 if (request.getParameter("read")!=null)
272 read_before=Integer.parseInt(request.getParameter("read"));
273 if (request.getParameter("sleep")!=null)
274 sleep_for=Integer.parseInt(request.getParameter("sleep"));
275 if (request.getParameter("suspend")!=null)
276 suspend_for=Integer.parseInt(request.getParameter("suspend"));
277 if (request.getParameter("suspend2")!=null)
278 suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
279 if (request.getParameter("resume")!=null)
280 resume_after=Integer.parseInt(request.getParameter("resume"));
281 if (request.getParameter("resume2")!=null)
282 resume2_after=Integer.parseInt(request.getParameter("resume2"));
283 if (request.getParameter("complete")!=null)
284 complete_after=Integer.parseInt(request.getParameter("complete"));
285 if (request.getParameter("complete2")!=null)
286 complete2_after=Integer.parseInt(request.getParameter("complete2"));
287 if (request.getParameter("undispatch")!=null)
288 undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));
289
290 if (continuation.isInitial())
291 {
292 if (read_before>0)
293 {
294 byte[] buf=new byte[read_before];
295 request.getInputStream().read(buf);
296 }
297 else if (read_before<0)
298 {
299 InputStream in = request.getInputStream();
300 int b=in.read();
301 while(b!=-1)
302 b=in.read();
303 }
304
305 if (suspend_for>=0)
306 {
307 if (suspend_for>0)
308 continuation.setTimeout(suspend_for);
309 continuation.addContinuationListener(__listener);
310 ((HttpServletResponse)response).addHeader("history","suspend");
311 continuation.suspend(response);
312
313 if (complete_after>0)
314 {
315 TimerTask complete = new TimerTask()
316 {
317 @Override
318 public void run()
319 {
320 try
321 {
322 response.setStatus(200);
323 response.getOutputStream().println("COMPLETED\n");
324 continuation.complete();
325 }
326 catch(Exception e)
327 {
328 e.printStackTrace();
329 }
330 }
331 };
332 synchronized (_timer)
333 {
334 _timer.schedule(complete,complete_after);
335 }
336 }
337 else if (complete_after==0)
338 {
339 response.setStatus(200);
340 response.getOutputStream().println("COMPLETED\n");
341 continuation.complete();
342 }
343 else if (resume_after>0)
344 {
345 TimerTask resume = new TimerTask()
346 {
347 @Override
348 public void run()
349 {
350 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
351 continuation.resume();
352 }
353 };
354 synchronized (_timer)
355 {
356 _timer.schedule(resume,resume_after);
357 }
358 }
359 else if (resume_after==0)
360 {
361 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
362 continuation.resume();
363 }
364
365 if (undispatch)
366 continuation.undispatch();
367 }
368 else if (sleep_for>=0)
369 {
370 try
371 {
372 Thread.sleep(sleep_for);
373 }
374 catch (InterruptedException e)
375 {
376 e.printStackTrace();
377 }
378 response.setStatus(200);
379 response.getOutputStream().println("SLEPT\n");
380 }
381 else
382 {
383 response.setStatus(200);
384 response.getOutputStream().println("NORMAL\n");
385 }
386 }
387 else if (suspend2_for>=0 && request.getAttribute("2nd")==null)
388 {
389 request.setAttribute("2nd","cycle");
390
391 if (suspend2_for>0)
392 continuation.setTimeout(suspend2_for);
393
394 ((HttpServletResponse)response).addHeader("history","suspend");
395 continuation.suspend(response);
396
397 if (complete2_after>0)
398 {
399 TimerTask complete = new TimerTask()
400 {
401 @Override
402 public void run()
403 {
404 try
405 {
406 response.setStatus(200);
407 response.getOutputStream().println("COMPLETED\n");
408 continuation.complete();
409 }
410 catch(Exception e)
411 {
412 e.printStackTrace();
413 }
414 }
415 };
416 synchronized (_timer)
417 {
418 _timer.schedule(complete,complete2_after);
419 }
420 }
421 else if (complete2_after==0)
422 {
423 response.setStatus(200);
424 response.getOutputStream().println("COMPLETED\n");
425 continuation.complete();
426 }
427 else if (resume2_after>0)
428 {
429 TimerTask resume = new TimerTask()
430 {
431 @Override
432 public void run()
433 {
434 ((HttpServletResponse)response).addHeader("history","resume");
435 continuation.resume();
436 }
437 };
438 synchronized (_timer)
439 {
440 _timer.schedule(resume,resume2_after);
441 }
442 }
443 else if (resume2_after==0)
444 {
445 ((HttpServletResponse)response).addHeader("history","resume");
446 continuation.resume();
447 }
448 if (undispatch)
449 continuation.undispatch();
450 return;
451 }
452 else if (continuation.isExpired())
453 {
454 response.setStatus(200);
455 response.getOutputStream().println("TIMEOUT\n");
456 }
457 else if (continuation.isResumed())
458 {
459 response.setStatus(200);
460 response.getOutputStream().println("RESUMED\n");
461 }
462 else
463 {
464 response.setStatus(200);
465 response.getOutputStream().println("unknown???\n");
466 }
467 }
468 }
469
470
471 private static ContinuationListener __listener = new ContinuationListener()
472 {
473 public void onComplete(Continuation continuation)
474 {
475 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onComplete");
476 }
477
478 public void onTimeout(Continuation continuation)
479 {
480 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onTimeout");
481 continuation.resume();
482 }
483
484 };
485 }