1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.util.Collection;
22 import java.util.Objects;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.atomic.AtomicLongArray;
27 import java.util.concurrent.locks.Condition;
28 import java.util.concurrent.locks.Lock;
29 import java.util.concurrent.locks.ReentrantLock;
30
31
32
33
34
35
36
37
38 public abstract class ConcurrentArrayBlockingQueue<E> extends ConcurrentArrayQueue<E> implements BlockingQueue<E>
39 {
40 private final Lock _lock = new ReentrantLock();
41 private final Condition _consumer = _lock.newCondition();
42
43 public ConcurrentArrayBlockingQueue(int blockSize)
44 {
45 super(blockSize);
46 }
47
48 @Override
49 public E poll()
50 {
51 E result = super.poll();
52 if (result != null && decrementAndGetSize() > 0)
53 signalConsumer();
54 return result;
55 }
56
57 @Override
58 public boolean remove(Object o)
59 {
60 boolean result = super.remove(o);
61 if (result && decrementAndGetSize() > 0)
62 signalConsumer();
63 return result;
64 }
65
66 protected abstract int decrementAndGetSize();
67
68 protected void signalConsumer()
69 {
70 final Lock lock = _lock;
71 lock.lock();
72 try
73 {
74 _consumer.signal();
75 }
76 finally
77 {
78 lock.unlock();
79 }
80 }
81
82 @Override
83 public E take() throws InterruptedException
84 {
85 while (true)
86 {
87 E result = poll();
88 if (result != null)
89 return result;
90
91 final Lock lock = _lock;
92 lock.lockInterruptibly();
93 try
94 {
95 if (size() == 0)
96 {
97 _consumer.await();
98 }
99 }
100 finally
101 {
102 lock.unlock();
103 }
104 }
105 }
106
107 @Override
108 public E poll(long timeout, TimeUnit unit) throws InterruptedException
109 {
110 long nanos = unit.toNanos(timeout);
111
112 while (true)
113 {
114
115
116 E result = poll();
117 if (result != null)
118 return result;
119
120 final Lock lock = _lock;
121 lock.lockInterruptibly();
122 try
123 {
124 if (size() == 0)
125 {
126 if (nanos <= 0)
127 return null;
128 nanos = _consumer.awaitNanos(nanos);
129 }
130 }
131 finally
132 {
133 lock.unlock();
134 }
135 }
136 }
137
138 @Override
139 public int drainTo(Collection<? super E> c)
140 {
141 return drainTo(c, Integer.MAX_VALUE);
142 }
143
144 @Override
145 public int drainTo(Collection<? super E> c, int maxElements)
146 {
147 if (c == this)
148 throw new IllegalArgumentException();
149
150 int added = 0;
151 while (added < maxElements)
152 {
153 E element = poll();
154 if (element == null)
155 break;
156 c.add(element);
157 ++added;
158 }
159 return added;
160 }
161
162
163
164
165
166
167 public static class Unbounded<E> extends ConcurrentArrayBlockingQueue<E>
168 {
169 private static final int SIZE_LEFT_OFFSET = MemoryUtils.getLongsPerCacheLine() - 1;
170 private static final int SIZE_RIGHT_OFFSET = SIZE_LEFT_OFFSET + MemoryUtils.getLongsPerCacheLine();
171
172 private final AtomicLongArray _sizes = new AtomicLongArray(SIZE_RIGHT_OFFSET+1);
173
174 public Unbounded()
175 {
176 this(DEFAULT_BLOCK_SIZE);
177 }
178
179 public Unbounded(int blockSize)
180 {
181 super(blockSize);
182 }
183
184 @Override
185 public boolean offer(E item)
186 {
187 boolean result = super.offer(item);
188 if (result && getAndIncrementSize() == 0)
189 signalConsumer();
190 return result;
191 }
192
193 private int getAndIncrementSize()
194 {
195 long sizeRight = _sizes.getAndIncrement(SIZE_RIGHT_OFFSET);
196 long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET);
197 return (int)(sizeRight - sizeLeft);
198 }
199
200 @Override
201 protected int decrementAndGetSize()
202 {
203 long sizeLeft = _sizes.incrementAndGet(SIZE_LEFT_OFFSET);
204 long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET);
205 return (int)(sizeRight - sizeLeft);
206 }
207
208 @Override
209 public int size()
210 {
211 long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET);
212 long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET);
213 return (int)(sizeRight - sizeLeft);
214 }
215
216 @Override
217 public int remainingCapacity()
218 {
219 return Integer.MAX_VALUE;
220 }
221
222 @Override
223 public void put(E element) throws InterruptedException
224 {
225 offer(element);
226 }
227
228 @Override
229 public boolean offer(E element, long timeout, TimeUnit unit) throws InterruptedException
230 {
231 return offer(element);
232 }
233 }
234
235
236
237
238
239
240 public static class Bounded<E> extends ConcurrentArrayBlockingQueue<E>
241 {
242 private final AtomicInteger _size = new AtomicInteger();
243 private final Lock _lock = new ReentrantLock();
244 private final Condition _producer = _lock.newCondition();
245 private final int _capacity;
246
247 public Bounded(int capacity)
248 {
249 this(DEFAULT_BLOCK_SIZE, capacity);
250 }
251
252 public Bounded(int blockSize, int capacity)
253 {
254 super(blockSize);
255 this._capacity = capacity;
256 }
257
258 @Override
259 public boolean offer(E item)
260 {
261 while (true)
262 {
263 int size = size();
264 int nextSize = size + 1;
265
266 if (nextSize > _capacity)
267 return false;
268
269 if (_size.compareAndSet(size, nextSize))
270 {
271 if (super.offer(item))
272 {
273 if (size == 0)
274 signalConsumer();
275 return true;
276 }
277 else
278 {
279 decrementAndGetSize();
280 }
281 }
282 }
283 }
284
285 @Override
286 public E poll()
287 {
288 E result = super.poll();
289 if (result != null)
290 signalProducer();
291 return result;
292 }
293
294 @Override
295 public boolean remove(Object o)
296 {
297 boolean result = super.remove(o);
298 if (result)
299 signalProducer();
300 return result;
301 }
302
303 @Override
304 protected int decrementAndGetSize()
305 {
306 return _size.decrementAndGet();
307 }
308
309 @Override
310 public int size()
311 {
312 return _size.get();
313 }
314
315 @Override
316 public int remainingCapacity()
317 {
318 return _capacity - size();
319 }
320
321 @Override
322 public void put(E item) throws InterruptedException
323 {
324 item = Objects.requireNonNull(item);
325
326 while (true)
327 {
328 final Lock lock = _lock;
329 lock.lockInterruptibly();
330 try
331 {
332 if (size() == _capacity)
333 _producer.await();
334 }
335 finally
336 {
337 lock.unlock();
338 }
339 if (offer(item))
340 break;
341 }
342 }
343
344 @Override
345 public boolean offer(E item, long timeout, TimeUnit unit) throws InterruptedException
346 {
347 item = Objects.requireNonNull(item);
348
349 long nanos = unit.toNanos(timeout);
350 while (true)
351 {
352 final Lock lock = _lock;
353 lock.lockInterruptibly();
354 try
355 {
356 if (size() == _capacity)
357 {
358 if (nanos <= 0)
359 return false;
360 nanos = _producer.awaitNanos(nanos);
361 }
362 }
363 finally
364 {
365 lock.unlock();
366 }
367 if (offer(item))
368 break;
369 }
370
371 return true;
372 }
373
374 @Override
375 public int drainTo(Collection<? super E> c, int maxElements)
376 {
377 int result = super.drainTo(c, maxElements);
378 if (result > 0)
379 signalProducers();
380 return result;
381 }
382
383 @Override
384 public void clear()
385 {
386 super.clear();
387 signalProducers();
388 }
389
390 private void signalProducer()
391 {
392 final Lock lock = _lock;
393 lock.lock();
394 try
395 {
396 _producer.signal();
397 }
398 finally
399 {
400 lock.unlock();
401 }
402 }
403
404 private void signalProducers()
405 {
406 final Lock lock = _lock;
407 lock.lock();
408 try
409 {
410 _producer.signalAll();
411 }
412 finally
413 {
414 lock.unlock();
415 }
416 }
417 }
418 }