View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.continuation.test;
15  
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.net.Socket;
19  import java.util.Timer;
20  import java.util.TimerTask;
21  
22  import javax.servlet.ServletException;
23  import javax.servlet.http.HttpServlet;
24  import javax.servlet.http.HttpServletRequest;
25  import javax.servlet.http.HttpServletResponse;
26  
27  import junit.framework.TestCase;
28  
29  import org.eclipse.jetty.continuation.Continuation;
30  import org.eclipse.jetty.continuation.ContinuationListener;
31  import org.eclipse.jetty.continuation.ContinuationSupport;
32  
33  
34  
35  public abstract class ContinuationBase extends TestCase
36  {
37      protected SuspendServlet _servlet=new SuspendServlet();
38      protected int _port;
39      
40      protected void doNormal(String type) throws Exception
41      {
42          String response=process(null,null);
43          assertContains(type,response);
44          assertContains("NORMAL",response);
45          assertNotContains("history: onTimeout",response);
46          assertNotContains("history: onComplete",response);
47      }
48  
49      protected void doSleep() throws Exception
50      {
51          String response=process("sleep=200",null);
52          assertContains("SLEPT",response);
53          assertNotContains("history: onTimeout",response);
54          assertNotContains("history: onComplete",response);
55      }
56  
57      protected void doSuspend() throws Exception
58      {
59          String response=process("suspend=200",null);
60          assertContains("TIMEOUT",response);
61          assertContains("history: onTimeout",response);
62          assertContains("history: onComplete",response);
63      }
64  
65      protected void doSuspendWaitResume() throws Exception
66      {
67          String response=process("suspend=200&resume=10",null);
68          assertContains("RESUMED",response);
69          assertNotContains("history: onTimeout",response);
70          assertContains("history: onComplete",response);
71      }
72  
73      protected void doSuspendResume() throws Exception
74      {
75          String response=process("suspend=200&resume=0",null);
76          assertContains("RESUMED",response);
77          assertNotContains("history: onTimeout",response);
78          assertContains("history: onComplete",response);
79      }
80  
81      protected void doSuspendWaitComplete() throws Exception
82      {
83          String response=process("suspend=200&complete=50",null);
84          assertContains("COMPLETED",response);
85          assertContains("history: initial",response);
86          assertNotContains("history: onTimeout",response);
87          assertContains("history: onComplete",response);
88          assertNotContains("history: !initial",response);
89      }
90  
91      protected void doSuspendComplete() throws Exception
92      {
93          String response=process("suspend=200&complete=0",null);
94          assertContains("COMPLETED",response);
95          assertContains("history: initial",response);
96          assertNotContains("history: onTimeout",response);
97          assertContains("history: onComplete",response);
98          assertNotContains("history: !initial",response);
99      }
100 
101     protected void doSuspendWaitResumeSuspendWaitResume() throws Exception
102     {
103         String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
104         assertEquals(2,count(response,"history: suspend"));
105         assertEquals(2,count(response,"history: resume"));
106         assertEquals(0,count(response,"history: onTimeout"));
107         assertEquals(1,count(response,"history: onComplete"));
108         assertContains("RESUMED",response);
109     }
110     
111     protected void doSuspendWaitResumeSuspendComplete() throws Exception
112     {
113         String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
114         assertEquals(2,count(response,"history: suspend"));
115         assertEquals(1,count(response,"history: resume"));
116         assertEquals(0,count(response,"history: onTimeout"));
117         assertEquals(1,count(response,"history: onComplete"));
118         assertContains("COMPLETED",response);
119     }
120 
121     protected void doSuspendWaitResumeSuspend() throws Exception
122     {
123         String response=process("suspend=1000&resume=10&suspend2=10",null);
124         assertEquals(2,count(response,"history: suspend"));
125         assertEquals(1,count(response,"history: resume"));
126         assertEquals(1,count(response,"history: onTimeout"));
127         assertEquals(1,count(response,"history: onComplete"));
128         assertContains("TIMEOUT",response);
129     }
130 
131     protected void doSuspendTimeoutSuspendResume() throws Exception
132     {
133         String response=process("suspend=10&suspend2=1000&resume2=10",null);
134         assertEquals(2,count(response,"history: suspend"));
135         assertEquals(1,count(response,"history: resume"));
136         assertEquals(1,count(response,"history: onTimeout"));
137         assertEquals(1,count(response,"history: onComplete"));
138         assertContains("RESUMED",response);
139     }
140 
141     protected void doSuspendTimeoutSuspendComplete() throws Exception
142     {
143         String response=process("suspend=10&suspend2=1000&complete2=10",null);
144         assertEquals(2,count(response,"history: suspend"));
145         assertEquals(0,count(response,"history: resume"));
146         assertEquals(1,count(response,"history: onTimeout"));
147         assertEquals(1,count(response,"history: onComplete"));
148         assertContains("COMPLETED",response);
149     }
150 
151     protected void doSuspendTimeoutSuspend() throws Exception
152     {
153         String response=process("suspend=10&suspend2=10",null);
154         assertEquals(2,count(response,"history: suspend"));
155         assertEquals(0,count(response,"history: resume"));
156         assertEquals(2,count(response,"history: onTimeout"));
157         assertEquals(1,count(response,"history: onComplete"));
158         assertContains("TIMEOUT",response);
159     }
160 
161     protected void doSuspendThrowResume() throws Exception
162     {
163         String response=process("suspend=200&resume=10&undispatch=true",null);
164         assertContains("RESUMED",response);
165         assertNotContains("history: onTimeout",response);
166         assertContains("history: onComplete",response);
167     }
168 
169     protected void doSuspendResumeThrow() throws Exception
170     {
171         String response=process("suspend=200&resume=0&undispatch=true",null);
172         assertContains("RESUMED",response);
173         assertNotContains("history: onTimeout",response);
174         assertContains("history: onComplete",response);
175     }
176 
177     protected void doSuspendThrowComplete() throws Exception
178     {
179         String response=process("suspend=200&complete=10&undispatch=true",null);
180         assertContains("COMPLETED",response);
181         assertNotContains("history: onTimeout",response);
182         assertContains("history: onComplete",response);
183     }
184 
185     protected void doSuspendCompleteThrow() throws Exception
186     {
187         String response=process("suspend=200&complete=0&undispatch=true",null);
188         assertContains("COMPLETED",response);
189         assertNotContains("history: onTimeout",response);
190         assertContains("history: onComplete",response);
191     }
192 
193     
194     private int count(String responses,String substring)
195     {
196         int count=0;
197         int i=responses.indexOf(substring);
198         while (i>=0)
199         {
200             count++;
201             i=responses.indexOf(substring,i+substring.length());
202         }
203         
204         return count;
205     }
206     
207     protected void assertContains(String content,String response)
208     {
209         assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
210         if (response.indexOf(content,15)<0)
211         {
212             System.err.println("'"+content+"' NOT IN:\n"+response+"\n--");
213             assertTrue(false);
214         }
215     }
216     
217     protected void assertNotContains(String content,String response)
218     {
219         assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
220         if (response.indexOf(content,15)>=0)
221         {
222             System.err.println("'"+content+"' IS IN:\n"+response+"'\n--");
223             assertTrue(false);
224         }
225     }
226     
227     public synchronized String process(String query,String content) throws Exception
228     {
229         String request = "GET /";
230         
231         if (query!=null)
232             request+="?"+query;
233         request+=" HTTP/1.1\r\n"+
234         "Host: localhost\r\n"+
235         "Connection: close\r\n";
236         if (content==null)
237             request+="\r\n";
238         else
239         {
240             request+="Content-Length: "+content.length()+"\r\n";
241             request+="\r\n" + content;
242         }
243         
244         int port=_port;
245         String response=null;
246         try
247         {
248             Socket socket = new Socket("localhost",port);
249             socket.setSoTimeout(10000);
250             socket.getOutputStream().write(request.getBytes("UTF-8"));
251 
252             response = toString(socket.getInputStream());
253         }
254         catch(Exception e)
255         {
256             System.err.println("failed on port "+port);
257             e.printStackTrace();
258             throw e;
259         }
260         return response;
261     }
262     
263     
264     protected abstract String toString(InputStream in) throws IOException;
265     
266     
267     private static class SuspendServlet extends HttpServlet
268     {
269         private Timer _timer=new Timer();
270         
271         public SuspendServlet()
272         {}
273         
274         /* ------------------------------------------------------------ */
275         protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
276         {
277             final Continuation continuation = ContinuationSupport.getContinuation(request);
278 
279             response.addHeader("history",continuation.getClass().toString());
280             
281             int read_before=0;
282             long sleep_for=-1;
283             long suspend_for=-1;
284             long suspend2_for=-1;
285             long resume_after=-1;
286             long resume2_after=-1;
287             long complete_after=-1;
288             long complete2_after=-1;
289             boolean undispatch=false;
290             
291             if (request.getParameter("read")!=null)
292                 read_before=Integer.parseInt(request.getParameter("read"));
293             if (request.getParameter("sleep")!=null)
294                 sleep_for=Integer.parseInt(request.getParameter("sleep"));
295             if (request.getParameter("suspend")!=null)
296                 suspend_for=Integer.parseInt(request.getParameter("suspend"));
297             if (request.getParameter("suspend2")!=null)
298                 suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
299             if (request.getParameter("resume")!=null)
300                 resume_after=Integer.parseInt(request.getParameter("resume"));
301             if (request.getParameter("resume2")!=null)
302                 resume2_after=Integer.parseInt(request.getParameter("resume2"));
303             if (request.getParameter("complete")!=null)
304                 complete_after=Integer.parseInt(request.getParameter("complete"));
305             if (request.getParameter("complete2")!=null)
306                 complete2_after=Integer.parseInt(request.getParameter("complete2"));
307             if (request.getParameter("undispatch")!=null)
308                 undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));
309             
310             if (continuation.isInitial())
311             {
312                 ((HttpServletResponse)response).addHeader("history","initial");
313                 if (read_before>0)
314                 {
315                     byte[] buf=new byte[read_before];
316                     request.getInputStream().read(buf);
317                 }
318                 else if (read_before<0)
319                 {
320                     InputStream in = request.getInputStream();
321                     int b=in.read();
322                     while(b!=-1)
323                         b=in.read();
324                 }
325 
326                 if (suspend_for>=0)
327                 {
328                     if (suspend_for>0)
329                         continuation.setTimeout(suspend_for);
330                     continuation.addContinuationListener(__listener);
331                     ((HttpServletResponse)response).addHeader("history","suspend");
332                     continuation.suspend(response);
333                     
334                     if (complete_after>0)
335                     {
336                         TimerTask complete = new TimerTask()
337                         {
338                             @Override
339                             public void run()
340                             {
341                                 try
342                                 {
343                                     response.setStatus(200);
344                                     response.getOutputStream().println("COMPLETED\n");
345                                     continuation.complete();
346                                 }
347                                 catch(Exception e)
348                                 {
349                                     e.printStackTrace();
350                                 }
351                             }
352                         };
353                         synchronized (_timer)
354                         {
355                             _timer.schedule(complete,complete_after);
356                         }
357                     }
358                     else if (complete_after==0)
359                     {
360                         response.setStatus(200);
361                         response.getOutputStream().println("COMPLETED\n");
362                         continuation.complete();
363                     }
364                     else if (resume_after>0)
365                     {
366                         TimerTask resume = new TimerTask()
367                         {
368                             @Override
369                             public void run()
370                             {
371                                 ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
372                                 continuation.resume();
373                             }
374                         };
375                         synchronized (_timer)
376                         {
377                             _timer.schedule(resume,resume_after);
378                         }
379                     }
380                     else if (resume_after==0)
381                     {
382                         ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
383                         continuation.resume();
384                     }
385                     
386                     if (undispatch)
387                         continuation.undispatch();
388                 }
389                 else if (sleep_for>=0)
390                 {
391                     try
392                     {
393                         Thread.sleep(sleep_for);
394                     }
395                     catch (InterruptedException e)
396                     {
397                         e.printStackTrace();
398                     }
399                     response.setStatus(200);
400                     response.getOutputStream().println("SLEPT\n");
401                 }
402                 else
403                 {
404                     response.setStatus(200);
405                     response.getOutputStream().println("NORMAL\n");
406                 }
407             }
408             else    
409             {
410                 ((HttpServletResponse)response).addHeader("history","!initial");
411                 if (suspend2_for>=0 && request.getAttribute("2nd")==null)
412                 {
413                     request.setAttribute("2nd","cycle");
414 
415                     if (suspend2_for>0)
416                         continuation.setTimeout(suspend2_for);
417                     // continuation.addContinuationListener(__listener);
418                     ((HttpServletResponse)response).addHeader("history","suspend");
419                     continuation.suspend(response);
420 
421                     if (complete2_after>0)
422                     {
423                         TimerTask complete = new TimerTask()
424                         {
425                             @Override
426                             public void run()
427                             {
428                                 try
429                                 {
430                                     response.setStatus(200);
431                                     response.getOutputStream().println("COMPLETED\n");
432                                     continuation.complete();
433                                 }
434                                 catch(Exception e)
435                                 {
436                                     e.printStackTrace();
437                                 }
438                             }
439                         };
440                         synchronized (_timer)
441                         {
442                             _timer.schedule(complete,complete2_after);
443                         }
444                     }
445                     else if (complete2_after==0)
446                     {
447                         response.setStatus(200);
448                         response.getOutputStream().println("COMPLETED\n");
449                         continuation.complete();
450                     }
451                     else if (resume2_after>0)
452                     {
453                         TimerTask resume = new TimerTask()
454                         {
455                             @Override
456                             public void run()
457                             {
458                                 ((HttpServletResponse)response).addHeader("history","resume");
459                                 continuation.resume();
460                             }
461                         };
462                         synchronized (_timer)
463                         {
464                             _timer.schedule(resume,resume2_after);
465                         }
466                     }
467                     else if (resume2_after==0)
468                     {
469                         ((HttpServletResponse)response).addHeader("history","resume");
470                         continuation.resume();
471                     }
472                     if (undispatch)
473                         continuation.undispatch();
474                     return;
475                 }
476                 else if (continuation.isExpired())
477                 {
478                     response.setStatus(200);
479                     response.getOutputStream().println("TIMEOUT\n");
480                 }
481                 else if (continuation.isResumed())
482                 {
483                     response.setStatus(200);
484                     response.getOutputStream().println("RESUMED\n");
485                 }
486                 else 
487                 {
488                     response.setStatus(200);
489                     response.getOutputStream().println("unknown???\n");
490                 }
491             }
492         }
493     }
494     
495     
496     private static ContinuationListener __listener = new ContinuationListener()
497     {
498         public void onComplete(Continuation continuation)
499         {
500             ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onComplete");
501         }
502 
503         public void onTimeout(Continuation continuation)
504         {
505             ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onTimeout");
506             continuation.resume();
507         }
508         
509     };
510 }