View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.concurrent.CancellationException;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.locks.Condition;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.NonBlockingThread;
33  
34  
35  /* ------------------------------------------------------------ */
36  /** Provides a reusable BlockingCallback.
37   * A typical usage pattern is:
38   * <pre>
39   * void someBlockingCall(Object... args) throws IOException
40   * {
41   *   try(Blocker blocker=sharedBlockingCallback.acquire())
42   *   {
43   *     someAsyncCall(args,blocker);
44   *     blocker.block();
45   *   }
46   * }
47   * </pre>
48   */
49  public class SharedBlockingCallback
50  {
51      static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
52  
53      final ReentrantLock _lock = new ReentrantLock();
54      final Condition _idle = _lock.newCondition();
55      final Condition _complete = _lock.newCondition();
56  
57      
58      private static Throwable IDLE = new Throwable()
59      {
60          @Override
61          public String toString()
62          {
63              return "IDLE";
64          }
65      };
66  
67      private static Throwable SUCCEEDED = new Throwable()
68      {
69          @Override
70          public String toString()
71          {
72              return "SUCCEEDED";
73          }
74      };
75      
76      private static Throwable FAILED = new Throwable()
77      {
78          @Override
79          public String toString()
80          {
81              return "FAILED";
82          }
83      };
84  
85      Blocker _blocker;
86      
87      public SharedBlockingCallback()
88      {
89          _blocker=new Blocker();
90      }
91      
92      protected long getIdleTimeout()
93      {
94          return -1;
95      }
96      
97      public Blocker acquire() throws IOException
98      {
99          _lock.lock();
100         long idle = getIdleTimeout();
101         try
102         {
103             while (_blocker._state != IDLE)
104             {
105                 if (idle>0 && (idle < Long.MAX_VALUE/2))
106                 {
107                     // Wait a little bit longer than the blocker might block
108                     if (!_idle.await(idle*2,TimeUnit.MILLISECONDS))
109                         throw new IOException(new TimeoutException());
110                 }
111                 else
112                     _idle.await();
113             }
114             _blocker._state = null;
115         }
116         catch (final InterruptedException e)
117         {
118             throw new InterruptedIOException();
119         }
120         finally
121         {
122             _lock.unlock();
123         }
124         return _blocker;
125     }
126 
127     protected void notComplete(Blocker blocker)
128     {
129         LOG.warn("Blocker not complete {}",blocker);
130         if (LOG.isDebugEnabled())
131             LOG.debug(new Throwable());
132     }
133     
134     /* ------------------------------------------------------------ */
135     /** A Closeable Callback.
136      * Uses the auto close mechanism to check block has been called OK.
137      */
138     public class Blocker implements Callback, Closeable
139     {
140         Throwable _state = IDLE;
141         
142         protected Blocker()
143         {
144         }
145 
146         @Override
147         public void succeeded()
148         {
149             _lock.lock();
150             try
151             {
152                 if (_state == null)
153                 {
154                     _state = SUCCEEDED;
155                     _complete.signalAll();
156                 }
157                 else
158                     throw new IllegalStateException(_state);
159             }
160             finally
161             {
162                 _lock.unlock();
163             }
164         }
165 
166         @Override
167         public void failed(Throwable cause)
168         {
169             _lock.lock();
170             try
171             {
172                 if (_state == null)
173                 {
174                     if (cause==null)
175                         _state=FAILED;
176                     else if (cause instanceof BlockerTimeoutException)
177                         // Not this blockers timeout
178                         _state=new IOException(cause);
179                     else 
180                         _state=cause;
181                     _complete.signalAll();
182                 }
183                 else 
184                     throw new IllegalStateException(_state);
185             }
186             finally
187             {
188                 _lock.unlock();
189             }
190         }
191 
192         /**
193          * Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to
194          * repeatable use a FutureCallback to convert an asynchronous API to a blocking API.
195          * 
196          * @throws IOException
197          *             if exception was caught during blocking, or callback was cancelled
198          */
199         public void block() throws IOException
200         {
201             if (NonBlockingThread.isNonBlockingThread())
202                 LOG.warn("Blocking a NonBlockingThread: ",new Throwable());
203             
204             _lock.lock();
205             long idle = getIdleTimeout();
206             try
207             {
208                 while (_state == null)
209                 {
210                     if (idle>0 && (idle < Long.MAX_VALUE/2))
211                     {
212                         // Wait a little bit longer than expected callback idle timeout
213                         if (!_complete.await(idle+idle/2,TimeUnit.MILLISECONDS))
214                             // The callback has not arrived in sufficient time.
215                             // We will synthesize a TimeoutException 
216                             _state=new BlockerTimeoutException();
217                     }
218                     else
219                         _complete.await();
220                 }
221 
222                 if (_state == SUCCEEDED)
223                     return;
224                 if (_state == IDLE)
225                     throw new IllegalStateException("IDLE");
226                 if (_state instanceof IOException)
227                     throw (IOException)_state;
228                 if (_state instanceof CancellationException)
229                     throw (CancellationException)_state;
230                 if (_state instanceof RuntimeException)
231                     throw (RuntimeException)_state;
232                 if (_state instanceof Error)
233                     throw (Error)_state;
234                 throw new IOException(_state);
235             }
236             catch (final InterruptedException e)
237             {
238                 throw new InterruptedIOException();
239             }
240             finally
241             {
242                 _lock.unlock();
243             }
244         }
245         
246         /**
247          * Check the Callback has succeeded or failed and after the return leave in the state to allow reuse.
248          * 
249          * @throws IOException
250          *             if exception was caught during blocking, or callback was cancelled
251          */
252         @Override
253         public void close() throws IOException
254         {
255             _lock.lock();
256             try
257             {
258                 if (_state == IDLE)
259                     throw new IllegalStateException("IDLE");
260                 if (_state == null)
261                     notComplete(this);
262             }
263             finally
264             {
265                 try 
266                 {
267                     // If the blocker timed itself out, remember the state
268                     if (_state instanceof BlockerTimeoutException)
269                         // and create a new Blocker
270                         _blocker=new Blocker();
271                     else
272                         // else reuse Blocker
273                         _state = IDLE;
274                     _idle.signalAll();
275                     _complete.signalAll();
276                 } 
277                 finally 
278                 {
279                     _lock.unlock();
280                 }
281             }
282         }
283 
284         @Override
285         public String toString()
286         {
287             _lock.lock();
288             try
289             {
290                 return String.format("%s@%x{%s}",Blocker.class.getSimpleName(),hashCode(),_state);
291             }
292             finally
293             {
294                 _lock.unlock();
295             }
296         }
297     }
298     
299     private static class BlockerTimeoutException extends TimeoutException
300     { 
301     }
302 }