1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractList;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.ListIterator;
25 import java.util.NoSuchElementException;
26 import java.util.Objects;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.Lock;
32 import java.util.concurrent.locks.ReentrantLock;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
50 {
51
52
53
54
55
56 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
57
58
59
60
61 private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
62
63
64
65 public static final int DEFAULT_CAPACITY = 128;
66
67
68
69 public static final int DEFAULT_GROWTH = 64;
70
71 private final int _maxCapacity;
72 private final int _growCapacity;
73
74
75
76 private final int[] _indexes = new int[TAIL_OFFSET + 1];
77 private final Lock _tailLock = new ReentrantLock();
78 private final AtomicInteger _size = new AtomicInteger();
79 private final Lock _headLock = new ReentrantLock();
80 private final Condition _notEmpty = _headLock.newCondition();
81 private Object[] _elements;
82
83
84
85
86
87
88
/**
 * Creates a queue with {@link #DEFAULT_CAPACITY} initial slots that grows by
 * {@link #DEFAULT_GROWTH} and is bounded only by {@link Integer#MAX_VALUE}.
 */
public BlockingArrayQueue()
{
    // The three-argument constructor performs exactly these assignments;
    // its capacity check cannot fail for these values.
    this(DEFAULT_CAPACITY, DEFAULT_GROWTH, Integer.MAX_VALUE);
}
95
96
97
98
99
100
101
/**
 * Creates a fixed-size queue: the backing array is allocated at
 * {@code maxCapacity} and growth is disabled (grow increment of -1).
 *
 * @param maxCapacity both the initial and the maximum capacity
 */
public BlockingArrayQueue(int maxCapacity)
{
    // capacity == maxCapacity, so the delegate's capacity check never trips.
    this(maxCapacity, -1, maxCapacity);
}
108
109
110
111
112
113
114
/**
 * Creates a growable queue bounded only by {@link Integer#MAX_VALUE}.
 *
 * @param capacity the initial length of the backing array
 * @param growBy the increment used each time the backing array grows
 */
public BlockingArrayQueue(int capacity, int growBy)
{
    // No int capacity can exceed Integer.MAX_VALUE, so the delegate never rejects it.
    this(capacity, growBy, Integer.MAX_VALUE);
}
121
122
123
124
125
126
127
128
/**
 * Creates a growable queue with an explicit upper bound.
 *
 * @param capacity the initial length of the backing array
 * @param growBy the increment used each time the backing array grows
 * @param maxCapacity the bound on the number of queued elements
 * @throws IllegalArgumentException if {@code capacity} is greater than {@code maxCapacity}
 */
public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
{
    if (maxCapacity < capacity)
    {
        throw new IllegalArgumentException();
    }
    _maxCapacity = maxCapacity;
    _growCapacity = growBy;
    _elements = new Object[capacity];
}
137
138
139
140
141
142 @Override
143 public void clear()
144 {
145 final Lock tailLock = _tailLock;
146 tailLock.lock();
147 try
148 {
149 final Lock headLock = _headLock;
150 headLock.lock();
151 try
152 {
153 _indexes[HEAD_OFFSET] = 0;
154 _indexes[TAIL_OFFSET] = 0;
155 _size.set(0);
156 }
157 finally
158 {
159 headLock.unlock();
160 }
161 }
162 finally
163 {
164 tailLock.unlock();
165 }
166 }
167
168 @Override
169 public int size()
170 {
171 return _size.get();
172 }
173
174 @Override
175 public Iterator<E> iterator()
176 {
177 return listIterator();
178 }
179
180
181
182
183
184 @SuppressWarnings("unchecked")
185 @Override
186 public E poll()
187 {
188 if (_size.get() == 0)
189 return null;
190
191 E e = null;
192 final Lock headLock = _headLock;
193 headLock.lock();
194 try
195 {
196 if (_size.get() > 0)
197 {
198 final int head = _indexes[HEAD_OFFSET];
199 e = (E)_elements[head];
200 _elements[head] = null;
201 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
202 if (_size.decrementAndGet() > 0)
203 _notEmpty.signal();
204 }
205 }
206 finally
207 {
208 headLock.unlock();
209 }
210 return e;
211 }
212
213 @SuppressWarnings("unchecked")
214 @Override
215 public E peek()
216 {
217 if (_size.get() == 0)
218 return null;
219
220 E e = null;
221 final Lock headLock = _headLock;
222 headLock.lock();
223 try
224 {
225 if (_size.get() > 0)
226 e = (E)_elements[_indexes[HEAD_OFFSET]];
227 }
228 finally
229 {
230 headLock.unlock();
231 }
232 return e;
233 }
234
235 @Override
236 public E remove()
237 {
238 E e = poll();
239 if (e == null)
240 throw new NoSuchElementException();
241 return e;
242 }
243
244 @Override
245 public E element()
246 {
247 E e = peek();
248 if (e == null)
249 throw new NoSuchElementException();
250 return e;
251 }
252
253
254
255
256
257 @Override
258 public boolean offer(E e)
259 {
260 Objects.requireNonNull(e);
261
262 final Lock tailLock = _tailLock;
263 final Lock headLock = _headLock;
264 boolean notEmpty = false;
265 tailLock.lock();
266 try
267 {
268 int size = _size.get();
269 if (size >= _maxCapacity)
270 return false;
271
272
273 if (size == _elements.length)
274 {
275 headLock.lock();
276 try
277 {
278 if (!grow())
279 return false;
280 }
281 finally
282 {
283 headLock.unlock();
284 }
285 }
286
287
288 int tail = _indexes[TAIL_OFFSET];
289 _elements[tail] = e;
290 _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
291 notEmpty = _size.getAndIncrement() == 0;
292 }
293 finally
294 {
295 tailLock.unlock();
296 }
297
298 if (notEmpty)
299 {
300 headLock.lock();
301 try
302 {
303 _notEmpty.signal();
304 }
305 finally
306 {
307 headLock.unlock();
308 }
309 }
310
311 return true;
312 }
313
314 @Override
315 public boolean add(E e)
316 {
317 if (offer(e))
318 return true;
319 else
320 throw new IllegalStateException();
321 }
322
323 @Override
324 public void put(E o) throws InterruptedException
325 {
326
327 throw new UnsupportedOperationException();
328 }
329
330 @Override
331 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
332 {
333
334 throw new UnsupportedOperationException();
335 }
336
337 @SuppressWarnings("unchecked")
338 @Override
339 public E take() throws InterruptedException
340 {
341 E e = null;
342 final Lock headLock = _headLock;
343 headLock.lockInterruptibly();
344 try
345 {
346 try
347 {
348 while (_size.get() == 0)
349 {
350 _notEmpty.await();
351 }
352 }
353 catch (InterruptedException ie)
354 {
355 _notEmpty.signal();
356 throw ie;
357 }
358
359 final int head = _indexes[HEAD_OFFSET];
360 e = (E)_elements[head];
361 _elements[head] = null;
362 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
363
364 if (_size.decrementAndGet() > 0)
365 _notEmpty.signal();
366 }
367 finally
368 {
369 headLock.unlock();
370 }
371 return e;
372 }
373
374 @SuppressWarnings("unchecked")
375 @Override
376 public E poll(long time, TimeUnit unit) throws InterruptedException
377 {
378 long nanos = unit.toNanos(time);
379 E e = null;
380 final Lock headLock = _headLock;
381 headLock.lockInterruptibly();
382 try
383 {
384 try
385 {
386 while (_size.get() == 0)
387 {
388 if (nanos <= 0)
389 return null;
390 nanos = _notEmpty.awaitNanos(nanos);
391 }
392 }
393 catch (InterruptedException x)
394 {
395 _notEmpty.signal();
396 throw x;
397 }
398
399 int head = _indexes[HEAD_OFFSET];
400 e = (E)_elements[head];
401 _elements[head] = null;
402 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
403
404 if (_size.decrementAndGet() > 0)
405 _notEmpty.signal();
406 }
407 finally
408 {
409 headLock.unlock();
410 }
411 return e;
412 }
413
414 @Override
415 public boolean remove(Object o)
416 {
417 final Lock tailLock = _tailLock;
418 tailLock.lock();
419 try
420 {
421 final Lock headLock = _headLock;
422 headLock.lock();
423 try
424 {
425 if (isEmpty())
426 return false;
427
428 final int head = _indexes[HEAD_OFFSET];
429 final int tail = _indexes[TAIL_OFFSET];
430 final int capacity = _elements.length;
431
432 int i = head;
433 while (true)
434 {
435 if (Objects.equals(_elements[i], o))
436 {
437 remove(i >= head ? i - head : capacity - head + i);
438 return true;
439 }
440 ++i;
441 if (i == capacity)
442 i = 0;
443 if (i == tail)
444 return false;
445 }
446 }
447 finally
448 {
449 headLock.unlock();
450 }
451 }
452 finally
453 {
454 tailLock.unlock();
455 }
456 }
457
458 @Override
459 public int remainingCapacity()
460 {
461 final Lock tailLock = _tailLock;
462 tailLock.lock();
463 try
464 {
465 final Lock headLock = _headLock;
466 headLock.lock();
467 try
468 {
469 return getCapacity() - size();
470 }
471 finally
472 {
473 headLock.unlock();
474 }
475 }
476 finally
477 {
478 tailLock.unlock();
479 }
480 }
481
482 @Override
483 public int drainTo(Collection<? super E> c)
484 {
485 throw new UnsupportedOperationException();
486 }
487
488 @Override
489 public int drainTo(Collection<? super E> c, int maxElements)
490 {
491 throw new UnsupportedOperationException();
492 }
493
494
495
496
497
498 @SuppressWarnings("unchecked")
499 @Override
500 public E get(int index)
501 {
502 final Lock tailLock = _tailLock;
503 tailLock.lock();
504 try
505 {
506 final Lock headLock = _headLock;
507 headLock.lock();
508 try
509 {
510 if (index < 0 || index >= _size.get())
511 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
512 int i = _indexes[HEAD_OFFSET] + index;
513 int capacity = _elements.length;
514 if (i >= capacity)
515 i -= capacity;
516 return (E)_elements[i];
517 }
518 finally
519 {
520 headLock.unlock();
521 }
522 }
523 finally
524 {
525 tailLock.unlock();
526 }
527 }
528
529 @Override
530 public void add(int index, E e)
531 {
532 if (e == null)
533 throw new NullPointerException();
534
535 final Lock tailLock = _tailLock;
536 tailLock.lock();
537 try
538 {
539 final Lock headLock = _headLock;
540 headLock.lock();
541 try
542 {
543 final int size = _size.get();
544
545 if (index < 0 || index > size)
546 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
547
548 if (index == size)
549 {
550 add(e);
551 }
552 else
553 {
554 if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
555 if (!grow())
556 throw new IllegalStateException("full");
557
558
559 int i = _indexes[HEAD_OFFSET] + index;
560 int capacity = _elements.length;
561
562 if (i >= capacity)
563 i -= capacity;
564
565 _size.incrementAndGet();
566 int tail = _indexes[TAIL_OFFSET];
567 _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
568
569 if (i < tail)
570 {
571 System.arraycopy(_elements, i, _elements, i + 1, tail - i);
572 _elements[i] = e;
573 }
574 else
575 {
576 if (tail > 0)
577 {
578 System.arraycopy(_elements, 0, _elements, 1, tail);
579 _elements[0] = _elements[capacity - 1];
580 }
581
582 System.arraycopy(_elements, i, _elements, i + 1, capacity - i - 1);
583 _elements[i] = e;
584 }
585 }
586 }
587 finally
588 {
589 headLock.unlock();
590 }
591 }
592 finally
593 {
594 tailLock.unlock();
595 }
596 }
597
598 @SuppressWarnings("unchecked")
599 @Override
600 public E set(int index, E e)
601 {
602 Objects.requireNonNull(e);
603
604 final Lock tailLock = _tailLock;
605 tailLock.lock();
606 try
607 {
608 final Lock headLock = _headLock;
609 headLock.lock();
610 try
611 {
612 if (index < 0 || index >= _size.get())
613 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
614
615 int i = _indexes[HEAD_OFFSET] + index;
616 int capacity = _elements.length;
617 if (i >= capacity)
618 i -= capacity;
619 E old = (E)_elements[i];
620 _elements[i] = e;
621 return old;
622 }
623 finally
624 {
625 headLock.unlock();
626 }
627 }
628 finally
629 {
630 tailLock.unlock();
631 }
632 }
633
634 @SuppressWarnings("unchecked")
635 @Override
636 public E remove(int index)
637 {
638 final Lock tailLock = _tailLock;
639 tailLock.lock();
640 try
641 {
642 final Lock headLock = _headLock;
643 headLock.lock();
644 try
645 {
646 if (index < 0 || index >= _size.get())
647 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
648
649 int i = _indexes[HEAD_OFFSET] + index;
650 int capacity = _elements.length;
651 if (i >= capacity)
652 i -= capacity;
653 E old = (E)_elements[i];
654
655 int tail = _indexes[TAIL_OFFSET];
656 if (i < tail)
657 {
658 System.arraycopy(_elements, i + 1, _elements, i, tail - i);
659 --_indexes[TAIL_OFFSET];
660 }
661 else
662 {
663 System.arraycopy(_elements, i + 1, _elements, i, capacity - i - 1);
664 _elements[capacity - 1] = _elements[0];
665 if (tail > 0)
666 {
667 System.arraycopy(_elements, 1, _elements, 0, tail);
668 --_indexes[TAIL_OFFSET];
669 }
670 else
671 {
672 _indexes[TAIL_OFFSET] = capacity - 1;
673 }
674 _elements[_indexes[TAIL_OFFSET]] = null;
675 }
676
677 _size.decrementAndGet();
678
679 return old;
680 }
681 finally
682 {
683 headLock.unlock();
684 }
685 }
686 finally
687 {
688 tailLock.unlock();
689 }
690 }
691
692 @Override
693 public ListIterator<E> listIterator(int index)
694 {
695 final Lock tailLock = _tailLock;
696 tailLock.lock();
697 try
698 {
699 final Lock headLock = _headLock;
700 headLock.lock();
701 try
702 {
703 Object[] elements = new Object[size()];
704 if (size() > 0)
705 {
706 int head = _indexes[HEAD_OFFSET];
707 int tail = _indexes[TAIL_OFFSET];
708 if (head < tail)
709 {
710 System.arraycopy(_elements, head, elements, 0, tail - head);
711 }
712 else
713 {
714 int chunk = _elements.length - head;
715 System.arraycopy(_elements, head, elements, 0, chunk);
716 System.arraycopy(_elements, 0, elements, chunk, tail);
717 }
718 }
719 return new Itr(elements, index);
720 }
721 finally
722 {
723 headLock.unlock();
724 }
725 }
726 finally
727 {
728 tailLock.unlock();
729 }
730 }
731
732
733
734
735
736
737
738
739 public int getCapacity()
740 {
741 return _elements.length;
742 }
743
744
745
746
747 public int getMaxCapacity()
748 {
749 return _maxCapacity;
750 }
751
752
753
754
755
756 private boolean grow()
757 {
758 if (_growCapacity <= 0)
759 return false;
760
761 final Lock tailLock = _tailLock;
762 tailLock.lock();
763 try
764 {
765 final Lock headLock = _headLock;
766 headLock.lock();
767 try
768 {
769 final int head = _indexes[HEAD_OFFSET];
770 final int tail = _indexes[TAIL_OFFSET];
771 final int newTail;
772 final int capacity = _elements.length;
773
774 Object[] elements = new Object[capacity + _growCapacity];
775
776 if (head < tail)
777 {
778 newTail = tail - head;
779 System.arraycopy(_elements, head, elements, 0, newTail);
780 }
781 else if (head > tail || _size.get() > 0)
782 {
783 newTail = capacity + tail - head;
784 int cut = capacity - head;
785 System.arraycopy(_elements, head, elements, 0, cut);
786 System.arraycopy(_elements, 0, elements, cut, tail);
787 }
788 else
789 {
790 newTail = 0;
791 }
792
793 _elements = elements;
794 _indexes[HEAD_OFFSET] = 0;
795 _indexes[TAIL_OFFSET] = newTail;
796 return true;
797 }
798 finally
799 {
800 headLock.unlock();
801 }
802 }
803 finally
804 {
805 tailLock.unlock();
806 }
807 }
808
809 private class Itr implements ListIterator<E>
810 {
811 private final Object[] _elements;
812 private int _cursor;
813
814 public Itr(Object[] elements, int offset)
815 {
816 _elements = elements;
817 _cursor = offset;
818 }
819
820 @Override
821 public boolean hasNext()
822 {
823 return _cursor < _elements.length;
824 }
825
826 @SuppressWarnings("unchecked")
827 @Override
828 public E next()
829 {
830 return (E)_elements[_cursor++];
831 }
832
833 @Override
834 public boolean hasPrevious()
835 {
836 return _cursor > 0;
837 }
838
839 @SuppressWarnings("unchecked")
840 @Override
841 public E previous()
842 {
843 return (E)_elements[--_cursor];
844 }
845
846 @Override
847 public int nextIndex()
848 {
849 return _cursor + 1;
850 }
851
852 @Override
853 public int previousIndex()
854 {
855 return _cursor - 1;
856 }
857
858 @Override
859 public void remove()
860 {
861 throw new UnsupportedOperationException();
862 }
863
864 @Override
865 public void set(E e)
866 {
867 throw new UnsupportedOperationException();
868 }
869
870 @Override
871 public void add(E e)
872 {
873 throw new UnsupportedOperationException();
874 }
875 }
876 }