View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.AbstractList;
22  import java.util.Collection;
23  import java.util.NoSuchElementException;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.locks.Condition;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  
/* ------------------------------------------------------------ */
/**
 * Queue backed by a circular array.
 * <p>
 * This queue uses a variant of the two lock queue algorithm to provide an
 * efficient queue or list backed by a growable circular array. Removals from
 * the head are guarded by a head lock and insertions at the tail by a tail
 * lock; operations that touch both ends (growing the array, indexed access,
 * {@link #clear()}) acquire the tail lock first and then the head lock.
 * <p>
 * Only part of {@link java.util.concurrent.BlockingQueue} has blocking
 * semantics: {@link #take()} and {@link #poll(long, TimeUnit)} wait for an
 * element to arrive. {@link #put(Object)} never waits (it throws
 * {@link IllegalStateException} when the queue is full) and
 * {@link #offer(Object, long, TimeUnit)} is not supported.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum size that may be allocated), which defaults to
 * {@link Integer#MAX_VALUE}. Null elements are rejected.
 *
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    /** Initial array size used by the no-argument constructor. */
    public static final int DEFAULT_CAPACITY=128;
    /** Array growth increment used by the no-argument constructor. */
    public static final int DEFAULT_GROWTH=64;

    private final int _limit;
    private final AtomicInteger _size=new AtomicInteger();
    private final int _growCapacity;

    private volatile int _capacity;
    private Object[] _elements;

    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private int _head;

    // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
    // TODO verify this has benefits
    private long _space0;
    private long _space1;
    private long _space2;
    private long _space3;
    private long _space4;
    private long _space5;
    private long _space6;
    private long _space7;

    private final ReentrantLock _tailLock = new ReentrantLock();
    private int _tail;


    /* ------------------------------------------------------------ */
    /**
     * Create a growing partially blocking Queue with
     * {@link #DEFAULT_CAPACITY}, {@link #DEFAULT_GROWTH} and no limit.
     */
    public BlockingArrayQueue()
    {
        this(DEFAULT_CAPACITY,DEFAULT_GROWTH,Integer.MAX_VALUE);
    }

    /* ------------------------------------------------------------ */
    /**
     * Create a fixed size partially blocking Queue.
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        // a non-positive growth increment disables growing
        this(limit,-1,limit);
    }

    /* ------------------------------------------------------------ */
    /**
     * Create a growing partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity; the queue cannot grow if this is not positive.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        this(capacity,growBy,Integer.MAX_VALUE);
    }

    /* ------------------------------------------------------------ */
    /**
     * Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity; the queue cannot grow if this is not positive.
     * @param limit maximum capacity.
     * @throws IllegalArgumentException if capacity is greater than limit
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException("capacity "+capacity+" > limit "+limit);

        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the size of the array currently allocated
     */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the maximum number of elements this queue may hold
     */
    public int getLimit()
    {
        return _limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue.
     * @param e the element to add
     * @return <tt>true</tt> always
     * @throws NullPointerException if the element is null
     * @throws IllegalStateException if the queue is full and cannot grow
     */
    @Override
    public boolean add(E e)
    {
        if (!offer(e))
            throw new IllegalStateException("full");
        return true;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves, but does not remove, the head of this queue.
     * @return the head of this queue
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves, but does not remove, the head of this queue.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    @Override
    @SuppressWarnings("unchecked")
    public E peek()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;

        _headLock.lock(); // Size cannot shrink
        try
        {
            return _size.get()>0 ? (E)_elements[_head] : null;
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue if there is room,
     * growing the backing array when it is full and growth is permitted.
     * @param e the element to add
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the queue is at its limit or cannot grow
     * @throws NullPointerException if the element is null
     */
    @Override
    public boolean offer(E e)
    {
        if (e == null)
            throw new NullPointerException();

        boolean not_empty=false;
        _tailLock.lock();  // size cannot grow... only shrink
        try
        {
            if (_size.get() >= _limit)
                return false;

            // should we expand array?
            if (_size.get()==_capacity)
            {
                _headLock.lock();   // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            // only the empty -> non-empty transition needs to wake a waiter;
            // consumers pass the signal on while elements remain
            not_empty=0==_size.getAndIncrement();
        }
        finally
        {
            _tailLock.unlock();
        }

        if (not_empty)
        {
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }

        return true;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue without waiting.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    @Override
    public E poll()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;

        _headLock.lock(); // Size cannot shrink
        try
        {
            return _size.get()>0 ? dequeue() : null;
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public E take() throws InterruptedException
    {
        _headLock.lockInterruptibly();  // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                    _notEmpty.await();
            }
            catch (InterruptedException ie)
            {
                // pass on any signal this thread may have consumed
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {
        long nanos = unit.toNanos(time);

        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            }
            catch (InterruptedException ie)
            {
                // pass on any signal this thread may have consumed
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes and returns the head element. The caller must hold the head
     * lock and must have checked that the queue is not empty. If elements
     * remain afterwards, another waiting consumer is signalled.
     * @return the element that was at the head
     */
    @SuppressWarnings("unchecked")
    private E dequeue()
    {
        final int head=_head;
        final E e = (E)_elements[head];
        _elements[head]=null;
        _head=(head+1)%_capacity;

        if (_size.decrementAndGet()>0)
            _notEmpty.signal();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue.
     * @return the head of this queue
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes all elements and releases the references held by the
     * backing array. The allocated capacity is unchanged.
     */
    @Override
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                // drop the references so the removed elements can be collected
                java.util.Arrays.fill(_elements,null);
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    @Override
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    @Override
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    /**
     * Returns the element at the given position, counted from the head.
     * @param index position relative to the head of the queue
     * @return the element at that position
     * @throws IndexOutOfBoundsException if the index is negative or not less than the size
     */
    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                return (E)_elements[i];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes the element at the given position, counted from the head,
     * closing the gap by shifting the following elements towards the head.
     * @param index position relative to the head of the queue
     * @return the element that was removed
     * @throws IndexOutOfBoundsException if the index is negative or not less than the size
     */
    @SuppressWarnings("unchecked")
    @Override
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                final int capacity=_capacity;
                int i = _head+index;
                if (i>=capacity)
                    i-=capacity;
                final E old=(E)_elements[i];

                // array slot of the last element in the queue
                final int last=_tail==0?capacity-1:_tail-1;

                if (i<=last)
                {
                    // i..last is contiguous
                    System.arraycopy(_elements,i+1,_elements,i,last-i);
                }
                else
                {
                    // i..last wraps around the end of the array
                    System.arraycopy(_elements,i+1,_elements,i,capacity-1-i);
                    _elements[capacity-1]=_elements[0];
                    System.arraycopy(_elements,1,_elements,0,last);
                }

                // the vacated slot becomes the new tail
                _elements[last]=null;
                _tail=last;
                _size.decrementAndGet();

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the element at the given position, counted from the head.
     * @param index position relative to the head of the queue
     * @param e the replacement element
     * @return the element previously at that position
     * @throws NullPointerException if the element is null
     * @throws IndexOutOfBoundsException if the index is negative or not less than the size
     */
    @SuppressWarnings("unchecked")
    @Override
    public E set(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                final E old=(E)_elements[i];
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Inserts the element at the given position, counted from the head,
     * shifting the following elements towards the tail.
     * @param index position relative to the head of the queue; may equal the size to append
     * @param e the element to insert
     * @throws NullPointerException if the element is null
     * @throws IndexOutOfBoundsException if the index is negative or greater than the size
     * @throws IllegalStateException if the queue is full and cannot grow
     */
    @Override
    public void add(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int size=_size.get();
                if (index<0 || index>size)
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                if (size>=_limit || (size==_capacity && !grow()))
                    throw new IllegalStateException("full");

                // from here on there is at least one free slot, at _tail
                final int capacity=_capacity;
                final int tail=_tail;
                int i = _head+index;
                if (i>=capacity)
                    i-=capacity;

                if (i<tail)
                {
                    // i..tail-1 is contiguous
                    System.arraycopy(_elements,i,_elements,i+1,tail-i);
                }
                else if (i>tail)
                {
                    // i..tail-1 wraps around the end of the array
                    System.arraycopy(_elements,0,_elements,1,tail);
                    _elements[0]=_elements[capacity-1];
                    System.arraycopy(_elements,i,_elements,i+1,capacity-1-i);
                }
                // i==tail is a plain append: nothing to shift

                _elements[i]=e;
                _tail=(tail+1)%capacity;

                // wake a waiting consumer on the empty -> non-empty transition
                if (_size.getAndIncrement()==0)
                    _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the backing array with a larger one, moving the elements so
     * that the head is at index 0. The new capacity is the current capacity
     * plus the growth increment, capped at the limit.
     * @return <tt>true</tt> if the array was grown, <tt>false</tt> if growth
     * is disabled or the capacity has already reached the limit
     */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int capacity=_capacity;
                // long arithmetic so that capacity+growth cannot overflow
                final int grown=(int)Math.min((long)capacity+_growCapacity,(long)_limit);
                if (grown<=capacity)
                    return false;

                final int head=_head;
                final int tail=_tail;
                final int new_tail;

                Object[] elements=new Object[grown];

                if (head<tail)
                {
                    // elements are contiguous
                    new_tail=tail-head;
                    System.arraycopy(_elements,head,elements,0,new_tail);
                }
                else if (head>tail || _size.get()>0)
                {
                    // elements wrap, or the array is full (head==tail)
                    new_tail=capacity+tail-head;
                    int cut=capacity-head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    // empty
                    new_tail=0;
                }

                _elements=elements;
                _capacity=_elements.length;
                _head=0;
                _tail=new_tail;
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes the elements present when the call starts and adds them to
     * the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if the collection is null
     * @throws IllegalArgumentException if the collection is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c)
    {
        return drainTo(c,Integer.MAX_VALUE);
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes at most the given number of elements and adds them to the
     * given collection. Elements are taken one at a time from the head, so
     * the transfer is not atomic with respect to other threads.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if the collection is null
     * @throws IllegalArgumentException if the collection is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        if (c==null)
            throw new NullPointerException();
        if (c==this)
            throw new IllegalArgumentException();

        // bound by the size at entry so concurrent producers cannot keep this looping
        final int wanted=Math.min(maxElements,_size.get());
        int drained=0;
        while (drained<wanted)
        {
            E e=poll();
            if (e==null)
                break;
            c.add(e);
            drained++;
        }
        return drained;
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported: this queue has no mechanism to wait for space.
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue. This implementation
     * does not wait for space to become available.
     * @param o the element to add
     * @throws NullPointerException if the element is null
     * @throws IllegalStateException if the queue is full and cannot grow
     */
    @Override
    public void put(E o) throws InterruptedException
    {
        if (!offer(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of further elements that can be added: up to the
     * limit when the queue can grow, otherwise up to the current capacity
     */
    @Override
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int max=_growCapacity>0?_limit:_capacity;
                return max-_size.get();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }


    /* ------------------------------------------------------------ */
    long sumOfSpace()
    {
        // this method exists to stop clever optimisers removing the spacers
        return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
    }
}