View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.concurrent.CancellationException;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.locks.Condition;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  
33  /**
34   * Provides a reusable {@link Callback} that can block the thread
35   * while waiting to be completed.
36   * <p>
37   * A typical usage pattern is:
38   * <pre>
39   * void someBlockingCall(Object... args) throws IOException
40   * {
41   *     try(Blocker blocker = sharedBlockingCallback.acquire())
42   *     {
43   *         someAsyncCall(args, blocker);
44   *         blocker.block();
45   *     }
46   * }
47   * </pre>
48   */
49  public class SharedBlockingCallback
50  {
51      static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
52      private static Throwable IDLE = new Throwable()
53      {
54          @Override
55          public String toString()
56          {
57              return "IDLE";
58          }
59      };
60      private static Throwable SUCCEEDED = new Throwable()
61      {
62          @Override
63          public String toString()
64          {
65              return "SUCCEEDED";
66          }
67      };
68      private static Throwable FAILED = new Throwable()
69      {
70          @Override
71          public String toString()
72          {
73              return "FAILED";
74          }
75      };
76  
77      private final ReentrantLock _lock = new ReentrantLock();
78      private final Condition _idle = _lock.newCondition();
79      private final Condition _complete = _lock.newCondition();
80      private Blocker _blocker = new Blocker();
81      
82      protected long getIdleTimeout()
83      {
84          return -1;
85      }
86      
87      public Blocker acquire() throws IOException
88      {
89          _lock.lock();
90          long idle = getIdleTimeout();
91          try
92          {
93              while (_blocker._state != IDLE)
94              {
95                  if (idle>0 && (idle < Long.MAX_VALUE/2))
96                  {
97                      // Wait a little bit longer than the blocker might block
98                      if (!_idle.await(idle*2,TimeUnit.MILLISECONDS))
99                          throw new IOException(new TimeoutException());
100                 }
101                 else
102                     _idle.await();
103             }
104             _blocker._state = null;
105         }
106         catch (final InterruptedException e)
107         {
108             throw new InterruptedIOException();
109         }
110         finally
111         {
112             _lock.unlock();
113         }
114         return _blocker;
115     }
116 
117     protected void notComplete(Blocker blocker)
118     {
119         LOG.warn("Blocker not complete {}",blocker);
120         if (LOG.isDebugEnabled())
121             LOG.debug(new Throwable());
122     }
123     
124     /**
125      * A Closeable Callback.
126      * Uses the auto close mechanism to check block has been called OK.
127      * <p>Implements {@link Callback.NonBlocking} because calls to this
128      * callback do not blocak, rather they wakeup the thread that is blocked
129      * in {@link #block()}
130      */
131     public class Blocker implements Callback.NonBlocking, Closeable
132     {
133         private Throwable _state = IDLE;
134         
135         protected Blocker()
136         {
137         }
138         
139         @Override
140         public void succeeded()
141         {
142             _lock.lock();
143             try
144             {
145                 if (_state == null)
146                 {
147                     _state = SUCCEEDED;
148                     _complete.signalAll();
149                 }
150                 else
151                     throw new IllegalStateException(_state);
152             }
153             finally
154             {
155                 _lock.unlock();
156             }
157         }
158 
159         @Override
160         public void failed(Throwable cause)
161         {
162             _lock.lock();
163             try
164             {
165                 if (_state == null)
166                 {
167                     if (cause==null)
168                         _state=FAILED;
169                     else if (cause instanceof BlockerTimeoutException)
170                         // Not this blockers timeout
171                         _state=new IOException(cause);
172                     else 
173                         _state=cause;
174                     _complete.signalAll();
175                 }
176                 else 
177                     throw new IllegalStateException(_state);
178             }
179             finally
180             {
181                 _lock.unlock();
182             }
183         }
184 
185         /**
186          * Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to
187          * repeatable use a FutureCallback to convert an asynchronous API to a blocking API.
188          * 
189          * @throws IOException
190          *             if exception was caught during blocking, or callback was cancelled
191          */
192         public void block() throws IOException
193         {
194             _lock.lock();
195             long idle = getIdleTimeout();
196             try
197             {
198                 while (_state == null)
199                 {
200                     if (idle>0 && (idle < Long.MAX_VALUE/2))
201                     {
202                         // Wait a little bit longer than expected callback idle timeout
203                         if (!_complete.await(idle+idle/2,TimeUnit.MILLISECONDS))
204                             // The callback has not arrived in sufficient time.
205                             // We will synthesize a TimeoutException 
206                             _state=new BlockerTimeoutException();
207                     }
208                     else
209                     {
210                         _complete.await();
211                     }
212                 }
213 
214                 if (_state == SUCCEEDED)
215                     return;
216                 if (_state == IDLE)
217                     throw new IllegalStateException("IDLE");
218                 if (_state instanceof IOException)
219                     throw (IOException)_state;
220                 if (_state instanceof CancellationException)
221                     throw (CancellationException)_state;
222                 if (_state instanceof RuntimeException)
223                     throw (RuntimeException)_state;
224                 if (_state instanceof Error)
225                     throw (Error)_state;
226                 throw new IOException(_state);
227             }
228             catch (final InterruptedException e)
229             {
230                 throw new InterruptedIOException();
231             }
232             finally
233             {
234                 _lock.unlock();
235             }
236         }
237         
238         /**
239          * Check the Callback has succeeded or failed and after the return leave in the state to allow reuse.
240          */
241         @Override
242         public void close()
243         {
244             _lock.lock();
245             try
246             {
247                 if (_state == IDLE)
248                     throw new IllegalStateException("IDLE");
249                 if (_state == null)
250                     notComplete(this);
251             }
252             finally
253             {
254                 try 
255                 {
256                     // If the blocker timed itself out, remember the state
257                     if (_state instanceof BlockerTimeoutException)
258                         // and create a new Blocker
259                         _blocker=new Blocker();
260                     else
261                         // else reuse Blocker
262                         _state = IDLE;
263                     _idle.signalAll();
264                     _complete.signalAll();
265                 } 
266                 finally 
267                 {
268                     _lock.unlock();
269                 }
270             }
271         }
272 
273         @Override
274         public String toString()
275         {
276             _lock.lock();
277             try
278             {
279                 return String.format("%s@%x{%s}",Blocker.class.getSimpleName(),hashCode(),_state);
280             }
281             finally
282             {
283                 _lock.unlock();
284             }
285         }
286     }
287     
288     private static class BlockerTimeoutException extends TimeoutException
289     { 
290     }
291 }