View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.util;
15  
16  import java.util.AbstractList;
17  import java.util.Collection;
18  import java.util.NoSuchElementException;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  
26  /* ------------------------------------------------------------ */
27  /** Queue backed by a circular array.
28   * 
29   * This queue is uses  a variant of the two lock queue algorithm to
30   * provide an efficient queue or list backed by a growable circular
31   * array.  This queue also has a partial implementation of 
32   * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and 
33   * {@link #poll(long, TimeUnit)} methods.  
34   * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
35   * able to grow and provides a blocking put call.
36   * <p>
37   * The queue has both a capacity (the size of the array currently allocated)
38   * and a limit (the maximum size that may be allocated), which defaults to 
39   * {@link Integer#MAX_VALUE}.
40   * 
41   * @param <E> The element type
42   */
43  public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
44  {
45      public final int DEFAULT_CAPACITY=128;
46      public final int DEFAULT_GROWTH=64;
47      private final int _limit;
48      private final AtomicInteger _size=new AtomicInteger();
49      private final int _growCapacity;
50      
51      private volatile int _capacity;
52      private Object[] _elements;
53      
54      private final ReentrantLock _headLock = new ReentrantLock();
55      private final Condition _notEmpty = _headLock.newCondition();
56      private int _head;
57  
58      // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
59      // TODO verify this has benefits
60      private long _space0;
61      private long _space1;
62      private long _space2;
63      private long _space3;
64      private long _space4;
65      private long _space5;
66      private long _space6;
67      private long _space7;
68      
69      private final ReentrantLock _tailLock = new ReentrantLock();
70      private int _tail;
71      
72  
73      /* ------------------------------------------------------------ */
74      /** Create a growing partially blocking Queue
75       * 
76       */
77      public BlockingArrayQueue()
78      {
79          _elements=new Object[DEFAULT_CAPACITY];
80          _growCapacity=DEFAULT_GROWTH;
81          _capacity=_elements.length;
82          _limit=Integer.MAX_VALUE;
83      }
84  
85      /* ------------------------------------------------------------ */
86      /** Create a fixed size partially blocking Queue
87       * @param limit The initial capacity and the limit.
88       */
89      public BlockingArrayQueue(int limit)
90      {
91          _elements=new Object[limit];
92          _capacity=_elements.length;
93          _growCapacity=-1;
94          _limit=limit;
95      }
96  
97      /* ------------------------------------------------------------ */
98      /** Create a growing partially blocking Queue.
99       * @param capacity Initial capacity
100      * @param growBy Incremental capacity.
101      */
102     public BlockingArrayQueue(int capacity,int growBy)
103     {
104         _elements=new Object[capacity];
105         _capacity=_elements.length;
106         _growCapacity=growBy;
107         _limit=Integer.MAX_VALUE;
108     }
109 
110     /* ------------------------------------------------------------ */
111     /** Create a growing limited partially blocking Queue.
112      * @param capacity Initial capacity
113      * @param growBy Incremental capacity.
114      * @param limit maximum capacity.
115      */
116     public BlockingArrayQueue(int capacity,int growBy,int limit)
117     {
118         if (capacity>limit)
119             throw new IllegalArgumentException();
120         
121         _elements=new Object[capacity];
122         _capacity=_elements.length;
123         _growCapacity=growBy;
124         _limit=limit;
125     }
126 
127     /* ------------------------------------------------------------ */
128     public int getCapacity()
129     {
130         return _capacity;
131     }
132 
133     /* ------------------------------------------------------------ */
134     public int getLimit()
135     {
136         return _limit;
137     }
138     
139     /* ------------------------------------------------------------ */
140     @Override
141     public boolean add(E e)
142     {
143         return offer(e);
144     }
145     
146     /* ------------------------------------------------------------ */
147     public E element()
148     {
149         E e = peek();
150         if (e==null)
151             throw new NoSuchElementException();
152         return e;
153     }
154     
155     /* ------------------------------------------------------------ */
156     @SuppressWarnings("unchecked")
157     public E peek()
158     {
159         if (_size.get() == 0)
160             return null;
161         
162         E e = null;
163         _headLock.lock(); // Size cannot shrink
164         try 
165         {
166             if (_size.get() > 0) 
167                 e = (E)_elements[_head];
168         } 
169         finally 
170         {
171             _headLock.unlock();
172         }
173         
174         return e;
175     }
176 
177     /* ------------------------------------------------------------ */
178     public boolean offer(E e)
179     {
180         if (e == null) 
181             throw new NullPointerException();
182         
183         boolean not_empty=false;
184         _tailLock.lock();  // size cannot grow... only shrink
185         try 
186         {
187             if (_size.get() >= _limit) 
188                 return false;
189             
190             // should we expand array?
191             if (_size.get()==_capacity)
192             {
193                 _headLock.lock();   // Need to grow array
194                 try
195                 {
196                     if (!grow())
197                         return false;
198                 }
199                 finally
200                 {
201                     _headLock.unlock();
202                 }
203             }
204 
205             // add the element
206             _elements[_tail]=e;
207             _tail=(_tail+1)%_capacity;
208 
209             not_empty=0==_size.getAndIncrement();
210             
211         } 
212         finally 
213         {
214             _tailLock.unlock();
215         }
216         
217         if (not_empty)
218         {
219             _headLock.lock();
220             try
221             {
222                 _notEmpty.signal();
223             }
224             finally
225             {
226                 _headLock.unlock();
227             }
228         }  
229 
230         return true;
231     }
232 
233 
234     /* ------------------------------------------------------------ */
235     @SuppressWarnings("unchecked")
236     public E poll()
237     {
238         if (_size.get() == 0)
239             return null;
240         
241         E e = null;
242         _headLock.lock(); // Size cannot shrink
243         try 
244         {
245             if (_size.get() > 0) 
246             {
247                 final int head=_head;
248                 e = (E)_elements[head];
249                 _elements[head]=null;
250                 _head=(head+1)%_capacity;
251                 
252                 if (_size.decrementAndGet()>0)
253                     _notEmpty.signal();
254             }
255         } 
256         finally 
257         {
258             _headLock.unlock();
259         }
260         
261         return e;
262     }
263 
264     /* ------------------------------------------------------------ */
265     /**
266      * Retrieves and removes the head of this queue, waiting
267      * if no elements are present on this queue.
268      * @return the head of this queue
269      * @throws InterruptedException if interrupted while waiting.
270      */
271     @SuppressWarnings("unchecked")
272     public E take() throws InterruptedException
273     {
274         E e = null;
275         _headLock.lockInterruptibly();  // Size cannot shrink
276         try 
277         {
278             try 
279             {
280                 while (_size.get() == 0)
281                 {
282                     _notEmpty.await();
283                 }
284             } 
285             catch (InterruptedException ie) 
286             {
287                 _notEmpty.signal();
288                 throw ie;
289             }
290 
291             final int head=_head;
292             e = (E)_elements[head];
293             _elements[head]=null;
294             _head=(head+1)%_capacity;
295 
296             if (_size.decrementAndGet()>0)
297                 _notEmpty.signal();
298         } 
299         finally 
300         {
301             _headLock.unlock();
302         }
303         
304         return e;
305     }
306 
307     /* ------------------------------------------------------------ */
308     /**
309      * Retrieves and removes the head of this queue, waiting
310      * if necessary up to the specified wait time if no elements are
311      * present on this queue.
312      * @param time how long to wait before giving up, in units of
313      * <tt>unit</tt>
314      * @param unit a <tt>TimeUnit</tt> determining how to interpret the
315      * <tt>timeout</tt> parameter
316      * @return the head of this queue, or <tt>null</tt> if the
317      * specified waiting time elapses before an element is present.
318      * @throws InterruptedException if interrupted while waiting.
319      */
320     @SuppressWarnings("unchecked")
321     public E poll(long time, TimeUnit unit) throws InterruptedException
322     {
323         
324         E e = null;
325 
326         long nanos = unit.toNanos(time);
327         
328         _headLock.lockInterruptibly(); // Size cannot shrink
329         try 
330         {    
331             try 
332             {
333                 while (_size.get() == 0)
334                 {
335                     if (nanos<=0)
336                         return null;
337                     nanos = _notEmpty.awaitNanos(nanos);
338                 }
339             } 
340             catch (InterruptedException ie) 
341             {
342                 _notEmpty.signal();
343                 throw ie;
344             }
345 
346             e = (E)_elements[_head];
347             _elements[_head]=null;
348             _head=(_head+1)%_capacity;
349 
350             if (_size.decrementAndGet()>0)
351                 _notEmpty.signal();
352         } 
353         finally 
354         {
355             _headLock.unlock();
356         }
357         
358         return e;
359     }
360 
361     /* ------------------------------------------------------------ */
362     public E remove()
363     {
364         E e=poll();
365         if (e==null)
366             throw new NoSuchElementException();
367         return e;
368     }
369 
370     /* ------------------------------------------------------------ */
371     @Override
372     public void clear()
373     {
374         _tailLock.lock();
375         try
376         {
377             _headLock.lock();
378             try
379             {
380                 _head=0;
381                 _tail=0;
382                 _size.set(0);
383             }
384             finally
385             {
386                 _headLock.unlock();
387             }
388         }
389         finally
390         {
391             _tailLock.unlock();
392         }
393     }
394 
395     /* ------------------------------------------------------------ */
396     @Override
397     public boolean isEmpty()
398     {
399         return _size.get()==0;
400     }
401 
402     /* ------------------------------------------------------------ */
403     @Override
404     public int size()
405     {
406         return _size.get();
407     }
408 
409     /* ------------------------------------------------------------ */
410     @SuppressWarnings("unchecked")
411     @Override
412     public E get(int index)
413     {
414         _tailLock.lock();
415         try
416         {
417             _headLock.lock();
418             try
419             {
420                 if (index<0 || index>=_size.get())
421                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
422                 int i = _head+index;
423                 if (i>=_capacity)
424                     i-=_capacity;
425                 return (E)_elements[i];
426             }
427             finally
428             {
429                 _headLock.unlock();
430             }
431         }
432         finally
433         {
434             _tailLock.unlock();
435         }
436     }
437     
438     /* ------------------------------------------------------------ */
439     @Override
440     public E remove(int index)
441     {
442         _tailLock.lock();
443         try
444         {
445             _headLock.lock();
446             try
447             {
448 
449                 if (index<0 || index>=_size.get())
450                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
451 
452                 int i = _head+index;
453                 if (i>=_capacity)
454                     i-=_capacity;
455                 @SuppressWarnings("unchecked")
456                 E old=(E)_elements[i];
457 
458                 if (i<_tail)
459                 {
460                     System.arraycopy(_elements,i+1,_elements,i,_tail-i);
461                     _tail--;
462                     _size.decrementAndGet();
463                 }
464                 else
465                 {
466                     System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
467                     if (_tail>0)
468                     {
469                         _elements[_capacity]=_elements[0];
470                         System.arraycopy(_elements,1,_elements,0,_tail-1);
471                         _tail--;
472                     }
473                     else
474                         _tail=_capacity-1;
475 
476                     _size.decrementAndGet();
477                 }
478 
479                 return old;
480             }
481             finally
482             {
483                 _headLock.unlock();
484             }
485         }
486         finally
487         {
488             _tailLock.unlock();
489         }
490     }
491 
492     /* ------------------------------------------------------------ */
493     @Override
494     public E set(int index, E e)
495     {
496         if (e == null) 
497             throw new NullPointerException();
498 
499         _tailLock.lock();
500         try
501         {
502             _headLock.lock();
503             try
504             {
505 
506                 if (index<0 || index>=_size.get())
507                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
508 
509                 int i = _head+index;
510                 if (i>=_capacity)
511                     i-=_capacity;
512                 @SuppressWarnings("unchecked")
513                 E old=(E)_elements[i];
514                 _elements[i]=e;
515                 return old;
516             }
517             finally
518             {
519                 _headLock.unlock();
520             }
521         }
522         finally
523         {
524             _tailLock.unlock();
525         }
526     }
527     
528     /* ------------------------------------------------------------ */
529     @Override
530     public void add(int index, E e)
531     {
532         if (e == null) 
533             throw new NullPointerException();
534 
535         _tailLock.lock();
536         try
537         {
538             _headLock.lock();
539             try
540             {
541 
542                 if (index<0 || index>_size.get())
543                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
544 
545                 if (index==_size.get())
546                 {
547                     add(e);
548                 }
549                 else
550                 {
551                     if (_tail==_head)
552                         if (!grow())
553                             throw new IllegalStateException("full");
554 
555                     int i = _head+index;
556                     if (i>=_capacity)
557                         i-=_capacity;
558 
559                     _size.incrementAndGet();
560                     _tail=(_tail+1)%_capacity;
561 
562 
563                     if (i<_tail)
564                     {
565                         System.arraycopy(_elements,i,_elements,i+1,_tail-i);
566                         _elements[i]=e;
567                     }
568                     else
569                     {
570                         if (_tail>0)
571                         {
572                             System.arraycopy(_elements,0,_elements,1,_tail);
573                             _elements[0]=_elements[_capacity-1];
574                         }
575 
576                         System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
577                         _elements[i]=e;
578                     }
579                 }
580             }
581             finally
582             {
583                 _headLock.unlock();
584             }
585         }
586         finally
587         {
588             _tailLock.unlock();
589         }
590     }
591 
592     /* ------------------------------------------------------------ */
593     private boolean grow()
594     {
595         if (_growCapacity<=0)
596             return false;
597 
598         _tailLock.lock();
599         try
600         {
601             _headLock.lock();
602             try
603             {
604                 final int head=_head;
605                 final int tail=_tail;
606                 final int new_tail;
607 
608                 Object[] elements=new Object[_capacity+_growCapacity];
609 
610                 if (head<tail)
611                 {
612                     new_tail=tail-head;
613                     System.arraycopy(_elements,head,elements,0,new_tail);
614                 }
615                 else if (head>tail || _size.get()>0)
616                 {
617                     new_tail=_capacity+tail-head;
618                     int cut=_capacity-head;
619                     System.arraycopy(_elements,head,elements,0,cut);
620                     System.arraycopy(_elements,0,elements,cut,tail);
621                 }
622                 else
623                 {
624                     new_tail=0;
625                 }
626 
627                 _elements=elements;
628                 _capacity=_elements.length;
629                 _head=0;
630                 _tail=new_tail; 
631                 return true;
632             }
633             finally
634             {
635                 _headLock.unlock();
636             }
637         }
638         finally
639         {
640             _tailLock.unlock();
641         }
642 
643     }
644 
645     /* ------------------------------------------------------------ */
646     public int drainTo(Collection<? super E> c)
647     {
648         throw new UnsupportedOperationException();
649     }
650 
651     /* ------------------------------------------------------------ */
652     public int drainTo(Collection<? super E> c, int maxElements)
653     {
654         throw new UnsupportedOperationException();
655     }
656 
657     /* ------------------------------------------------------------ */
658     public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
659     {
660         throw new UnsupportedOperationException();
661     }
662 
663     /* ------------------------------------------------------------ */
664     public void put(E o) throws InterruptedException
665     {
666         if (!add(o))
667             throw new IllegalStateException("full");
668     }
669 
670     /* ------------------------------------------------------------ */
671     public int remainingCapacity()
672     {
673         _tailLock.lock();
674         try
675         {
676             _headLock.lock();
677             try
678             {
679                 return getCapacity()-size();
680             }
681             finally
682             {
683                 _headLock.unlock();
684             }
685         }
686         finally
687         {
688             _tailLock.unlock();
689         }
690     }
691     
692 
693     /* ------------------------------------------------------------ */
694     long sumOfSpace()
695     {
696         // this method exists to stop clever optimisers removing the spacers
697         return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++; 
698     }
699 }