View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.concurrent.CancellationException;
25  import java.util.concurrent.locks.Condition;
26  import java.util.concurrent.locks.ReentrantLock;
27  
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.util.thread.NonBlockingThread;
31  
32  
33  /* ------------------------------------------------------------ */
34  /** Provides a reusable BlockingCallback.
35   * A typical usage pattern is:
36   * <pre>
37   * void someBlockingCall(Object... args) throws IOException
38   * {
39   *   try(Blocker blocker=sharedBlockingCallback.acquire())
40   *   {
41   *     someAsyncCall(args,blocker);
42   *     blocker.block();
43   *   }
44   * }
45   * </pre>
46   */
47  public class SharedBlockingCallback
48  {
49      private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
50  
51      
52      private static Throwable IDLE = new Throwable()
53      {
54          @Override
55          public String toString()
56          {
57              return "IDLE";
58          }
59      };
60  
61      private static Throwable SUCCEEDED = new Throwable()
62      {
63          @Override
64          public String toString()
65          {
66              return "SUCCEEDED";
67          }
68      };
69  
70      final Blocker _blocker;
71      
72      public SharedBlockingCallback()
73      {
74          this(new Blocker());
75      }
76      
77      protected SharedBlockingCallback(Blocker blocker)
78      {
79          _blocker=blocker;
80      }
81      
82      public Blocker acquire() throws IOException
83      {
84          _blocker._lock.lock();
85          try
86          {
87              while (_blocker._state != IDLE)
88                  _blocker._idle.await();
89              _blocker._state = null;
90          }
91          catch (final InterruptedException e)
92          {
93              throw new InterruptedIOException()
94              {
95                  {
96                      initCause(e);
97                  }
98              };
99          }
100         finally
101         {
102             _blocker._lock.unlock();
103         }
104         return _blocker;
105     }
106 
107     
108     /* ------------------------------------------------------------ */
109     /** A Closeable Callback.
110      * Uses the auto close mechanism to check block has been called OK.
111      */
112     public static class Blocker implements Callback, Closeable
113     {
114         final ReentrantLock _lock = new ReentrantLock();
115         final Condition _idle = _lock.newCondition();
116         final Condition _complete = _lock.newCondition();
117         Throwable _state = IDLE;
118 
119         public Blocker()
120         {
121         }
122 
123         @Override
124         public void succeeded()
125         {
126             _lock.lock();
127             try
128             {
129                 if (_state == null)
130                 {
131                     _state = SUCCEEDED;
132                     _complete.signalAll();
133                 }
134                 else if (_state == IDLE)
135                     throw new IllegalStateException("IDLE");
136             }
137             finally
138             {
139                 _lock.unlock();
140             }
141         }
142 
143         @Override
144         public void failed(Throwable cause)
145         {
146             _lock.lock();
147             try
148             {
149                 if (_state == null)
150                 {
151                     _state = cause;
152                     _complete.signalAll();
153                 }
154                 else if (_state == IDLE)
155                     throw new IllegalStateException("IDLE");
156             }
157             finally
158             {
159                 _lock.unlock();
160             }
161         }
162 
163         /**
164          * Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to
165          * repeatable use a FutureCallback to convert an asynchronous API to a blocking API.
166          * 
167          * @throws IOException
168          *             if exception was caught during blocking, or callback was cancelled
169          */
170         public void block() throws IOException
171         {
172             if (NonBlockingThread.isNonBlockingThread())
173                 LOG.warn("Blocking a NonBlockingThread: ",new Throwable());
174             
175             _lock.lock();
176             try
177             {
178                 while (_state == null)
179                     _complete.await();
180 
181                 if (_state == SUCCEEDED)
182                     return;
183                 if (_state == IDLE)
184                     throw new IllegalStateException("IDLE");
185                 if (_state instanceof IOException)
186                     throw (IOException)_state;
187                 if (_state instanceof CancellationException)
188                     throw (CancellationException)_state;
189                 if (_state instanceof RuntimeException)
190                     throw (RuntimeException)_state;
191                 if (_state instanceof Error)
192                     throw (Error)_state;
193                 throw new IOException(_state);
194             }
195             catch (final InterruptedException e)
196             {
197                 throw new InterruptedIOException()
198                 {
199                     {
200                         initCause(e);
201                     }
202                 };
203             }
204             finally
205             {
206                 _lock.unlock();
207             }
208         }
209         
210         /**
211          * Check the Callback has succeeded or failed and after the return leave in the state to allow reuse.
212          * 
213          * @throws IOException
214          *             if exception was caught during blocking, or callback was cancelled
215          */
216         @Override
217         public void close() throws IOException
218         {
219             _lock.lock();
220             try
221             {
222                 if (_state == IDLE)
223                     throw new IllegalStateException("IDLE");
224                 if (_state == null)
225                     LOG.debug("Blocker not complete",new Throwable());
226             }
227             finally
228             {
229                 _state = IDLE;
230                 _idle.signalAll();
231                 _lock.unlock();
232             }
233         }
234 
235         @Override
236         public String toString()
237         {
238             _lock.lock();
239             try
240             {
241                 return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
242             }
243             finally
244             {
245                 _lock.unlock();
246             }
247         }
248     }
249 }