1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.util;
15
16 import java.util.AbstractList;
17 import java.util.Collection;
18 import java.util.NoSuchElementException;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
44 {
45 public final int DEFAULT_CAPACITY=128;
46 public final int DEFAULT_GROWTH=64;
47 private final int _limit;
48 private final AtomicInteger _size=new AtomicInteger();
49 private final int _growCapacity;
50
51 private volatile int _capacity;
52 private Object[] _elements;
53 private int _head;
54 private int _tail;
55
56 private final ReentrantLock _headLock = new ReentrantLock();
57 private final Condition _notEmpty = _headLock.newCondition();
58 private final ReentrantLock _tailLock = new ReentrantLock();
59
60
61
62
63
64 public BlockingArrayQueue()
65 {
66 _elements=new Object[DEFAULT_CAPACITY];
67 _growCapacity=DEFAULT_GROWTH;
68 _capacity=_elements.length;
69 _limit=Integer.MAX_VALUE;
70 }
71
72
73
74
75
76 public BlockingArrayQueue(int limit)
77 {
78 _elements=new Object[limit];
79 _capacity=_elements.length;
80 _growCapacity=-1;
81 _limit=limit;
82 }
83
84
85
86
87
88
89 public BlockingArrayQueue(int capacity,int growBy)
90 {
91 _elements=new Object[capacity];
92 _capacity=_elements.length;
93 _growCapacity=growBy;
94 _limit=Integer.MAX_VALUE;
95 }
96
97
98
99
100
101
102
103 public BlockingArrayQueue(int capacity,int growBy,int limit)
104 {
105 if (capacity>limit)
106 throw new IllegalArgumentException();
107
108 _elements=new Object[capacity];
109 _capacity=_elements.length;
110 _growCapacity=growBy;
111 _limit=limit;
112 }
113
114
115 public int getCapacity()
116 {
117 return _capacity;
118 }
119
120
121 public int getLimit()
122 {
123 return _limit;
124 }
125
126
127 @Override
128 public boolean add(E e)
129 {
130 return offer(e);
131 }
132
133
134 public E element()
135 {
136 E e = peek();
137 if (e==null)
138 throw new NoSuchElementException();
139 return e;
140 }
141
142
143 @SuppressWarnings("unchecked")
144 public E peek()
145 {
146 if (_size.get() == 0)
147 return null;
148
149 E e = null;
150 _headLock.lock();
151 try
152 {
153 if (_size.get() > 0)
154 e = (E)_elements[_head];
155 }
156 finally
157 {
158 _headLock.unlock();
159 }
160
161 return e;
162 }
163
164
165 public boolean offer(E e)
166 {
167 if (e == null)
168 throw new NullPointerException();
169
170 boolean not_empty=false;
171 _tailLock.lock();
172 try
173 {
174 if (_size.get() >= _limit)
175 return false;
176
177
178 if (_size.get()==_capacity)
179 {
180 _headLock.lock();
181 try
182 {
183 if (!grow())
184 return false;
185 }
186 finally
187 {
188 _headLock.unlock();
189 }
190 }
191
192
193 _elements[_tail]=e;
194 _tail=(_tail+1)%_capacity;
195
196 not_empty=0==_size.getAndIncrement();
197
198 }
199 finally
200 {
201 _tailLock.unlock();
202 }
203
204 if (not_empty)
205 {
206 _headLock.lock();
207 try
208 {
209 _notEmpty.signal();
210 }
211 finally
212 {
213 _headLock.unlock();
214 }
215 }
216
217 return true;
218 }
219
220
221
222 @SuppressWarnings("unchecked")
223 public E poll()
224 {
225 if (_size.get() == 0)
226 return null;
227
228 E e = null;
229 _headLock.lock();
230 try
231 {
232 if (_size.get() > 0)
233 {
234 final int head=_head;
235 e = (E)_elements[head];
236 _elements[head]=null;
237 _head=(head+1)%_capacity;
238
239 if (_size.decrementAndGet()>0)
240 _notEmpty.signal();
241 }
242 }
243 finally
244 {
245 _headLock.unlock();
246 }
247
248 return e;
249 }
250
251
252
253
254
255
256
257
258 @SuppressWarnings("unchecked")
259 public E take() throws InterruptedException
260 {
261 E e = null;
262 _headLock.lockInterruptibly();
263 try
264 {
265 try
266 {
267 while (_size.get() == 0)
268 {
269 _notEmpty.await();
270 }
271 }
272 catch (InterruptedException ie)
273 {
274 _notEmpty.signal();
275 throw ie;
276 }
277
278 final int head=_head;
279 e = (E)_elements[head];
280 _elements[head]=null;
281 _head=(head+1)%_capacity;
282
283 if (_size.decrementAndGet()>0)
284 _notEmpty.signal();
285 }
286 finally
287 {
288 _headLock.unlock();
289 }
290
291 return e;
292 }
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307 @SuppressWarnings("unchecked")
308 public E poll(long time, TimeUnit unit) throws InterruptedException
309 {
310
311 E e = null;
312
313 long nanos = unit.toNanos(time);
314
315 _headLock.lockInterruptibly();
316 try
317 {
318 try
319 {
320 while (_size.get() == 0)
321 {
322 if (nanos<=0)
323 return null;
324 nanos = _notEmpty.awaitNanos(nanos);
325 }
326 }
327 catch (InterruptedException ie)
328 {
329 _notEmpty.signal();
330 throw ie;
331 }
332
333 e = (E)_elements[_head];
334 _elements[_head]=null;
335 _head=(_head+1)%_capacity;
336
337 if (_size.decrementAndGet()>0)
338 _notEmpty.signal();
339 }
340 finally
341 {
342 _headLock.unlock();
343 }
344
345 return e;
346 }
347
348
349 public E remove()
350 {
351 E e=poll();
352 if (e==null)
353 throw new NoSuchElementException();
354 return e;
355 }
356
357
358 @Override
359 public void clear()
360 {
361 _tailLock.lock();
362 try
363 {
364 _headLock.lock();
365 try
366 {
367 _head=0;
368 _tail=0;
369 _size.set(0);
370 }
371 finally
372 {
373 _headLock.unlock();
374 }
375 }
376 finally
377 {
378 _tailLock.unlock();
379 }
380 }
381
382
383 @Override
384 public boolean isEmpty()
385 {
386 return _size.get()==0;
387 }
388
389
390 @Override
391 public int size()
392 {
393 return _size.get();
394 }
395
396
397 @SuppressWarnings("unchecked")
398 @Override
399 public E get(int index)
400 {
401 _tailLock.lock();
402 try
403 {
404 _headLock.lock();
405 try
406 {
407 if (index<0 || index>=_size.get())
408 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
409 int i = _head+index;
410 if (i>=_capacity)
411 i-=_capacity;
412 return (E)_elements[i];
413 }
414 finally
415 {
416 _headLock.unlock();
417 }
418 }
419 finally
420 {
421 _tailLock.unlock();
422 }
423 }
424
425
426 @Override
427 public E remove(int index)
428 {
429 _tailLock.lock();
430 try
431 {
432 _headLock.lock();
433 try
434 {
435
436 if (index<0 || index>=_size.get())
437 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
438
439 int i = _head+index;
440 if (i>=_capacity)
441 i-=_capacity;
442 @SuppressWarnings("unchecked")
443 E old=(E)_elements[i];
444
445 if (i<_tail)
446 {
447 System.arraycopy(_elements,i+1,_elements,i,_tail-i);
448 _tail--;
449 _size.decrementAndGet();
450 }
451 else
452 {
453 System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
454 if (_tail>0)
455 {
456 _elements[_capacity]=_elements[0];
457 System.arraycopy(_elements,1,_elements,0,_tail-1);
458 _tail--;
459 }
460 else
461 _tail=_capacity-1;
462
463 _size.decrementAndGet();
464 }
465
466 return old;
467 }
468 finally
469 {
470 _headLock.unlock();
471 }
472 }
473 finally
474 {
475 _tailLock.unlock();
476 }
477 }
478
479
480 @Override
481 public E set(int index, E e)
482 {
483 if (e == null)
484 throw new NullPointerException();
485
486 _tailLock.lock();
487 try
488 {
489 _headLock.lock();
490 try
491 {
492
493 if (index<0 || index>=_size.get())
494 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
495
496 int i = _head+index;
497 if (i>=_capacity)
498 i-=_capacity;
499 @SuppressWarnings("unchecked")
500 E old=(E)_elements[i];
501 _elements[i]=e;
502 return old;
503 }
504 finally
505 {
506 _headLock.unlock();
507 }
508 }
509 finally
510 {
511 _tailLock.unlock();
512 }
513 }
514
515
516 @Override
517 public void add(int index, E e)
518 {
519 if (e == null)
520 throw new NullPointerException();
521
522 _tailLock.lock();
523 try
524 {
525 _headLock.lock();
526 try
527 {
528
529 if (index<0 || index>_size.get())
530 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
531
532 if (index==_size.get())
533 {
534 add(e);
535 }
536 else
537 {
538 if (_tail==_head)
539 if (!grow())
540 throw new IllegalStateException("full");
541
542 int i = _head+index;
543 if (i>=_capacity)
544 i-=_capacity;
545
546 _size.incrementAndGet();
547 _tail=(_tail+1)%_capacity;
548
549
550 if (i<_tail)
551 {
552 System.arraycopy(_elements,i,_elements,i+1,_tail-i);
553 _elements[i]=e;
554 }
555 else
556 {
557 if (_tail>0)
558 {
559 System.arraycopy(_elements,0,_elements,1,_tail);
560 _elements[0]=_elements[_capacity-1];
561 }
562
563 System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
564 _elements[i]=e;
565 }
566 }
567 }
568 finally
569 {
570 _headLock.unlock();
571 }
572 }
573 finally
574 {
575 _tailLock.unlock();
576 }
577 }
578
579
580 private boolean grow()
581 {
582 if (_growCapacity<=0)
583 return false;
584
585 _tailLock.lock();
586 try
587 {
588 _headLock.lock();
589 try
590 {
591 final int head=_head;
592 final int tail=_tail;
593 final int new_tail;
594
595 Object[] elements=new Object[_capacity+_growCapacity];
596
597 if (head<tail)
598 {
599 new_tail=tail-head;
600 System.arraycopy(_elements,head,elements,0,new_tail);
601 }
602 else if (head>tail || _size.get()>0)
603 {
604 new_tail=_capacity+tail-head;
605 int cut=_capacity-head;
606 System.arraycopy(_elements,head,elements,0,cut);
607 System.arraycopy(_elements,0,elements,cut,tail);
608 }
609 else
610 {
611 new_tail=0;
612 }
613
614 _elements=elements;
615 _capacity=_elements.length;
616 _head=0;
617 _tail=new_tail;
618 return true;
619 }
620 finally
621 {
622 _headLock.unlock();
623 }
624 }
625 finally
626 {
627 _tailLock.unlock();
628 }
629
630 }
631
632
633 public int drainTo(Collection<? super E> c)
634 {
635 throw new UnsupportedOperationException();
636 }
637
638
639 public int drainTo(Collection<? super E> c, int maxElements)
640 {
641 throw new UnsupportedOperationException();
642 }
643
644
645 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
646 {
647 throw new UnsupportedOperationException();
648 }
649
650
651 public void put(E o) throws InterruptedException
652 {
653 if (!add(o))
654 throw new IllegalStateException("full");
655 }
656
657
658 public int remainingCapacity()
659 {
660 _tailLock.lock();
661 try
662 {
663 _headLock.lock();
664 try
665 {
666 return getCapacity()-size();
667 }
668 finally
669 {
670 _headLock.unlock();
671 }
672 }
673 finally
674 {
675 _tailLock.unlock();
676 }
677 }
678 }