View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.AbstractList;
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.ListIterator;
25  import java.util.NoSuchElementException;
26  import java.util.Objects;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  import java.util.concurrent.locks.Condition;
31  import java.util.concurrent.locks.Lock;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  /**
35   * A BlockingQueue backed by a circular array capable of growing.
36   * <p>
37   * This queue uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array.
38   * </p>
39   * <p>
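40   * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow. Note that the blocking {@link #put(Object)} and the timed {@link #offer(Object, long, TimeUnit)} calls are not implemented and throw {@link UnsupportedOperationException}.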
41   * </p>
42   * <p>
43   * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to
44   * {@link Integer#MAX_VALUE}.
45   * </p>
46   * 
47   * @param <E>
48   *            The element type
49   */
50  public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
51  {
52      /**
53       * The head offset in the {@link #_indexes} array, displaced by one slot less than a cache line of ints (15 slots when a cache line holds 16 ints) to avoid
54       * false sharing with the array length (stored before the first element of the array itself).
55       */
56      private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
57      /**
58       * The tail offset in the {@link #_indexes} array, displaced from the head by one cache line of ints (16 slots when a cache line holds 16 ints) to avoid false sharing with it.
59       */
60      private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
61      /**
62       * Default initial capacity, 128.
63       */
64      public static final int DEFAULT_CAPACITY = 128;
65      /**
66       * Default growth factor, 64.
67       */
68      public static final int DEFAULT_GROWTH = 64;
69  
70      private final int _maxCapacity;
71      private final int _growCapacity;
72      /**
73       * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
74       */
75      private final int[] _indexes = new int[TAIL_OFFSET + 1];
76      private final Lock _tailLock = new ReentrantLock();
77      private final AtomicInteger _size = new AtomicInteger();
78      private final Lock _headLock = new ReentrantLock();
79      private final Condition _notEmpty = _headLock.newCondition();
80      private Object[] _elements;
81  
82      /**
83       * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
84       * 
85       * @see #DEFAULT_CAPACITY
86       * @see #DEFAULT_GROWTH
87       */
88      public BlockingArrayQueue()
89      {
90          _elements = new Object[DEFAULT_CAPACITY];
91          _growCapacity = DEFAULT_GROWTH;
92          _maxCapacity = Integer.MAX_VALUE;
93      }
94  
95      /**
96       * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter.
97       * 
98       * @param maxCapacity
99       *            the maximum capacity
100      */
101     public BlockingArrayQueue(int maxCapacity)
102     {
103         _elements = new Object[maxCapacity];
104         _growCapacity = -1;
105         _maxCapacity = maxCapacity;
106     }
107 
108     /**
109      * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
110      * 
111      * @param capacity
112      *            the initial capacity
113      * @param growBy
114      *            the growth factor
115      */
116     public BlockingArrayQueue(int capacity, int growBy)
117     {
118         _elements = new Object[capacity];
119         _growCapacity = growBy;
120         _maxCapacity = Integer.MAX_VALUE;
121     }
122 
123     /**
124      * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter.
125      * 
126      * @param capacity
127      *            the initial capacity
128      * @param growBy
129      *            the growth factor
130      * @param maxCapacity
131      *            the maximum capacity
132      */
133     public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
134     {
135         if (capacity > maxCapacity)
136             throw new IllegalArgumentException();
137         _elements = new Object[capacity];
138         _growCapacity = growBy;
139         _maxCapacity = maxCapacity;
140     }
141 
142     /*----------------------------------------------------------------------------*/
143     /* Collection methods */
144     /*----------------------------------------------------------------------------*/
145 
146     @Override
147     public void clear()
148     {
149 
150         _tailLock.lock();
151         try
152         {
153 
154             _headLock.lock();
155             try
156             {
157                 _indexes[HEAD_OFFSET] = 0;
158                 _indexes[TAIL_OFFSET] = 0;
159                 _size.set(0);
160             }
161             finally
162             {
163                 _headLock.unlock();
164             }
165         }
166         finally
167         {
168             _tailLock.unlock();
169         }
170     }
171 
172     @Override
173     public int size()
174     {
175         return _size.get();
176     }
177 
178     @Override
179     public Iterator<E> iterator()
180     {
181         return listIterator();
182     }
183 
184     /*----------------------------------------------------------------------------*/
185     /* Queue methods */
186     /*----------------------------------------------------------------------------*/
187 
188     @SuppressWarnings("unchecked")
189     @Override
190     public E poll()
191     {
192         if (_size.get() == 0)
193             return null;
194 
195         E e = null;
196 
197         _headLock.lock(); // Size cannot shrink
198         try
199         {
200             if (_size.get() > 0)
201             {
202                 final int head = _indexes[HEAD_OFFSET];
203                 e = (E)_elements[head];
204                 _elements[head] = null;
205                 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
206                 if (_size.decrementAndGet() > 0)
207                     _notEmpty.signal();
208             }
209         }
210         finally
211         {
212             _headLock.unlock();
213         }
214         return e;
215     }
216 
217     @SuppressWarnings("unchecked")
218     @Override
219     public E peek()
220     {
221         if (_size.get() == 0)
222             return null;
223 
224         E e = null;
225 
226         _headLock.lock(); // Size cannot shrink
227         try
228         {
229             if (_size.get() > 0)
230                 e = (E)_elements[_indexes[HEAD_OFFSET]];
231         }
232         finally
233         {
234             _headLock.unlock();
235         }
236         return e;
237     }
238 
239     @Override
240     public E remove()
241     {
242         E e = poll();
243         if (e == null)
244             throw new NoSuchElementException();
245         return e;
246     }
247 
248     @Override
249     public E element()
250     {
251         E e = peek();
252         if (e == null)
253             throw new NoSuchElementException();
254         return e;
255     }
256 
257     /*----------------------------------------------------------------------------*/
258     /* BlockingQueue methods */
259     /*----------------------------------------------------------------------------*/
260 
261     @Override
262     public boolean offer(E e)
263     {
264         Objects.requireNonNull(e);
265 
266         boolean notEmpty = false;
267         _tailLock.lock(); // Size cannot grow... only shrink
268         try
269         {
270             int size = _size.get();
271             if (size >= _maxCapacity)
272                 return false;
273 
274             // Should we expand array?
275             if (size == _elements.length)
276             {
277                 _headLock.lock();
278                 try
279                 {
280                     if (!grow())
281                         return false;
282                 }
283                 finally
284                 {
285                     _headLock.unlock();
286                 }
287             }
288 
289             // Re-read head and tail after a possible grow
290             int tail = _indexes[TAIL_OFFSET];
291             _elements[tail] = e;
292             _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
293             notEmpty = _size.getAndIncrement() == 0;
294         }
295         finally
296         {
297             _tailLock.unlock();
298         }
299 
300         if (notEmpty)
301         {
302             _headLock.lock();
303             try
304             {
305                 _notEmpty.signal();
306             }
307             finally
308             {
309                 _headLock.unlock();
310             }
311         }
312 
313         return true;
314     }
315 
316     @Override
317     public boolean add(E e)
318     {
319         if (offer(e))
320             return true;
321         else
322             throw new IllegalStateException();
323     }
324 
325     @Override
326     public void put(E o) throws InterruptedException
327     {
328         // The mechanism to await and signal when the queue is full is not implemented
329         throw new UnsupportedOperationException();
330     }
331 
332     @Override
333     public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
334     {
335         // The mechanism to await and signal when the queue is full is not implemented
336         throw new UnsupportedOperationException();
337     }
338 
339     @SuppressWarnings("unchecked")
340     @Override
341     public E take() throws InterruptedException
342     {
343         E e = null;
344 
345         _headLock.lockInterruptibly(); // Size cannot shrink
346         try
347         {
348             try
349             {
350                 while (_size.get() == 0)
351                 {
352                     _notEmpty.await();
353                 }
354             }
355             catch (InterruptedException ie)
356             {
357                 _notEmpty.signal();
358                 throw ie;
359             }
360 
361             final int head = _indexes[HEAD_OFFSET];
362             e = (E)_elements[head];
363             _elements[head] = null;
364             _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
365 
366             if (_size.decrementAndGet() > 0)
367                 _notEmpty.signal();
368         }
369         finally
370         {
371             _headLock.unlock();
372         }
373         return e;
374     }
375 
376     @SuppressWarnings("unchecked")
377     @Override
378     public E poll(long time, TimeUnit unit) throws InterruptedException
379     {
380         long nanos = unit.toNanos(time);
381         E e = null;
382 
383         _headLock.lockInterruptibly(); // Size cannot shrink
384         try
385         {
386             try
387             {
388                 while (_size.get() == 0)
389                 {
390                     if (nanos <= 0)
391                         return null;
392                     nanos = _notEmpty.awaitNanos(nanos);
393                 }
394             }
395             catch (InterruptedException x)
396             {
397                 _notEmpty.signal();
398                 throw x;
399             }
400 
401             int head = _indexes[HEAD_OFFSET];
402             e = (E)_elements[head];
403             _elements[head] = null;
404             _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
405 
406             if (_size.decrementAndGet() > 0)
407                 _notEmpty.signal();
408         }
409         finally
410         {
411             _headLock.unlock();
412         }
413         return e;
414     }
415 
416     @Override
417     public boolean remove(Object o)
418     {
419 
420         _tailLock.lock();
421         try
422         {
423 
424             _headLock.lock();
425             try
426             {
427                 if (isEmpty())
428                     return false;
429 
430                 final int head = _indexes[HEAD_OFFSET];
431                 final int tail = _indexes[TAIL_OFFSET];
432                 final int capacity = _elements.length;
433 
434                 int i = head;
435                 while (true)
436                 {
437                     if (Objects.equals(_elements[i],o))
438                     {
439                         remove(i >= head?i - head:capacity - head + i);
440                         return true;
441                     }
442                     ++i;
443                     if (i == capacity)
444                         i = 0;
445                     if (i == tail)
446                         return false;
447                 }
448             }
449             finally
450             {
451                 _headLock.unlock();
452             }
453         }
454         finally
455         {
456             _tailLock.unlock();
457         }
458     }
459 
460     @Override
461     public int remainingCapacity()
462     {
463 
464         _tailLock.lock();
465         try
466         {
467 
468             _headLock.lock();
469             try
470             {
471                 return getCapacity() - size();
472             }
473             finally
474             {
475                 _headLock.unlock();
476             }
477         }
478         finally
479         {
480             _tailLock.unlock();
481         }
482     }
483 
484     @Override
485     public int drainTo(Collection<? super E> c)
486     {
487         throw new UnsupportedOperationException();
488     }
489 
490     @Override
491     public int drainTo(Collection<? super E> c, int maxElements)
492     {
493         throw new UnsupportedOperationException();
494     }
495 
496     /*----------------------------------------------------------------------------*/
497     /* List methods */
498     /*----------------------------------------------------------------------------*/
499 
500     @SuppressWarnings("unchecked")
501     @Override
502     public E get(int index)
503     {
504 
505         _tailLock.lock();
506         try
507         {
508 
509             _headLock.lock();
510             try
511             {
512                 if (index < 0 || index >= _size.get())
513                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
514                 int i = _indexes[HEAD_OFFSET] + index;
515                 int capacity = _elements.length;
516                 if (i >= capacity)
517                     i -= capacity;
518                 return (E)_elements[i];
519             }
520             finally
521             {
522                 _headLock.unlock();
523             }
524         }
525         finally
526         {
527             _tailLock.unlock();
528         }
529     }
530 
531     @Override
532     public void add(int index, E e)
533     {
534         if (e == null)
535             throw new NullPointerException();
536 
537         _tailLock.lock();
538         try
539         {
540 
541             _headLock.lock();
542             try
543             {
544                 final int size = _size.get();
545 
546                 if (index < 0 || index > size)
547                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
548 
549                 if (index == size)
550                 {
551                     add(e);
552                 }
553                 else
554                 {
555                     if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
556                         if (!grow())
557                             throw new IllegalStateException("full");
558 
559                     // Re-read head and tail after a possible grow
560                     int i = _indexes[HEAD_OFFSET] + index;
561                     int capacity = _elements.length;
562 
563                     if (i >= capacity)
564                         i -= capacity;
565 
566                     _size.incrementAndGet();
567                     int tail = _indexes[TAIL_OFFSET];
568                     _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
569 
570                     if (i < tail)
571                     {
572                         System.arraycopy(_elements,i,_elements,i + 1,tail - i);
573                         _elements[i] = e;
574                     }
575                     else
576                     {
577                         if (tail > 0)
578                         {
579                             System.arraycopy(_elements,0,_elements,1,tail);
580                             _elements[0] = _elements[capacity - 1];
581                         }
582 
583                         System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
584                         _elements[i] = e;
585                     }
586                 }
587             }
588             finally
589             {
590                 _headLock.unlock();
591             }
592         }
593         finally
594         {
595             _tailLock.unlock();
596         }
597     }
598 
599     @SuppressWarnings("unchecked")
600     @Override
601     public E set(int index, E e)
602     {
603         Objects.requireNonNull(e);
604 
605         _tailLock.lock();
606         try
607         {
608 
609             _headLock.lock();
610             try
611             {
612                 if (index < 0 || index >= _size.get())
613                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
614 
615                 int i = _indexes[HEAD_OFFSET] + index;
616                 int capacity = _elements.length;
617                 if (i >= capacity)
618                     i -= capacity;
619                 E old = (E)_elements[i];
620                 _elements[i] = e;
621                 return old;
622             }
623             finally
624             {
625                 _headLock.unlock();
626             }
627         }
628         finally
629         {
630             _tailLock.unlock();
631         }
632     }
633 
634     @SuppressWarnings("unchecked")
635     @Override
636     public E remove(int index)
637     {
638 
639         _tailLock.lock();
640         try
641         {
642 
643             _headLock.lock();
644             try
645             {
646                 if (index < 0 || index >= _size.get())
647                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
648 
649                 int i = _indexes[HEAD_OFFSET] + index;
650                 int capacity = _elements.length;
651                 if (i >= capacity)
652                     i -= capacity;
653                 E old = (E)_elements[i];
654 
655                 int tail = _indexes[TAIL_OFFSET];
656                 if (i < tail)
657                 {
658                     System.arraycopy(_elements,i + 1,_elements,i,tail - i);
659                     --_indexes[TAIL_OFFSET];
660                 }
661                 else
662                 {
663                     System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
664                     _elements[capacity - 1] = _elements[0];
665                     if (tail > 0)
666                     {
667                         System.arraycopy(_elements,1,_elements,0,tail);
668                         --_indexes[TAIL_OFFSET];
669                     }
670                     else
671                     {
672                         _indexes[TAIL_OFFSET] = capacity - 1;
673                     }
674                     _elements[_indexes[TAIL_OFFSET]] = null;
675                 }
676 
677                 _size.decrementAndGet();
678 
679                 return old;
680             }
681             finally
682             {
683                 _headLock.unlock();
684             }
685         }
686         finally
687         {
688             _tailLock.unlock();
689         }
690     }
691 
692     @Override
693     public ListIterator<E> listIterator(int index)
694     {
695 
696         _tailLock.lock();
697         try
698         {
699 
700             _headLock.lock();
701             try
702             {
703                 Object[] elements = new Object[size()];
704                 if (size() > 0)
705                 {
706                     int head = _indexes[HEAD_OFFSET];
707                     int tail = _indexes[TAIL_OFFSET];
708                     if (head < tail)
709                     {
710                         System.arraycopy(_elements,head,elements,0,tail - head);
711                     }
712                     else
713                     {
714                         int chunk = _elements.length - head;
715                         System.arraycopy(_elements,head,elements,0,chunk);
716                         System.arraycopy(_elements,0,elements,chunk,tail);
717                     }
718                 }
719                 return new Itr(elements,index);
720             }
721             finally
722             {
723                 _headLock.unlock();
724             }
725         }
726         finally
727         {
728             _tailLock.unlock();
729         }
730     }
731 
732     /*----------------------------------------------------------------------------*/
733     /* Additional methods */
734     /*----------------------------------------------------------------------------*/
735 
736     /**
737      * @return the current capacity of this queue
738      */
739     public int getCapacity()
740     {
741         _tailLock.lock();
742         try
743         {
744             return _elements.length;
745         }
746         finally
747         {
748             _tailLock.unlock();
749         }
750     }
751 
752     /**
753      * @return the max capacity of this queue, or -1 if this queue is unbounded
754      */
755     public int getMaxCapacity()
756     {
757         return _maxCapacity;
758     }
759 
760     /*----------------------------------------------------------------------------*/
761     /* Implementation methods */
762     /*----------------------------------------------------------------------------*/
763 
764     private boolean grow()
765     {
766         if (_growCapacity <= 0)
767             return false;
768 
769         _tailLock.lock();
770         try
771         {
772 
773             _headLock.lock();
774             try
775             {
776                 final int head = _indexes[HEAD_OFFSET];
777                 final int tail = _indexes[TAIL_OFFSET];
778                 final int newTail;
779                 final int capacity = _elements.length;
780 
781                 Object[] elements = new Object[capacity + _growCapacity];
782 
783                 if (head < tail)
784                 {
785                     newTail = tail - head;
786                     System.arraycopy(_elements,head,elements,0,newTail);
787                 }
788                 else if (head > tail || _size.get() > 0)
789                 {
790                     newTail = capacity + tail - head;
791                     int cut = capacity - head;
792                     System.arraycopy(_elements,head,elements,0,cut);
793                     System.arraycopy(_elements,0,elements,cut,tail);
794                 }
795                 else
796                 {
797                     newTail = 0;
798                 }
799 
800                 _elements = elements;
801                 _indexes[HEAD_OFFSET] = 0;
802                 _indexes[TAIL_OFFSET] = newTail;
803                 return true;
804             }
805             finally
806             {
807                 _headLock.unlock();
808             }
809         }
810         finally
811         {
812             _tailLock.unlock();
813         }
814     }
815 
816     private class Itr implements ListIterator<E>
817     {
818         private final Object[] _elements;
819         private int _cursor;
820 
821         public Itr(Object[] elements, int offset)
822         {
823             _elements = elements;
824             _cursor = offset;
825         }
826 
827         @Override
828         public boolean hasNext()
829         {
830             return _cursor < _elements.length;
831         }
832 
833         @SuppressWarnings("unchecked")
834         @Override
835         public E next()
836         {
837             return (E)_elements[_cursor++];
838         }
839 
840         @Override
841         public boolean hasPrevious()
842         {
843             return _cursor > 0;
844         }
845 
846         @SuppressWarnings("unchecked")
847         @Override
848         public E previous()
849         {
850             return (E)_elements[--_cursor];
851         }
852 
853         @Override
854         public int nextIndex()
855         {
856             return _cursor + 1;
857         }
858 
859         @Override
860         public int previousIndex()
861         {
862             return _cursor - 1;
863         }
864 
865         @Override
866         public void remove()
867         {
868             throw new UnsupportedOperationException();
869         }
870 
871         @Override
872         public void set(E e)
873         {
874             throw new UnsupportedOperationException();
875         }
876 
877         @Override
878         public void add(E e)
879         {
880             throw new UnsupportedOperationException();
881         }
882     }
883 }