1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractList;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.ListIterator;
25 import java.util.NoSuchElementException;
26 import java.util.Objects;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.Lock;
32 import java.util.concurrent.locks.ReentrantLock;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
48 {
49
50
51
52
53 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
54
55
56
57 private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
58
59
60
61 public static final int DEFAULT_CAPACITY = 128;
62
63
64
65 public static final int DEFAULT_GROWTH = 64;
66
67 private final int _maxCapacity;
68 private final int _growCapacity;
69
70
71
72 private final int[] _indexes = new int[TAIL_OFFSET + 1];
73 private final Lock _tailLock = new ReentrantLock();
74 private final AtomicInteger _size = new AtomicInteger();
75 private final Lock _headLock = new ReentrantLock();
76 private final Condition _notEmpty = _headLock.newCondition();
77 private Object[] _elements;
78
79
80
81
82
83
84
85 public BlockingArrayQueue()
86 {
87 _elements = new Object[DEFAULT_CAPACITY];
88 _growCapacity = DEFAULT_GROWTH;
89 _maxCapacity = Integer.MAX_VALUE;
90 }
91
92
93
94
95
96
97
98 public BlockingArrayQueue(int maxCapacity)
99 {
100 _elements = new Object[maxCapacity];
101 _growCapacity = -1;
102 _maxCapacity = maxCapacity;
103 }
104
105
106
107
108
109
110
111
112
113 public BlockingArrayQueue(int capacity, int growBy)
114 {
115 _elements = new Object[capacity];
116 _growCapacity = growBy;
117 _maxCapacity = Integer.MAX_VALUE;
118 }
119
120
121
122
123
124
125
126
127
128
129
130 public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
131 {
132 if (capacity > maxCapacity)
133 throw new IllegalArgumentException();
134 _elements = new Object[capacity];
135 _growCapacity = growBy;
136 _maxCapacity = maxCapacity;
137 }
138
139
140
141
142
143 @Override
144 public void clear()
145 {
146
147 _tailLock.lock();
148 try
149 {
150
151 _headLock.lock();
152 try
153 {
154 _indexes[HEAD_OFFSET] = 0;
155 _indexes[TAIL_OFFSET] = 0;
156 _size.set(0);
157 }
158 finally
159 {
160 _headLock.unlock();
161 }
162 }
163 finally
164 {
165 _tailLock.unlock();
166 }
167 }
168
169 @Override
170 public int size()
171 {
172 return _size.get();
173 }
174
175 @Override
176 public Iterator<E> iterator()
177 {
178 return listIterator();
179 }
180
181
182
183
184
185 @SuppressWarnings("unchecked")
186 @Override
187 public E poll()
188 {
189 if (_size.get() == 0)
190 return null;
191
192 E e = null;
193
194 _headLock.lock();
195 try
196 {
197 if (_size.get() > 0)
198 {
199 final int head = _indexes[HEAD_OFFSET];
200 e = (E)_elements[head];
201 _elements[head] = null;
202 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
203 if (_size.decrementAndGet() > 0)
204 _notEmpty.signal();
205 }
206 }
207 finally
208 {
209 _headLock.unlock();
210 }
211 return e;
212 }
213
214 @SuppressWarnings("unchecked")
215 @Override
216 public E peek()
217 {
218 if (_size.get() == 0)
219 return null;
220
221 E e = null;
222
223 _headLock.lock();
224 try
225 {
226 if (_size.get() > 0)
227 e = (E)_elements[_indexes[HEAD_OFFSET]];
228 }
229 finally
230 {
231 _headLock.unlock();
232 }
233 return e;
234 }
235
236 @Override
237 public E remove()
238 {
239 E e = poll();
240 if (e == null)
241 throw new NoSuchElementException();
242 return e;
243 }
244
245 @Override
246 public E element()
247 {
248 E e = peek();
249 if (e == null)
250 throw new NoSuchElementException();
251 return e;
252 }
253
254
255
256
257
258 @Override
259 public boolean offer(E e)
260 {
261 Objects.requireNonNull(e);
262
263 boolean notEmpty = false;
264 _tailLock.lock();
265 try
266 {
267 int size = _size.get();
268 if (size >= _maxCapacity)
269 return false;
270
271
272 if (size == _elements.length)
273 {
274 _headLock.lock();
275 try
276 {
277 if (!grow())
278 return false;
279 }
280 finally
281 {
282 _headLock.unlock();
283 }
284 }
285
286
287 int tail = _indexes[TAIL_OFFSET];
288 _elements[tail] = e;
289 _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
290 notEmpty = _size.getAndIncrement() == 0;
291 }
292 finally
293 {
294 _tailLock.unlock();
295 }
296
297 if (notEmpty)
298 {
299 _headLock.lock();
300 try
301 {
302 _notEmpty.signal();
303 }
304 finally
305 {
306 _headLock.unlock();
307 }
308 }
309
310 return true;
311 }
312
313 @Override
314 public boolean add(E e)
315 {
316 if (offer(e))
317 return true;
318 else
319 throw new IllegalStateException();
320 }
321
322 @Override
323 public void put(E o) throws InterruptedException
324 {
325
326 throw new UnsupportedOperationException();
327 }
328
329 @Override
330 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
331 {
332
333 throw new UnsupportedOperationException();
334 }
335
336 @SuppressWarnings("unchecked")
337 @Override
338 public E take() throws InterruptedException
339 {
340 E e = null;
341
342 _headLock.lockInterruptibly();
343 try
344 {
345 try
346 {
347 while (_size.get() == 0)
348 {
349 _notEmpty.await();
350 }
351 }
352 catch (InterruptedException ie)
353 {
354 _notEmpty.signal();
355 throw ie;
356 }
357
358 final int head = _indexes[HEAD_OFFSET];
359 e = (E)_elements[head];
360 _elements[head] = null;
361 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
362
363 if (_size.decrementAndGet() > 0)
364 _notEmpty.signal();
365 }
366 finally
367 {
368 _headLock.unlock();
369 }
370 return e;
371 }
372
373 @SuppressWarnings("unchecked")
374 @Override
375 public E poll(long time, TimeUnit unit) throws InterruptedException
376 {
377 long nanos = unit.toNanos(time);
378 E e = null;
379
380 _headLock.lockInterruptibly();
381 try
382 {
383 try
384 {
385 while (_size.get() == 0)
386 {
387 if (nanos <= 0)
388 return null;
389 nanos = _notEmpty.awaitNanos(nanos);
390 }
391 }
392 catch (InterruptedException x)
393 {
394 _notEmpty.signal();
395 throw x;
396 }
397
398 int head = _indexes[HEAD_OFFSET];
399 e = (E)_elements[head];
400 _elements[head] = null;
401 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
402
403 if (_size.decrementAndGet() > 0)
404 _notEmpty.signal();
405 }
406 finally
407 {
408 _headLock.unlock();
409 }
410 return e;
411 }
412
413 @Override
414 public boolean remove(Object o)
415 {
416
417 _tailLock.lock();
418 try
419 {
420
421 _headLock.lock();
422 try
423 {
424 if (isEmpty())
425 return false;
426
427 final int head = _indexes[HEAD_OFFSET];
428 final int tail = _indexes[TAIL_OFFSET];
429 final int capacity = _elements.length;
430
431 int i = head;
432 while (true)
433 {
434 if (Objects.equals(_elements[i],o))
435 {
436 remove(i >= head?i - head:capacity - head + i);
437 return true;
438 }
439 ++i;
440 if (i == capacity)
441 i = 0;
442 if (i == tail)
443 return false;
444 }
445 }
446 finally
447 {
448 _headLock.unlock();
449 }
450 }
451 finally
452 {
453 _tailLock.unlock();
454 }
455 }
456
457 @Override
458 public int remainingCapacity()
459 {
460
461 _tailLock.lock();
462 try
463 {
464
465 _headLock.lock();
466 try
467 {
468 return getCapacity() - size();
469 }
470 finally
471 {
472 _headLock.unlock();
473 }
474 }
475 finally
476 {
477 _tailLock.unlock();
478 }
479 }
480
481 @Override
482 public int drainTo(Collection<? super E> c)
483 {
484 throw new UnsupportedOperationException();
485 }
486
487 @Override
488 public int drainTo(Collection<? super E> c, int maxElements)
489 {
490 throw new UnsupportedOperationException();
491 }
492
493
494
495
496
497 @SuppressWarnings("unchecked")
498 @Override
499 public E get(int index)
500 {
501
502 _tailLock.lock();
503 try
504 {
505
506 _headLock.lock();
507 try
508 {
509 if (index < 0 || index >= _size.get())
510 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
511 int i = _indexes[HEAD_OFFSET] + index;
512 int capacity = _elements.length;
513 if (i >= capacity)
514 i -= capacity;
515 return (E)_elements[i];
516 }
517 finally
518 {
519 _headLock.unlock();
520 }
521 }
522 finally
523 {
524 _tailLock.unlock();
525 }
526 }
527
528 @Override
529 public void add(int index, E e)
530 {
531 if (e == null)
532 throw new NullPointerException();
533
534 _tailLock.lock();
535 try
536 {
537
538 _headLock.lock();
539 try
540 {
541 final int size = _size.get();
542
543 if (index < 0 || index > size)
544 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
545
546 if (index == size)
547 {
548 add(e);
549 }
550 else
551 {
552 if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
553 if (!grow())
554 throw new IllegalStateException("full");
555
556
557 int i = _indexes[HEAD_OFFSET] + index;
558 int capacity = _elements.length;
559
560 if (i >= capacity)
561 i -= capacity;
562
563 _size.incrementAndGet();
564 int tail = _indexes[TAIL_OFFSET];
565 _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
566
567 if (i < tail)
568 {
569 System.arraycopy(_elements,i,_elements,i + 1,tail - i);
570 _elements[i] = e;
571 }
572 else
573 {
574 if (tail > 0)
575 {
576 System.arraycopy(_elements,0,_elements,1,tail);
577 _elements[0] = _elements[capacity - 1];
578 }
579
580 System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
581 _elements[i] = e;
582 }
583 }
584 }
585 finally
586 {
587 _headLock.unlock();
588 }
589 }
590 finally
591 {
592 _tailLock.unlock();
593 }
594 }
595
596 @SuppressWarnings("unchecked")
597 @Override
598 public E set(int index, E e)
599 {
600 Objects.requireNonNull(e);
601
602 _tailLock.lock();
603 try
604 {
605
606 _headLock.lock();
607 try
608 {
609 if (index < 0 || index >= _size.get())
610 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
611
612 int i = _indexes[HEAD_OFFSET] + index;
613 int capacity = _elements.length;
614 if (i >= capacity)
615 i -= capacity;
616 E old = (E)_elements[i];
617 _elements[i] = e;
618 return old;
619 }
620 finally
621 {
622 _headLock.unlock();
623 }
624 }
625 finally
626 {
627 _tailLock.unlock();
628 }
629 }
630
631 @SuppressWarnings("unchecked")
632 @Override
633 public E remove(int index)
634 {
635
636 _tailLock.lock();
637 try
638 {
639
640 _headLock.lock();
641 try
642 {
643 if (index < 0 || index >= _size.get())
644 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
645
646 int i = _indexes[HEAD_OFFSET] + index;
647 int capacity = _elements.length;
648 if (i >= capacity)
649 i -= capacity;
650 E old = (E)_elements[i];
651
652 int tail = _indexes[TAIL_OFFSET];
653 if (i < tail)
654 {
655 System.arraycopy(_elements,i + 1,_elements,i,tail - i);
656 --_indexes[TAIL_OFFSET];
657 }
658 else
659 {
660 System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
661 _elements[capacity - 1] = _elements[0];
662 if (tail > 0)
663 {
664 System.arraycopy(_elements,1,_elements,0,tail);
665 --_indexes[TAIL_OFFSET];
666 }
667 else
668 {
669 _indexes[TAIL_OFFSET] = capacity - 1;
670 }
671 _elements[_indexes[TAIL_OFFSET]] = null;
672 }
673
674 _size.decrementAndGet();
675
676 return old;
677 }
678 finally
679 {
680 _headLock.unlock();
681 }
682 }
683 finally
684 {
685 _tailLock.unlock();
686 }
687 }
688
689 @Override
690 public ListIterator<E> listIterator(int index)
691 {
692
693 _tailLock.lock();
694 try
695 {
696
697 _headLock.lock();
698 try
699 {
700 Object[] elements = new Object[size()];
701 if (size() > 0)
702 {
703 int head = _indexes[HEAD_OFFSET];
704 int tail = _indexes[TAIL_OFFSET];
705 if (head < tail)
706 {
707 System.arraycopy(_elements,head,elements,0,tail - head);
708 }
709 else
710 {
711 int chunk = _elements.length - head;
712 System.arraycopy(_elements,head,elements,0,chunk);
713 System.arraycopy(_elements,0,elements,chunk,tail);
714 }
715 }
716 return new Itr(elements,index);
717 }
718 finally
719 {
720 _headLock.unlock();
721 }
722 }
723 finally
724 {
725 _tailLock.unlock();
726 }
727 }
728
729
730
731
732
733
734
735
736 public int getCapacity()
737 {
738 _tailLock.lock();
739 try
740 {
741 return _elements.length;
742 }
743 finally
744 {
745 _tailLock.unlock();
746 }
747 }
748
749
750
751
752 public int getMaxCapacity()
753 {
754 return _maxCapacity;
755 }
756
757
758
759
760
761 private boolean grow()
762 {
763 if (_growCapacity <= 0)
764 return false;
765
766 _tailLock.lock();
767 try
768 {
769
770 _headLock.lock();
771 try
772 {
773 final int head = _indexes[HEAD_OFFSET];
774 final int tail = _indexes[TAIL_OFFSET];
775 final int newTail;
776 final int capacity = _elements.length;
777
778 Object[] elements = new Object[capacity + _growCapacity];
779
780 if (head < tail)
781 {
782 newTail = tail - head;
783 System.arraycopy(_elements,head,elements,0,newTail);
784 }
785 else if (head > tail || _size.get() > 0)
786 {
787 newTail = capacity + tail - head;
788 int cut = capacity - head;
789 System.arraycopy(_elements,head,elements,0,cut);
790 System.arraycopy(_elements,0,elements,cut,tail);
791 }
792 else
793 {
794 newTail = 0;
795 }
796
797 _elements = elements;
798 _indexes[HEAD_OFFSET] = 0;
799 _indexes[TAIL_OFFSET] = newTail;
800 return true;
801 }
802 finally
803 {
804 _headLock.unlock();
805 }
806 }
807 finally
808 {
809 _tailLock.unlock();
810 }
811 }
812
813 private class Itr implements ListIterator<E>
814 {
815 private final Object[] _elements;
816 private int _cursor;
817
818 public Itr(Object[] elements, int offset)
819 {
820 _elements = elements;
821 _cursor = offset;
822 }
823
824 @Override
825 public boolean hasNext()
826 {
827 return _cursor < _elements.length;
828 }
829
830 @SuppressWarnings("unchecked")
831 @Override
832 public E next()
833 {
834 return (E)_elements[_cursor++];
835 }
836
837 @Override
838 public boolean hasPrevious()
839 {
840 return _cursor > 0;
841 }
842
843 @SuppressWarnings("unchecked")
844 @Override
845 public E previous()
846 {
847 return (E)_elements[--_cursor];
848 }
849
850 @Override
851 public int nextIndex()
852 {
853 return _cursor + 1;
854 }
855
856 @Override
857 public int previousIndex()
858 {
859 return _cursor - 1;
860 }
861
862 @Override
863 public void remove()
864 {
865 throw new UnsupportedOperationException();
866 }
867
868 @Override
869 public void set(E e)
870 {
871 throw new UnsupportedOperationException();
872 }
873
874 @Override
875 public void add(E e)
876 {
877 throw new UnsupportedOperationException();
878 }
879 }
880 }