1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractList;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.ListIterator;
25 import java.util.NoSuchElementException;
26 import java.util.Objects;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.Lock;
32 import java.util.concurrent.locks.ReentrantLock;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
51 {
52
53
54
55
56 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
57
58
59
60 private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
61
62
63
64 public static final int DEFAULT_CAPACITY = 128;
65
66
67
68 public static final int DEFAULT_GROWTH = 64;
69
70 private final int _maxCapacity;
71 private final int _growCapacity;
72
73
74
75 private final int[] _indexes = new int[TAIL_OFFSET + 1];
76 private final Lock _tailLock = new ReentrantLock();
77 private final AtomicInteger _size = new AtomicInteger();
78 private final Lock _headLock = new ReentrantLock();
79 private final Condition _notEmpty = _headLock.newCondition();
80 private Object[] _elements;
81
82
83
84
85
86
87
88 public BlockingArrayQueue()
89 {
90 _elements = new Object[DEFAULT_CAPACITY];
91 _growCapacity = DEFAULT_GROWTH;
92 _maxCapacity = Integer.MAX_VALUE;
93 }
94
95
96
97
98
99
100
101 public BlockingArrayQueue(int maxCapacity)
102 {
103 _elements = new Object[maxCapacity];
104 _growCapacity = -1;
105 _maxCapacity = maxCapacity;
106 }
107
108
109
110
111
112
113
114
115
116 public BlockingArrayQueue(int capacity, int growBy)
117 {
118 _elements = new Object[capacity];
119 _growCapacity = growBy;
120 _maxCapacity = Integer.MAX_VALUE;
121 }
122
123
124
125
126
127
128
129
130
131
132
133 public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
134 {
135 if (capacity > maxCapacity)
136 throw new IllegalArgumentException();
137 _elements = new Object[capacity];
138 _growCapacity = growBy;
139 _maxCapacity = maxCapacity;
140 }
141
142
143
144
145
146 @Override
147 public void clear()
148 {
149
150 _tailLock.lock();
151 try
152 {
153
154 _headLock.lock();
155 try
156 {
157 _indexes[HEAD_OFFSET] = 0;
158 _indexes[TAIL_OFFSET] = 0;
159 _size.set(0);
160 }
161 finally
162 {
163 _headLock.unlock();
164 }
165 }
166 finally
167 {
168 _tailLock.unlock();
169 }
170 }
171
172 @Override
173 public int size()
174 {
175 return _size.get();
176 }
177
178 @Override
179 public Iterator<E> iterator()
180 {
181 return listIterator();
182 }
183
184
185
186
187
188 @SuppressWarnings("unchecked")
189 @Override
190 public E poll()
191 {
192 if (_size.get() == 0)
193 return null;
194
195 E e = null;
196
197 _headLock.lock();
198 try
199 {
200 if (_size.get() > 0)
201 {
202 final int head = _indexes[HEAD_OFFSET];
203 e = (E)_elements[head];
204 _elements[head] = null;
205 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
206 if (_size.decrementAndGet() > 0)
207 _notEmpty.signal();
208 }
209 }
210 finally
211 {
212 _headLock.unlock();
213 }
214 return e;
215 }
216
217 @SuppressWarnings("unchecked")
218 @Override
219 public E peek()
220 {
221 if (_size.get() == 0)
222 return null;
223
224 E e = null;
225
226 _headLock.lock();
227 try
228 {
229 if (_size.get() > 0)
230 e = (E)_elements[_indexes[HEAD_OFFSET]];
231 }
232 finally
233 {
234 _headLock.unlock();
235 }
236 return e;
237 }
238
239 @Override
240 public E remove()
241 {
242 E e = poll();
243 if (e == null)
244 throw new NoSuchElementException();
245 return e;
246 }
247
248 @Override
249 public E element()
250 {
251 E e = peek();
252 if (e == null)
253 throw new NoSuchElementException();
254 return e;
255 }
256
257
258
259
260
261 @Override
262 public boolean offer(E e)
263 {
264 Objects.requireNonNull(e);
265
266 boolean notEmpty = false;
267 _tailLock.lock();
268 try
269 {
270 int size = _size.get();
271 if (size >= _maxCapacity)
272 return false;
273
274
275 if (size == _elements.length)
276 {
277 _headLock.lock();
278 try
279 {
280 if (!grow())
281 return false;
282 }
283 finally
284 {
285 _headLock.unlock();
286 }
287 }
288
289
290 int tail = _indexes[TAIL_OFFSET];
291 _elements[tail] = e;
292 _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
293 notEmpty = _size.getAndIncrement() == 0;
294 }
295 finally
296 {
297 _tailLock.unlock();
298 }
299
300 if (notEmpty)
301 {
302 _headLock.lock();
303 try
304 {
305 _notEmpty.signal();
306 }
307 finally
308 {
309 _headLock.unlock();
310 }
311 }
312
313 return true;
314 }
315
316 @Override
317 public boolean add(E e)
318 {
319 if (offer(e))
320 return true;
321 else
322 throw new IllegalStateException();
323 }
324
325 @Override
326 public void put(E o) throws InterruptedException
327 {
328
329 throw new UnsupportedOperationException();
330 }
331
332 @Override
333 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
334 {
335
336 throw new UnsupportedOperationException();
337 }
338
339 @SuppressWarnings("unchecked")
340 @Override
341 public E take() throws InterruptedException
342 {
343 E e = null;
344
345 _headLock.lockInterruptibly();
346 try
347 {
348 try
349 {
350 while (_size.get() == 0)
351 {
352 _notEmpty.await();
353 }
354 }
355 catch (InterruptedException ie)
356 {
357 _notEmpty.signal();
358 throw ie;
359 }
360
361 final int head = _indexes[HEAD_OFFSET];
362 e = (E)_elements[head];
363 _elements[head] = null;
364 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
365
366 if (_size.decrementAndGet() > 0)
367 _notEmpty.signal();
368 }
369 finally
370 {
371 _headLock.unlock();
372 }
373 return e;
374 }
375
376 @SuppressWarnings("unchecked")
377 @Override
378 public E poll(long time, TimeUnit unit) throws InterruptedException
379 {
380 long nanos = unit.toNanos(time);
381 E e = null;
382
383 _headLock.lockInterruptibly();
384 try
385 {
386 try
387 {
388 while (_size.get() == 0)
389 {
390 if (nanos <= 0)
391 return null;
392 nanos = _notEmpty.awaitNanos(nanos);
393 }
394 }
395 catch (InterruptedException x)
396 {
397 _notEmpty.signal();
398 throw x;
399 }
400
401 int head = _indexes[HEAD_OFFSET];
402 e = (E)_elements[head];
403 _elements[head] = null;
404 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
405
406 if (_size.decrementAndGet() > 0)
407 _notEmpty.signal();
408 }
409 finally
410 {
411 _headLock.unlock();
412 }
413 return e;
414 }
415
416 @Override
417 public boolean remove(Object o)
418 {
419
420 _tailLock.lock();
421 try
422 {
423
424 _headLock.lock();
425 try
426 {
427 if (isEmpty())
428 return false;
429
430 final int head = _indexes[HEAD_OFFSET];
431 final int tail = _indexes[TAIL_OFFSET];
432 final int capacity = _elements.length;
433
434 int i = head;
435 while (true)
436 {
437 if (Objects.equals(_elements[i],o))
438 {
439 remove(i >= head?i - head:capacity - head + i);
440 return true;
441 }
442 ++i;
443 if (i == capacity)
444 i = 0;
445 if (i == tail)
446 return false;
447 }
448 }
449 finally
450 {
451 _headLock.unlock();
452 }
453 }
454 finally
455 {
456 _tailLock.unlock();
457 }
458 }
459
460 @Override
461 public int remainingCapacity()
462 {
463
464 _tailLock.lock();
465 try
466 {
467
468 _headLock.lock();
469 try
470 {
471 return getCapacity() - size();
472 }
473 finally
474 {
475 _headLock.unlock();
476 }
477 }
478 finally
479 {
480 _tailLock.unlock();
481 }
482 }
483
484 @Override
485 public int drainTo(Collection<? super E> c)
486 {
487 throw new UnsupportedOperationException();
488 }
489
490 @Override
491 public int drainTo(Collection<? super E> c, int maxElements)
492 {
493 throw new UnsupportedOperationException();
494 }
495
496
497
498
499
500 @SuppressWarnings("unchecked")
501 @Override
502 public E get(int index)
503 {
504
505 _tailLock.lock();
506 try
507 {
508
509 _headLock.lock();
510 try
511 {
512 if (index < 0 || index >= _size.get())
513 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
514 int i = _indexes[HEAD_OFFSET] + index;
515 int capacity = _elements.length;
516 if (i >= capacity)
517 i -= capacity;
518 return (E)_elements[i];
519 }
520 finally
521 {
522 _headLock.unlock();
523 }
524 }
525 finally
526 {
527 _tailLock.unlock();
528 }
529 }
530
531 @Override
532 public void add(int index, E e)
533 {
534 if (e == null)
535 throw new NullPointerException();
536
537 _tailLock.lock();
538 try
539 {
540
541 _headLock.lock();
542 try
543 {
544 final int size = _size.get();
545
546 if (index < 0 || index > size)
547 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
548
549 if (index == size)
550 {
551 add(e);
552 }
553 else
554 {
555 if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
556 if (!grow())
557 throw new IllegalStateException("full");
558
559
560 int i = _indexes[HEAD_OFFSET] + index;
561 int capacity = _elements.length;
562
563 if (i >= capacity)
564 i -= capacity;
565
566 _size.incrementAndGet();
567 int tail = _indexes[TAIL_OFFSET];
568 _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
569
570 if (i < tail)
571 {
572 System.arraycopy(_elements,i,_elements,i + 1,tail - i);
573 _elements[i] = e;
574 }
575 else
576 {
577 if (tail > 0)
578 {
579 System.arraycopy(_elements,0,_elements,1,tail);
580 _elements[0] = _elements[capacity - 1];
581 }
582
583 System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
584 _elements[i] = e;
585 }
586 }
587 }
588 finally
589 {
590 _headLock.unlock();
591 }
592 }
593 finally
594 {
595 _tailLock.unlock();
596 }
597 }
598
599 @SuppressWarnings("unchecked")
600 @Override
601 public E set(int index, E e)
602 {
603 Objects.requireNonNull(e);
604
605 _tailLock.lock();
606 try
607 {
608
609 _headLock.lock();
610 try
611 {
612 if (index < 0 || index >= _size.get())
613 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
614
615 int i = _indexes[HEAD_OFFSET] + index;
616 int capacity = _elements.length;
617 if (i >= capacity)
618 i -= capacity;
619 E old = (E)_elements[i];
620 _elements[i] = e;
621 return old;
622 }
623 finally
624 {
625 _headLock.unlock();
626 }
627 }
628 finally
629 {
630 _tailLock.unlock();
631 }
632 }
633
634 @SuppressWarnings("unchecked")
635 @Override
636 public E remove(int index)
637 {
638
639 _tailLock.lock();
640 try
641 {
642
643 _headLock.lock();
644 try
645 {
646 if (index < 0 || index >= _size.get())
647 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
648
649 int i = _indexes[HEAD_OFFSET] + index;
650 int capacity = _elements.length;
651 if (i >= capacity)
652 i -= capacity;
653 E old = (E)_elements[i];
654
655 int tail = _indexes[TAIL_OFFSET];
656 if (i < tail)
657 {
658 System.arraycopy(_elements,i + 1,_elements,i,tail - i);
659 --_indexes[TAIL_OFFSET];
660 }
661 else
662 {
663 System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
664 _elements[capacity - 1] = _elements[0];
665 if (tail > 0)
666 {
667 System.arraycopy(_elements,1,_elements,0,tail);
668 --_indexes[TAIL_OFFSET];
669 }
670 else
671 {
672 _indexes[TAIL_OFFSET] = capacity - 1;
673 }
674 _elements[_indexes[TAIL_OFFSET]] = null;
675 }
676
677 _size.decrementAndGet();
678
679 return old;
680 }
681 finally
682 {
683 _headLock.unlock();
684 }
685 }
686 finally
687 {
688 _tailLock.unlock();
689 }
690 }
691
692 @Override
693 public ListIterator<E> listIterator(int index)
694 {
695
696 _tailLock.lock();
697 try
698 {
699
700 _headLock.lock();
701 try
702 {
703 Object[] elements = new Object[size()];
704 if (size() > 0)
705 {
706 int head = _indexes[HEAD_OFFSET];
707 int tail = _indexes[TAIL_OFFSET];
708 if (head < tail)
709 {
710 System.arraycopy(_elements,head,elements,0,tail - head);
711 }
712 else
713 {
714 int chunk = _elements.length - head;
715 System.arraycopy(_elements,head,elements,0,chunk);
716 System.arraycopy(_elements,0,elements,chunk,tail);
717 }
718 }
719 return new Itr(elements,index);
720 }
721 finally
722 {
723 _headLock.unlock();
724 }
725 }
726 finally
727 {
728 _tailLock.unlock();
729 }
730 }
731
732
733
734
735
736
737
738
739 public int getCapacity()
740 {
741 _tailLock.lock();
742 try
743 {
744 return _elements.length;
745 }
746 finally
747 {
748 _tailLock.unlock();
749 }
750 }
751
752
753
754
755 public int getMaxCapacity()
756 {
757 return _maxCapacity;
758 }
759
760
761
762
763
764 private boolean grow()
765 {
766 if (_growCapacity <= 0)
767 return false;
768
769 _tailLock.lock();
770 try
771 {
772
773 _headLock.lock();
774 try
775 {
776 final int head = _indexes[HEAD_OFFSET];
777 final int tail = _indexes[TAIL_OFFSET];
778 final int newTail;
779 final int capacity = _elements.length;
780
781 Object[] elements = new Object[capacity + _growCapacity];
782
783 if (head < tail)
784 {
785 newTail = tail - head;
786 System.arraycopy(_elements,head,elements,0,newTail);
787 }
788 else if (head > tail || _size.get() > 0)
789 {
790 newTail = capacity + tail - head;
791 int cut = capacity - head;
792 System.arraycopy(_elements,head,elements,0,cut);
793 System.arraycopy(_elements,0,elements,cut,tail);
794 }
795 else
796 {
797 newTail = 0;
798 }
799
800 _elements = elements;
801 _indexes[HEAD_OFFSET] = 0;
802 _indexes[TAIL_OFFSET] = newTail;
803 return true;
804 }
805 finally
806 {
807 _headLock.unlock();
808 }
809 }
810 finally
811 {
812 _tailLock.unlock();
813 }
814 }
815
816 private class Itr implements ListIterator<E>
817 {
818 private final Object[] _elements;
819 private int _cursor;
820
821 public Itr(Object[] elements, int offset)
822 {
823 _elements = elements;
824 _cursor = offset;
825 }
826
827 @Override
828 public boolean hasNext()
829 {
830 return _cursor < _elements.length;
831 }
832
833 @SuppressWarnings("unchecked")
834 @Override
835 public E next()
836 {
837 return (E)_elements[_cursor++];
838 }
839
840 @Override
841 public boolean hasPrevious()
842 {
843 return _cursor > 0;
844 }
845
846 @SuppressWarnings("unchecked")
847 @Override
848 public E previous()
849 {
850 return (E)_elements[--_cursor];
851 }
852
853 @Override
854 public int nextIndex()
855 {
856 return _cursor + 1;
857 }
858
859 @Override
860 public int previousIndex()
861 {
862 return _cursor - 1;
863 }
864
865 @Override
866 public void remove()
867 {
868 throw new UnsupportedOperationException();
869 }
870
871 @Override
872 public void set(E e)
873 {
874 throw new UnsupportedOperationException();
875 }
876
877 @Override
878 public void add(E e)
879 {
880 throw new UnsupportedOperationException();
881 }
882 }
883 }