View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.nio.channels.ClosedChannelException;
22  
23  import org.eclipse.jetty.util.thread.Locker;
24  
25  /**
26   * This specialized callback implements a pattern that allows
27   * a large job to be broken into smaller tasks using iteration
28   * rather than recursion.
29   * <p>
30   * A typical example is the write of a large content to a socket,
31   * divided in chunks. Chunk C1 is written by thread T1, which
32   * also invokes the callback, which writes chunk C2, which invokes
33   * the callback again, which writes chunk C3, and so forth.
34   * </p>
35   * <p>
36   * The problem with the example is that if the callback thread
37   * is the same that performs the I/O operation, then the process
38   * is recursive and may result in a stack overflow.
39   * To avoid the stack overflow, a thread dispatch must be performed,
40   * causing context switching and cache misses, affecting performance.
41   * </p>
42   * <p>
43   * To avoid this issue, this callback uses an AtomicReference to
44   * record whether success callback has been called during the processing
45   * of a sub task, and if so then the processing iterates rather than
46   * recurring.
47   * </p>
48   * <p>
49   * Subclasses must implement method {@link #process()} where the sub
50   * task is executed and a suitable {@link IteratingCallback.Action} is
51   * returned to this callback to indicate the overall progress of the job.
52   * This callback is passed to the asynchronous execution of each sub
53   * task and a call the {@link #succeeded()} on this callback represents
54   * the completion of the sub task.
55   * </p>
56   */
57  public abstract class IteratingCallback implements Callback
58  {
59      /**
60       * The internal states of this callback
61       */
62      private enum State
63      {
64          /**
65           * This callback is IDLE, ready to iterate.
66           */
67          IDLE,
68  
69          /**
70           * This callback is iterating calls to {@link #process()} and is dealing with
71           * the returns.  To get into processing state, it much of held the lock state
72           * and set iterating to true.
73           */
74          PROCESSING,
75  
76          /**
77           * Waiting for a schedule callback
78           */
79          PENDING,
80  
81          /**
82           * Called by a schedule callback
83           */
84          CALLED,
85  
86          /**
87           * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
88           * from {@link IteratingCallback#process()}
89           */
90          SUCCEEDED,
91  
92          /**
93           * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)}
94           */
95          FAILED,
96  
97          /**
98           * This callback has been closed and cannot be reset.
99           */
100         CLOSED
101     }
102 
103     /**
104      * The indication of the overall progress of the overall job that
105      * implementations of {@link #process()} must return.
106      */
107     protected enum Action
108     {
109         /**
110          * Indicates that {@link #process()} has no more work to do,
111          * but the overall job is not completed yet, probably waiting
112          * for additional events to trigger more work.
113          */
114         IDLE,
115         /**
116          * Indicates that {@link #process()} is executing asynchronously
117          * a sub task, where the execution has started but the callback
118          * may have not yet been invoked.
119          */
120         SCHEDULED,
121 
122         /**
123          * Indicates that {@link #process()} has completed the overall job.
124          */
125         SUCCEEDED
126     }
127 
128     private Locker _locker = new Locker();
129     private State _state;
130     private boolean _iterate;
131 
132 
133     protected IteratingCallback()
134     {
135         _state = State.IDLE;
136     }
137 
138     protected IteratingCallback(boolean needReset)
139     {
140         _state = needReset ? State.SUCCEEDED : State.IDLE;
141     }
142 
143     /**
144      * Method called by {@link #iterate()} to process the sub task.
145      * <p>
146      * Implementations must start the asynchronous execution of the sub task
147      * (if any) and return an appropriate action:
148      * </p>
149      * <ul>
150      * <li>{@link Action#IDLE} when no sub tasks are available for execution
151      * but the overall job is not completed yet</li>
152      * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
153      * has been started</li>
154      * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
155      * </ul>
156      *
157      * @return the appropriate Action
158      *
159      * @throws Exception if the sub task processing throws
160      */
161     protected abstract Action process() throws Exception;
162 
163     /**
164      * Invoked when the overall task has completed successfully.
165      *
166      * @see #onCompleteFailure(Throwable)
167      */
168     protected void onCompleteSuccess()
169     {
170     }
171 
172     /**
173      * Invoked when the overall task has completed with a failure.
174      * @param cause the throwable to indicate cause of failure
175      *
176      * @see #onCompleteSuccess()
177      */
178     protected void onCompleteFailure(Throwable cause)
179     {
180     }
181 
182     /**
183      * This method must be invoked by applications to start the processing
184      * of sub tasks.  It can be called at any time by any thread, and it's
185      * contract is that when called, then the {@link #process()} method will
186      * be called during or soon after, either by the calling thread or by
187      * another thread.
188      */
189     public void iterate()
190     {
191         boolean process=false;
192 
193         loop: while (true)
194         {
195             try (Locker.Lock lock = _locker.lock())
196             {
197                 switch (_state)
198                 {
199                     case PENDING:
200                     case CALLED:
201                         // process will be called when callback is handled
202                         break loop;
203 
204                     case IDLE:
205                         _state=State.PROCESSING;
206                         process=true;
207                         break loop;
208 
209                     case PROCESSING:
210                         _iterate=true;
211                         break loop;
212 
213                     case FAILED:
214                     case SUCCEEDED:
215                         break loop;
216 
217                     case CLOSED:
218                     default:
219                         throw new IllegalStateException(toString());
220                 }
221             }
222         }
223         if (process)
224             processing();
225     }
226 
227     private void processing()
228     {
229         // This should only ever be called when in processing state, however a failed or close call
230         // may happen concurrently, so state is not assumed.
231 
232         boolean on_complete_success=false;
233 
234         // While we are processing
235         processing: while (true)
236         {
237             // Call process to get the action that we have to take.
238             Action action;
239             try
240             {
241                 action = process();
242             }
243             catch (Throwable x)
244             {
245                 failed(x);
246                 break processing;
247             }
248 
249             // acted on the action we have just received
250             try(Locker.Lock lock = _locker.lock())
251             {
252                 switch (_state)
253                 {
254                     case PROCESSING:
255                     {
256                         switch (action)
257                         {
258                             case IDLE:
259                             {
260                                 // Has iterate been called while we were processing?
261                                 if (_iterate)
262                                 {
263                                     // yes, so skip idle and keep processing
264                                     _iterate=false;
265                                     _state=State.PROCESSING;
266                                     continue processing;
267                                 }
268 
269                                 // No, so we can go idle
270                                 _state=State.IDLE;
271                                 break processing;
272                             }
273 
274                             case SCHEDULED:
275                             {
276                                 // we won the race against the callback, so the callback has to process and we can break processing
277                                 _state=State.PENDING;
278                                 break processing;
279                             }
280 
281                             case SUCCEEDED:
282                             {
283                                 // we lost the race against the callback,
284                                 _iterate=false;
285                                 _state=State.SUCCEEDED;
286                                 on_complete_success=true;
287                                 break processing;
288                             }
289 
290                             default:
291                                 throw new IllegalStateException(String.format("%s[action=%s]", this, action));
292                         }
293                     }
294 
295                     case CALLED:
296                     {
297                         switch (action)
298                         {
299                             case SCHEDULED:
300                             {
301                                 // we lost the race, so we have to keep processing
302                                 _state=State.PROCESSING;
303                                 continue processing;
304                             }
305 
306                             default:
307                                 throw new IllegalStateException(String.format("%s[action=%s]", this, action));
308                         }
309                     }
310 
311                     case SUCCEEDED:
312                     case FAILED:
313                     case CLOSED:
314                         break processing;
315 
316                     case IDLE:
317                     case PENDING:
318                     default:
319                         throw new IllegalStateException(String.format("%s[action=%s]", this, action));
320                 }
321             }
322         }
323 
324         if (on_complete_success)
325             onCompleteSuccess();
326     }
327 
328     /**
329      * Invoked when the sub task succeeds.
330      * Subclasses that override this method must always remember to call
331      * {@code super.succeeded()}.
332      */
333     @Override
334     public void succeeded()
335     {
336         boolean process=false;
337         try(Locker.Lock lock = _locker.lock())
338         {
339             switch (_state)
340             {
341                 case PROCESSING:
342                 {
343                     _state=State.CALLED;
344                     break;
345                 }
346                 case PENDING:
347                 {
348                     _state=State.PROCESSING;
349                     process=true;
350                     break;
351                 }
352                 case CLOSED:
353                 case FAILED:
354                 {
355                     // Too late!
356                     break;
357                 }
358                 default:
359                 {
360                     throw new IllegalStateException(toString());
361                 }
362             }
363         }
364         if (process)
365             processing();
366     }
367 
368     /**
369      * Invoked when the sub task fails.
370      * Subclasses that override this method must always remember to call
371      * {@code super.failed(Throwable)}.
372      */
373     @Override
374     public void failed(Throwable x)
375     {
376         boolean failure=false;
377         try(Locker.Lock lock = _locker.lock())
378         {
379             switch (_state)
380             {
381                 case SUCCEEDED:
382                 case FAILED:
383                 case IDLE:
384                 case CLOSED:
385                 case CALLED:
386                     // too late!.
387                     break;
388 
389                 case PENDING:
390                 case PROCESSING:
391                 {
392                     _state=State.FAILED;
393                     failure=true;
394                     break;
395                 }
396                 default:
397                     throw new IllegalStateException(toString());
398             }
399         }
400         if (failure)
401             onCompleteFailure(x);
402     }
403 
404     public void close()
405     {
406         boolean failure=false;
407         try(Locker.Lock lock = _locker.lock())
408         {
409             switch (_state)
410             {
411                 case IDLE:
412                 case SUCCEEDED:
413                 case FAILED:
414                     _state=State.CLOSED;
415                     break;
416 
417                 case CLOSED:
418                     break;
419 
420                 default:
421                     _state=State.CLOSED;
422                     failure=true;
423             }
424         }
425 
426         if(failure)
427             onCompleteFailure(new ClosedChannelException());
428     }
429 
430     /*
431      * only for testing
432      * @return whether this callback is idle and {@link #iterate()} needs to be called
433      */
434     boolean isIdle()
435     {
436         try(Locker.Lock lock = _locker.lock())
437         {
438             return _state == State.IDLE;
439         }
440     }
441 
442     public boolean isClosed()
443     {
444         try(Locker.Lock lock = _locker.lock())
445         {
446             return _state == State.CLOSED;
447         }
448     }
449 
450     /**
451      * @return whether this callback has failed
452      */
453     public boolean isFailed()
454     {
455         try(Locker.Lock lock = _locker.lock())
456         {
457             return _state == State.FAILED;
458         }
459     }
460 
461     /**
462      * @return whether this callback has succeeded
463      */
464     public boolean isSucceeded()
465     {
466         try(Locker.Lock lock = _locker.lock())
467         {
468             return _state == State.SUCCEEDED;
469         }
470     }
471 
472     /**
473      * Resets this callback.
474      * <p>
475      * A callback can only be reset to IDLE from the
476      * SUCCEEDED or FAILED states or if it is already IDLE.
477      * </p>
478      *
479      * @return true if the reset was successful
480      */
481     public boolean reset()
482     {
483         try(Locker.Lock lock = _locker.lock())
484         {
485             switch(_state)
486             {
487                 case IDLE:
488                     return true;
489 
490                 case SUCCEEDED:
491                 case FAILED:
492                     _iterate=false;
493                     _state=State.IDLE;
494                     return true;
495 
496                 default:
497                     return false;
498             }
499         }
500     }
501 
502     @Override
503     public String toString()
504     {
505         return String.format("%s[%s]", super.toString(), _state);
506     }
507 }