View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.concurrent.atomic.AtomicInteger;
22  
23  /**
24   * <p>A callback wrapper that succeeds the wrapped callback when the count is
25   * reached, or on first failure.</p>
26   * <p>This callback is particularly useful when an async operation is split
27   * into multiple parts, for example when an original byte buffer that needs
28   * to be written, along with a callback, is split into multiple byte buffers,
29   * since it allows the original callback to be wrapped and notified only when
30   * the last part has been processed.</p>
31   * <p>Example:</p>
32   * <pre>
33   * public void process(EndPoint endPoint, ByteBuffer buffer, Callback callback)
34   * {
35   *     ByteBuffer[] buffers = split(buffer);
36   *     CountCallback countCallback = new CountCallback(callback, buffers.length);
37   *     endPoint.write(countCallback, buffers);
38   * }
39   * </pre>
40   */
41  public class CountingCallback implements Callback
42  {
43      private final Callback callback;
44      private final AtomicInteger count;
45  
46      public CountingCallback(Callback callback, int count)
47      {
48          this.callback = callback;
49          this.count = new AtomicInteger(count);
50      }
51  
52      @Override
53      public void succeeded()
54      {
55          // Forward success on the last success.
56          while (true)
57          {
58              int current = count.get();
59  
60              // Already completed ?
61              if (current == 0)
62                  return;
63  
64              if (count.compareAndSet(current, current - 1))
65              {
66                  if (current == 1)
67                      callback.succeeded();
68                  return;
69              }
70          }
71      }
72  
73      @Override
74      public void failed(Throwable failure)
75      {
76          // Forward failure on the first failure.
77          while (true)
78          {
79              int current = count.get();
80  
81              // Already completed ?
82              if (current == 0)
83                  return;
84  
85              if (count.compareAndSet(current, 0))
86              {
87                  callback.failed(failure);
88                  return;
89              }
90          }
91      }
92  
93      @Override
94      public String toString()
95      {
96          return String.format("%s@%x", getClass().getSimpleName(), hashCode());
97      }
98  }