View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.concurrent.atomic.AtomicInteger;
22  
23  /**
24   * <p>A callback wrapper that succeeds the wrapped callback when the count is
25   * reached, or on first failure.</p>
26   * <p>This callback is particularly useful when an async operation is split
27   * into multiple parts, for example when an original byte buffer that needs
28   * to be written, along with a callback, is split into multiple byte buffers,
29   * since it allows the original callback to be wrapped and notified only when
30   * the last part has been processed.</p>
31   * <p>Example:</p>
32   * <pre>
33   * public void process(EndPoint endPoint, ByteBuffer buffer, Callback callback)
34   * {
35   *     ByteBuffer[] buffers = split(buffer);
36   *     CountCallback countCallback = new CountCallback(callback, buffers.length);
37   *     endPoint.write(countCallback, buffers);
38   * }
39   * </pre>
40   */
41  public class CountingCallback extends Callback.Nested
42  {
43      private final AtomicInteger count;
44  
45      public CountingCallback(Callback callback, int count)
46      {
47          super(callback);
48          this.count = new AtomicInteger(count);
49      }
50  
51      @Override
52      public void succeeded()
53      {
54          // Forward success on the last success.
55          while (true)
56          {
57              int current = count.get();
58  
59              // Already completed ?
60              if (current == 0)
61                  return;
62  
63              if (count.compareAndSet(current, current - 1))
64              {
65                  if (current == 1)
66                      super.succeeded();
67                  return;
68              }
69          }
70      }
71  
72      @Override
73      public void failed(Throwable failure)
74      {
75          // Forward failure on the first failure.
76          while (true)
77          {
78              int current = count.get();
79  
80              // Already completed ?
81              if (current == 0)
82                  return;
83  
84              if (count.compareAndSet(current, 0))
85              {
86                  super.failed(failure);
87                  return;
88              }
89          }
90      }
91  
92      @Override
93      public String toString()
94      {
95          return String.format("%s@%x", getClass().getSimpleName(), hashCode());
96      }
97  }