1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.concurrent.CancellationException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.locks.Condition;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.NonBlockingThread;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class SharedBlockingCallback
50 {
51 static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
52
53 final ReentrantLock _lock = new ReentrantLock();
54 final Condition _idle = _lock.newCondition();
55 final Condition _complete = _lock.newCondition();
56
57
58 private static Throwable IDLE = new Throwable()
59 {
60 @Override
61 public String toString()
62 {
63 return "IDLE";
64 }
65 };
66
67 private static Throwable SUCCEEDED = new Throwable()
68 {
69 @Override
70 public String toString()
71 {
72 return "SUCCEEDED";
73 }
74 };
75
76 private static Throwable FAILED = new Throwable()
77 {
78 @Override
79 public String toString()
80 {
81 return "FAILED";
82 }
83 };
84
85 Blocker _blocker;
86
87 public SharedBlockingCallback()
88 {
89 _blocker=new Blocker();
90 }
91
92 protected long getIdleTimeout()
93 {
94 return -1;
95 }
96
97 public Blocker acquire() throws IOException
98 {
99 _lock.lock();
100 long idle = getIdleTimeout();
101 try
102 {
103 while (_blocker._state != IDLE)
104 {
105 if (idle>0 && (idle < Long.MAX_VALUE/2))
106 {
107
108 if (!_idle.await(idle*2,TimeUnit.MILLISECONDS))
109 throw new IOException(new TimeoutException());
110 }
111 else
112 _idle.await();
113 }
114 _blocker._state = null;
115 }
116 catch (final InterruptedException e)
117 {
118 throw new InterruptedIOException();
119 }
120 finally
121 {
122 _lock.unlock();
123 }
124 return _blocker;
125 }
126
127 protected void notComplete(Blocker blocker)
128 {
129 LOG.warn("Blocker not complete {}",blocker);
130 if (LOG.isDebugEnabled())
131 LOG.debug(new Throwable());
132 }
133
134
135
136
137
138 public class Blocker implements Callback, Closeable
139 {
140 Throwable _state = IDLE;
141
142 protected Blocker()
143 {
144 }
145
146 @Override
147 public void succeeded()
148 {
149 _lock.lock();
150 try
151 {
152 if (_state == null)
153 {
154 _state = SUCCEEDED;
155 _complete.signalAll();
156 }
157 else
158 throw new IllegalStateException(_state);
159 }
160 finally
161 {
162 _lock.unlock();
163 }
164 }
165
166 @Override
167 public void failed(Throwable cause)
168 {
169 _lock.lock();
170 try
171 {
172 if (_state == null)
173 {
174 if (cause==null)
175 _state=FAILED;
176 else if (cause instanceof BlockerTimeoutException)
177
178 _state=new IOException(cause);
179 else
180 _state=cause;
181 _complete.signalAll();
182 }
183 else
184 throw new IllegalStateException(_state);
185 }
186 finally
187 {
188 _lock.unlock();
189 }
190 }
191
192
193
194
195
196
197
198
199 public void block() throws IOException
200 {
201 if (NonBlockingThread.isNonBlockingThread())
202 LOG.warn("Blocking a NonBlockingThread: ",new Throwable());
203
204 _lock.lock();
205 long idle = getIdleTimeout();
206 try
207 {
208 while (_state == null)
209 {
210 if (idle>0 && (idle < Long.MAX_VALUE/2))
211 {
212
213 if (!_complete.await(idle+idle/2,TimeUnit.MILLISECONDS))
214
215
216 _state=new BlockerTimeoutException();
217 }
218 else
219 _complete.await();
220 }
221
222 if (_state == SUCCEEDED)
223 return;
224 if (_state == IDLE)
225 throw new IllegalStateException("IDLE");
226 if (_state instanceof IOException)
227 throw (IOException)_state;
228 if (_state instanceof CancellationException)
229 throw (CancellationException)_state;
230 if (_state instanceof RuntimeException)
231 throw (RuntimeException)_state;
232 if (_state instanceof Error)
233 throw (Error)_state;
234 throw new IOException(_state);
235 }
236 catch (final InterruptedException e)
237 {
238 throw new InterruptedIOException();
239 }
240 finally
241 {
242 _lock.unlock();
243 }
244 }
245
246
247
248
249
250
251
252 @Override
253 public void close() throws IOException
254 {
255 _lock.lock();
256 try
257 {
258 if (_state == IDLE)
259 throw new IllegalStateException("IDLE");
260 if (_state == null)
261 notComplete(this);
262 }
263 finally
264 {
265 try
266 {
267
268 if (_state instanceof BlockerTimeoutException)
269
270 _blocker=new Blocker();
271 else
272
273 _state = IDLE;
274 _idle.signalAll();
275 _complete.signalAll();
276 }
277 finally
278 {
279 _lock.unlock();
280 }
281 }
282 }
283
284 @Override
285 public String toString()
286 {
287 _lock.lock();
288 try
289 {
290 return String.format("%s@%x{%s}",Blocker.class.getSimpleName(),hashCode(),_state);
291 }
292 finally
293 {
294 _lock.unlock();
295 }
296 }
297 }
298
299 private static class BlockerTimeoutException extends TimeoutException
300 {
301 }
302 }