View Javadoc

//
//  ========================================================================
//  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.concurrent.atomic.AtomicReference;
22  
23  
24  /* ------------------------------------------------------------ */
25  /** Iterating Callback.
26   * <p>This specialised callback is used when breaking up an
27   * asynchronous task into smaller asynchronous tasks.  A typical pattern
28   * is that a successful callback is used to schedule the next sub task, but 
29   * if that task completes quickly and uses the calling thread to callback
30   * the success notification, this can result in a growing stack depth.
31   * </p>
32   * <p>To avoid this issue, this callback uses an AtomicReference to note 
33   * if the success callback has been called during the processing of a 
34   * sub task, and if so then the processing iterates rather than recurses.
35   * </p>
36   * <p>This callback is passed to the asynchronous handling of each sub
37   * task and a call the {@link #succeeded()} on this call back represents
38   * completion of the subtask.  Only once all the subtasks are completed is 
39   * the {@link Callback#succeeded()} method called on the {@link Callback} instance
40   * passed the the {@link #IteratingCallback(Callback)} constructor.</p>
41   *  
42   */
43  public abstract class IteratingCallback implements Callback
44  {
45      private enum State { WAITING, ITERATING, SUCCEEDED, FAILED };
46      private final AtomicReference<State> _state = new AtomicReference<>(State.WAITING);
47      private final Callback _callback;
48      
49      public IteratingCallback(Callback callback)
50      {
51          _callback=callback;
52      }
53      
54      /* ------------------------------------------------------------ */
55      /**
56       * Process a subtask.
57       * <p>Called by {@link #iterate()} to process a sub task of the overall task
58       * <p>
59       * @return True if the total task is complete. If false is returned
60       * then this Callback must be scheduled to receive either a call to 
61       * {@link #succeeded()} or {@link #failed(Throwable)}.
62       * @throws Exception
63       */
64      abstract protected boolean process() throws Exception;
65      
66      /* ------------------------------------------------------------ */
67      /** This method is called initially to start processing and 
68       * is then called by subsequent sub task success to continue
69       * processing.
70       */
71      public void iterate()
72      {
73          try
74          {
75              // Keep iterating as long as succeeded() is called during process()
76              // If we are in WAITING state, either this is the first iteration or
77              // succeeded()/failed() were called already.
78              while(_state.compareAndSet(State.WAITING,State.ITERATING))
79              {
80                  // Make some progress by calling process()
81                  if (process())
82                  {
83                      // A true return indicates we are finished a no further callbacks 
84                      // are scheduled. So we must still be ITERATING.
85                      if (_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
86                          _callback.succeeded();
87                      else
88                          throw new IllegalStateException("Already "+_state.get());
89                      return;
90                  }
91                  // else a callback has been scheduled.  If it has not happened yet,
92                  // we will still be ITERATING
93                  else if (_state.compareAndSet(State.ITERATING,State.WAITING))
94                      // no callback yet, so break the loop and wait for it
95                      break;
96                  
97                  // The callback must have happened and we are either WAITING already or FAILED
98                  // the loop test will work out which
99              }
100         }
101         catch(Exception e)
102         {
103             failed(e);
104         }
105     }
106     
107     @Override
108     public void succeeded()
109     {
110         // Try a short cut for the fast method.  If we are still iterating
111         if (_state.compareAndSet(State.ITERATING,State.WAITING))
112             // then next loop will continue processing, so nothing to do here
113             return;
114 
115         // OK do it properly
116         loop: while(true)
117         {
118             switch(_state.get())
119             {
120                 case ITERATING:
121                     if (_state.compareAndSet(State.ITERATING,State.WAITING))
122                         break loop;
123                     continue;
124                     
125                 case WAITING:
126                     // we are really waiting, so use this callback thread to iterate some more 
127                     iterate();
128                     break loop;
129                     
130                 default:
131                     throw new IllegalStateException("Already "+_state.get());
132             }
133         }
134     }
135 
136     @Override
137     public void failed(Throwable x)
138     {
139         loop: while(true)
140         {
141             switch(_state.get())
142             {
143                 case ITERATING:
144                     if (_state.compareAndSet(State.ITERATING,State.FAILED))
145                         break loop;
146                     continue;
147                     
148                 case WAITING:
149                     if (_state.compareAndSet(State.WAITING,State.FAILED))
150                         break loop;
151                     continue;
152                     
153                 default:
154                     throw new IllegalStateException("Already "+_state.get());
155             }
156         }
157     
158         _callback.failed(x);
159     }
160 
161 }