View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.AbstractList;
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.ListIterator;
25  import java.util.NoSuchElementException;
26  import java.util.Objects;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  import java.util.concurrent.locks.Condition;
31  import java.util.concurrent.locks.Lock;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  /**
35   * A BlockingQueue backed by a circular array capable or growing.
36   * <p/>
37   * This queue is uses  a variant of the two lock queue algorithm to provide an
38   * efficient queue or list backed by a growable circular array.
39   * <p/>
40   * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
41   * able to grow and provides a blocking put call.
42   * <p/>
43   * The queue has both a capacity (the size of the array currently allocated)
44   * and a max capacity (the maximum size that may be allocated), which defaults to
45   * {@link Integer#MAX_VALUE}.
46   *
47   * @param <E> The element type
48   */
49  public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
50  {
51      /**
52       * The head offset in the {@link #_indexes} array, displaced
53       * by 15 slots to avoid false sharing with the array length
54       * (stored before the first element of the array itself).
55       */
56      private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
57      /**
58       * The tail offset in the {@link #_indexes} array, displaced
59       * by 16 slots from the head to avoid false sharing with it.
60       */
61      private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
62      /**
63       * Default initial capacity, 128.
64       */
65      public static final int DEFAULT_CAPACITY = 128;
66      /**
67       * Default growth factor, 64.
68       */
69      public static final int DEFAULT_GROWTH = 64;
70  
71      private final int _maxCapacity;
72      private final int _growCapacity;
73      /**
74       * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
75       */
76      private final int[] _indexes = new int[TAIL_OFFSET + 1];
77      private final Lock _tailLock = new ReentrantLock();
78      private final AtomicInteger _size = new AtomicInteger();
79      private final Lock _headLock = new ReentrantLock();
80      private final Condition _notEmpty = _headLock.newCondition();
81      private Object[] _elements;
82  
83      /**
84       * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
85       *
86       * @see #DEFAULT_CAPACITY
87       * @see #DEFAULT_GROWTH
88       */
89      public BlockingArrayQueue()
90      {
91          _elements = new Object[DEFAULT_CAPACITY];
92          _growCapacity = DEFAULT_GROWTH;
93          _maxCapacity = Integer.MAX_VALUE;
94      }
95  
96      /**
97       * Creates a bounded {@link BlockingArrayQueue} that does not grow.
98       * The capacity of the queue is fixed and equal to the given parameter.
99       *
100      * @param maxCapacity the maximum capacity
101      */
102     public BlockingArrayQueue(int maxCapacity)
103     {
104         _elements = new Object[maxCapacity];
105         _growCapacity = -1;
106         _maxCapacity = maxCapacity;
107     }
108 
109     /**
110      * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
111      *
112      * @param capacity the initial capacity
113      * @param growBy   the growth factor
114      */
115     public BlockingArrayQueue(int capacity, int growBy)
116     {
117         _elements = new Object[capacity];
118         _growCapacity = growBy;
119         _maxCapacity = Integer.MAX_VALUE;
120     }
121 
122     /**
123      * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter.
124      *
125      * @param capacity the initial capacity
126      * @param growBy   the growth factor
127      * @param maxCapacity    the maximum capacity
128      */
129     public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
130     {
131         if (capacity > maxCapacity)
132             throw new IllegalArgumentException();
133         _elements = new Object[capacity];
134         _growCapacity = growBy;
135         _maxCapacity = maxCapacity;
136     }
137 
138     /*----------------------------------------------------------------------------*/
139     /* Collection methods                                                         */
140     /*----------------------------------------------------------------------------*/
141 
142     @Override
143     public void clear()
144     {
145         final Lock tailLock = _tailLock;
146         tailLock.lock();
147         try
148         {
149             final Lock headLock = _headLock;
150             headLock.lock();
151             try
152             {
153                 _indexes[HEAD_OFFSET] = 0;
154                 _indexes[TAIL_OFFSET] = 0;
155                 _size.set(0);
156             }
157             finally
158             {
159                 headLock.unlock();
160             }
161         }
162         finally
163         {
164             tailLock.unlock();
165         }
166     }
167 
168     @Override
169     public int size()
170     {
171         return _size.get();
172     }
173 
174     @Override
175     public Iterator<E> iterator()
176     {
177         return listIterator();
178     }
179 
180     /*----------------------------------------------------------------------------*/
181     /* Queue methods                                                              */
182     /*----------------------------------------------------------------------------*/
183 
184     @SuppressWarnings("unchecked")
185     @Override
186     public E poll()
187     {
188         if (_size.get() == 0)
189             return null;
190 
191         E e = null;
192         final Lock headLock = _headLock;
193         headLock.lock(); // Size cannot shrink
194         try
195         {
196             if (_size.get() > 0)
197             {
198                 final int head = _indexes[HEAD_OFFSET];
199                 e = (E)_elements[head];
200                 _elements[head] = null;
201                 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
202                 if (_size.decrementAndGet() > 0)
203                     _notEmpty.signal();
204             }
205         }
206         finally
207         {
208             headLock.unlock();
209         }
210         return e;
211     }
212 
213     @SuppressWarnings("unchecked")
214     @Override
215     public E peek()
216     {
217         if (_size.get() == 0)
218             return null;
219 
220         E e = null;
221         final Lock headLock = _headLock;
222         headLock.lock(); // Size cannot shrink
223         try
224         {
225             if (_size.get() > 0)
226                 e = (E)_elements[_indexes[HEAD_OFFSET]];
227         }
228         finally
229         {
230             headLock.unlock();
231         }
232         return e;
233     }
234 
235     @Override
236     public E remove()
237     {
238         E e = poll();
239         if (e == null)
240             throw new NoSuchElementException();
241         return e;
242     }
243 
244     @Override
245     public E element()
246     {
247         E e = peek();
248         if (e == null)
249             throw new NoSuchElementException();
250         return e;
251     }
252 
253     /*----------------------------------------------------------------------------*/
254     /* BlockingQueue methods                                                      */
255     /*----------------------------------------------------------------------------*/
256 
257     @Override
258     public boolean offer(E e)
259     {
260         Objects.requireNonNull(e);
261 
262         final Lock tailLock = _tailLock;
263         final Lock headLock = _headLock;
264         boolean notEmpty = false;
265         tailLock.lock(); // Size cannot grow... only shrink
266         try
267         {
268             int size = _size.get();
269             if (size >= _maxCapacity)
270                 return false;
271 
272             // Should we expand array?
273             if (size == _elements.length)
274             {
275                 headLock.lock();
276                 try
277                 {
278                     if (!grow())
279                         return false;
280                 }
281                 finally
282                 {
283                     headLock.unlock();
284                 }
285             }
286 
287             // Re-read head and tail after a possible grow
288             int tail = _indexes[TAIL_OFFSET];
289             _elements[tail] = e;
290             _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
291             notEmpty = _size.getAndIncrement() == 0;
292         }
293         finally
294         {
295             tailLock.unlock();
296         }
297 
298         if (notEmpty)
299         {
300             headLock.lock();
301             try
302             {
303                 _notEmpty.signal();
304             }
305             finally
306             {
307                 headLock.unlock();
308             }
309         }
310 
311         return true;
312     }
313 
314     @Override
315     public boolean add(E e)
316     {
317         if (offer(e))
318             return true;
319         else
320             throw new IllegalStateException();
321     }
322 
323     @Override
324     public void put(E o) throws InterruptedException
325     {
326         // The mechanism to await and signal when the queue is full is not implemented
327         throw new UnsupportedOperationException();
328     }
329 
330     @Override
331     public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
332     {
333         // The mechanism to await and signal when the queue is full is not implemented
334         throw new UnsupportedOperationException();
335     }
336 
337     @SuppressWarnings("unchecked")
338     @Override
339     public E take() throws InterruptedException
340     {
341         E e = null;
342         final Lock headLock = _headLock;
343         headLock.lockInterruptibly(); // Size cannot shrink
344         try
345         {
346             try
347             {
348                 while (_size.get() == 0)
349                 {
350                     _notEmpty.await();
351                 }
352             }
353             catch (InterruptedException ie)
354             {
355                 _notEmpty.signal();
356                 throw ie;
357             }
358 
359             final int head = _indexes[HEAD_OFFSET];
360             e = (E)_elements[head];
361             _elements[head] = null;
362             _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
363 
364             if (_size.decrementAndGet() > 0)
365                 _notEmpty.signal();
366         }
367         finally
368         {
369             headLock.unlock();
370         }
371         return e;
372     }
373 
374     @SuppressWarnings("unchecked")
375     @Override
376     public E poll(long time, TimeUnit unit) throws InterruptedException
377     {
378         long nanos = unit.toNanos(time);
379         E e = null;
380         final Lock headLock = _headLock;
381         headLock.lockInterruptibly(); // Size cannot shrink
382         try
383         {
384             try
385             {
386                 while (_size.get() == 0)
387                 {
388                     if (nanos <= 0)
389                         return null;
390                     nanos = _notEmpty.awaitNanos(nanos);
391                 }
392             }
393             catch (InterruptedException x)
394             {
395                 _notEmpty.signal();
396                 throw x;
397             }
398 
399             int head = _indexes[HEAD_OFFSET];
400             e = (E)_elements[head];
401             _elements[head] = null;
402             _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
403 
404             if (_size.decrementAndGet() > 0)
405                 _notEmpty.signal();
406         }
407         finally
408         {
409             headLock.unlock();
410         }
411         return e;
412     }
413 
414     @Override
415     public boolean remove(Object o)
416     {
417         final Lock tailLock = _tailLock;
418         tailLock.lock();
419         try
420         {
421             final Lock headLock = _headLock;
422             headLock.lock();
423             try
424             {
425                 if (isEmpty())
426                     return false;
427 
428                 final int head = _indexes[HEAD_OFFSET];
429                 final int tail = _indexes[TAIL_OFFSET];
430                 final int capacity = _elements.length;
431 
432                 int i = head;
433                 while (true)
434                 {
435                     if (Objects.equals(_elements[i], o))
436                     {
437                         remove(i >= head ? i - head : capacity - head + i);
438                         return true;
439                     }
440                     ++i;
441                     if (i == capacity)
442                         i = 0;
443                     if (i == tail)
444                         return false;
445                 }
446             }
447             finally
448             {
449                 headLock.unlock();
450             }
451         }
452         finally
453         {
454             tailLock.unlock();
455         }
456     }
457 
458     @Override
459     public int remainingCapacity()
460     {
461         final Lock tailLock = _tailLock;
462         tailLock.lock();
463         try
464         {
465             final Lock headLock = _headLock;
466             headLock.lock();
467             try
468             {
469                 return getCapacity() - size();
470             }
471             finally
472             {
473                 headLock.unlock();
474             }
475         }
476         finally
477         {
478             tailLock.unlock();
479         }
480     }
481 
482     @Override
483     public int drainTo(Collection<? super E> c)
484     {
485         throw new UnsupportedOperationException();
486     }
487 
488     @Override
489     public int drainTo(Collection<? super E> c, int maxElements)
490     {
491         throw new UnsupportedOperationException();
492     }
493 
494     /*----------------------------------------------------------------------------*/
495     /* List methods                                                               */
496     /*----------------------------------------------------------------------------*/
497 
498     @SuppressWarnings("unchecked")
499     @Override
500     public E get(int index)
501     {
502         final Lock tailLock = _tailLock;
503         tailLock.lock();
504         try
505         {
506             final Lock headLock = _headLock;
507             headLock.lock();
508             try
509             {
510                 if (index < 0 || index >= _size.get())
511                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
512                 int i = _indexes[HEAD_OFFSET] + index;
513                 int capacity = _elements.length;
514                 if (i >= capacity)
515                     i -= capacity;
516                 return (E)_elements[i];
517             }
518             finally
519             {
520                 headLock.unlock();
521             }
522         }
523         finally
524         {
525             tailLock.unlock();
526         }
527     }
528 
529     @Override
530     public void add(int index, E e)
531     {
532         if (e == null)
533             throw new NullPointerException();
534 
535         final Lock tailLock = _tailLock;
536         tailLock.lock();
537         try
538         {
539             final Lock headLock = _headLock;
540             headLock.lock();
541             try
542             {
543                 final int size = _size.get();
544 
545                 if (index < 0 || index > size)
546                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
547 
548                 if (index == size)
549                 {
550                     add(e);
551                 }
552                 else
553                 {
554                     if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
555                         if (!grow())
556                             throw new IllegalStateException("full");
557 
558                     // Re-read head and tail after a possible grow
559                     int i = _indexes[HEAD_OFFSET] + index;
560                     int capacity = _elements.length;
561 
562                     if (i >= capacity)
563                         i -= capacity;
564 
565                     _size.incrementAndGet();
566                     int tail = _indexes[TAIL_OFFSET];
567                     _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
568 
569                     if (i < tail)
570                     {
571                         System.arraycopy(_elements, i, _elements, i + 1, tail - i);
572                         _elements[i] = e;
573                     }
574                     else
575                     {
576                         if (tail > 0)
577                         {
578                             System.arraycopy(_elements, 0, _elements, 1, tail);
579                             _elements[0] = _elements[capacity - 1];
580                         }
581 
582                         System.arraycopy(_elements, i, _elements, i + 1, capacity - i - 1);
583                         _elements[i] = e;
584                     }
585                 }
586             }
587             finally
588             {
589                 headLock.unlock();
590             }
591         }
592         finally
593         {
594             tailLock.unlock();
595         }
596     }
597 
598     @SuppressWarnings("unchecked")
599     @Override
600     public E set(int index, E e)
601     {
602         Objects.requireNonNull(e);
603 
604         final Lock tailLock = _tailLock;
605         tailLock.lock();
606         try
607         {
608             final Lock headLock = _headLock;
609             headLock.lock();
610             try
611             {
612                 if (index < 0 || index >= _size.get())
613                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
614 
615                 int i = _indexes[HEAD_OFFSET] + index;
616                 int capacity = _elements.length;
617                 if (i >= capacity)
618                     i -= capacity;
619                 E old = (E)_elements[i];
620                 _elements[i] = e;
621                 return old;
622             }
623             finally
624             {
625                 headLock.unlock();
626             }
627         }
628         finally
629         {
630             tailLock.unlock();
631         }
632     }
633 
634     @SuppressWarnings("unchecked")
635     @Override
636     public E remove(int index)
637     {
638         final Lock tailLock = _tailLock;
639         tailLock.lock();
640         try
641         {
642             final Lock headLock = _headLock;
643             headLock.lock();
644             try
645             {
646                 if (index < 0 || index >= _size.get())
647                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
648 
649                 int i = _indexes[HEAD_OFFSET] + index;
650                 int capacity = _elements.length;
651                 if (i >= capacity)
652                     i -= capacity;
653                 E old = (E)_elements[i];
654 
655                 int tail = _indexes[TAIL_OFFSET];
656                 if (i < tail)
657                 {
658                     System.arraycopy(_elements, i + 1, _elements, i, tail - i);
659                     --_indexes[TAIL_OFFSET];
660                 }
661                 else
662                 {
663                     System.arraycopy(_elements, i + 1, _elements, i, capacity - i - 1);
664                     _elements[capacity - 1] = _elements[0];
665                     if (tail > 0)
666                     {
667                         System.arraycopy(_elements, 1, _elements, 0, tail);
668                         --_indexes[TAIL_OFFSET];
669                     }
670                     else
671                     {
672                         _indexes[TAIL_OFFSET] = capacity - 1;
673                     }
674                     _elements[_indexes[TAIL_OFFSET]] = null;
675                 }
676 
677                 _size.decrementAndGet();
678 
679                 return old;
680             }
681             finally
682             {
683                 headLock.unlock();
684             }
685         }
686         finally
687         {
688             tailLock.unlock();
689         }
690     }
691 
692     @Override
693     public ListIterator<E> listIterator(int index)
694     {
695         final Lock tailLock = _tailLock;
696         tailLock.lock();
697         try
698         {
699             final Lock headLock = _headLock;
700             headLock.lock();
701             try
702             {
703                 Object[] elements = new Object[size()];
704                 if (size() > 0)
705                 {
706                     int head = _indexes[HEAD_OFFSET];
707                     int tail = _indexes[TAIL_OFFSET];
708                     if (head < tail)
709                     {
710                         System.arraycopy(_elements, head, elements, 0, tail - head);
711                     }
712                     else
713                     {
714                         int chunk = _elements.length - head;
715                         System.arraycopy(_elements, head, elements, 0, chunk);
716                         System.arraycopy(_elements, 0, elements, chunk, tail);
717                     }
718                 }
719                 return new Itr(elements, index);
720             }
721             finally
722             {
723                 headLock.unlock();
724             }
725         }
726         finally
727         {
728             tailLock.unlock();
729         }
730     }
731 
732     /*----------------------------------------------------------------------------*/
733     /* Additional methods                                                         */
734     /*----------------------------------------------------------------------------*/
735 
736     /**
737      * @return the current capacity of this queue
738      */
739     public int getCapacity()
740     {
741         return _elements.length;
742     }
743 
744     /**
745      * @return the max capacity of this queue, or -1 if this queue is unbounded
746      */
747     public int getMaxCapacity()
748     {
749         return _maxCapacity;
750     }
751 
752     /*----------------------------------------------------------------------------*/
753     /* Implementation methods                                                     */
754     /*----------------------------------------------------------------------------*/
755 
756     private boolean grow()
757     {
758         if (_growCapacity <= 0)
759             return false;
760 
761         final Lock tailLock = _tailLock;
762         tailLock.lock();
763         try
764         {
765             final Lock headLock = _headLock;
766             headLock.lock();
767             try
768             {
769                 final int head = _indexes[HEAD_OFFSET];
770                 final int tail = _indexes[TAIL_OFFSET];
771                 final int newTail;
772                 final int capacity = _elements.length;
773 
774                 Object[] elements = new Object[capacity + _growCapacity];
775 
776                 if (head < tail)
777                 {
778                     newTail = tail - head;
779                     System.arraycopy(_elements, head, elements, 0, newTail);
780                 }
781                 else if (head > tail || _size.get() > 0)
782                 {
783                     newTail = capacity + tail - head;
784                     int cut = capacity - head;
785                     System.arraycopy(_elements, head, elements, 0, cut);
786                     System.arraycopy(_elements, 0, elements, cut, tail);
787                 }
788                 else
789                 {
790                     newTail = 0;
791                 }
792 
793                 _elements = elements;
794                 _indexes[HEAD_OFFSET] = 0;
795                 _indexes[TAIL_OFFSET] = newTail;
796                 return true;
797             }
798             finally
799             {
800                 headLock.unlock();
801             }
802         }
803         finally
804         {
805             tailLock.unlock();
806         }
807     }
808 
809     private class Itr implements ListIterator<E>
810     {
811         private final Object[] _elements;
812         private int _cursor;
813 
814         public Itr(Object[] elements, int offset)
815         {
816             _elements = elements;
817             _cursor = offset;
818         }
819 
820         @Override
821         public boolean hasNext()
822         {
823             return _cursor < _elements.length;
824         }
825 
826         @SuppressWarnings("unchecked")
827         @Override
828         public E next()
829         {
830             return (E)_elements[_cursor++];
831         }
832 
833         @Override
834         public boolean hasPrevious()
835         {
836             return _cursor > 0;
837         }
838 
839         @SuppressWarnings("unchecked")
840         @Override
841         public E previous()
842         {
843             return (E)_elements[--_cursor];
844         }
845 
846         @Override
847         public int nextIndex()
848         {
849             return _cursor + 1;
850         }
851 
852         @Override
853         public int previousIndex()
854         {
855             return _cursor - 1;
856         }
857 
858         @Override
859         public void remove()
860         {
861             throw new UnsupportedOperationException();
862         }
863 
864         @Override
865         public void set(E e)
866         {
867             throw new UnsupportedOperationException();
868         }
869 
870         @Override
871         public void add(E e)
872         {
873             throw new UnsupportedOperationException();
874         }
875     }
876 }