View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.util;
15  
16  import java.util.AbstractList;
17  import java.util.Collection;
18  import java.util.NoSuchElementException;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  
/* ------------------------------------------------------------ */
/** Queue backed by a circular array.
 * 
 * This queue uses a variant of the two lock queue algorithm to
 * provide an efficient queue or list backed by a growable circular
 * array: removals from the head are guarded by a head lock, insertions
 * at the tail by a tail lock, and operations that touch both ends
 * (growing, clearing, indexed access) take the tail lock and then the
 * head lock, always in that order.
 * <p>
 * This queue has a partial implementation of 
 * {@link java.util.concurrent.BlockingQueue}: {@link #take()} and 
 * {@link #poll(long, TimeUnit)} block waiting for an element, but
 * {@link #put(Object)} does <em>not</em> block (it throws 
 * {@link IllegalStateException} when the queue is full), and
 * {@link #offer(Object, long, TimeUnit)} and the <tt>drainTo</tt> methods
 * are unsupported.
 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
 * able to grow.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum number of elements that may be queued), which 
 * defaults to {@link Integer#MAX_VALUE}.
 * <p>
 * Null elements are not permitted.
 * 
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    /** Initial array size used by the no-argument constructor. */
    public static final int DEFAULT_CAPACITY=128;
    /** Growth increment used by the no-argument constructor. */
    public static final int DEFAULT_GROWTH=64;

    private final int _limit;
    private final AtomicInteger _size=new AtomicInteger();
    private final int _growCapacity;
    
    private volatile int _capacity;
    private Object[] _elements;
    private int _head;
    private int _tail;
    
    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private final ReentrantLock _tailLock = new ReentrantLock();

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue with
     * {@link #DEFAULT_CAPACITY}, {@link #DEFAULT_GROWTH} and no limit.
     */
    public BlockingArrayQueue()
    {
        _elements=new Object[DEFAULT_CAPACITY];
        _growCapacity=DEFAULT_GROWTH;
        _capacity=_elements.length;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a fixed size partially blocking Queue
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        _elements=new Object[limit];
        _capacity=_elements.length;
        _growCapacity=-1; // a non-positive growth makes grow() refuse
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     * @param limit maximum capacity.
     * @throws IllegalArgumentException if capacity is greater than limit
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException();
        
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the size of the array currently allocated
     */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the maximum number of elements this queue will accept
     */
    public int getLimit()
    {
        return _limit;
    }
    
    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue; equivalent to 
     * {@link #offer(Object)}.
     * @param e the element to add
     * @return true if the element was added, false if the queue is full
     * @throws NullPointerException if e is null
     */
    public boolean add(E e)
    {
        return offer(e);
    }
    
    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue, without removing it
     * @throws NoSuchElementException if the queue is empty
     */
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }
    
    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue without removing it, or null if the 
     * queue is empty
     */
    public E peek()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;
        
        E e = null;
        _headLock.lock(); // Size cannot shrink
        try 
        {
            if (_size.get() > 0) 
                e = elementAt(_head);
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue, growing the array
     * if it is full and growth is allowed.
     * @param e the element to add
     * @return true if the element was added, false if the limit has been
     * reached or the array is full and cannot grow
     * @throws NullPointerException if e is null
     */
    public boolean offer(E e)
    {
        if (e == null) 
            throw new NullPointerException();
        
        boolean not_empty=false;
        _tailLock.lock();  // size cannot grow... only shrink
        try 
        {
            if (_size.get() >= _limit) 
                return false;
            
            // should we expand array?
            if (_size.get()==_capacity)
            {
                _headLock.lock();   // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            // only the empty -> non-empty transition needs to wake a taker
            not_empty=0==_size.getAndIncrement();
        } 
        finally 
        {
            _tailLock.unlock();
        }
        
        if (not_empty)
        {
            // the condition belongs to the head lock, so it must be held to signal
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }  

        return true;
    }


    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue after removing it, or null if the 
     * queue is empty
     */
    public E poll()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;
        
        E e = null;
        _headLock.lock(); // Size cannot shrink
        try 
        {
            if (_size.get() > 0) 
                e = dequeue();
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException
    {
        _headLock.lockInterruptibly();  // Size cannot shrink
        try 
        {
            try 
            {
                while (_size.get() == 0)
                    _notEmpty.await();
            } 
            catch (InterruptedException ie) 
            {
                // pass a possibly consumed signal on to another waiter
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        } 
        finally 
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {
        long nanos = unit.toNanos(time);
        
        _headLock.lockInterruptibly(); // Size cannot shrink
        try 
        {    
            try 
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            } 
            catch (InterruptedException ie) 
            {
                // pass a possibly consumed signal on to another waiter
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        } 
        finally 
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue after removing it
     * @throws NoSuchElementException if the queue is empty
     */
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes all elements from the queue. The allocated capacity is
     * kept, but every slot is nulled so that the removed elements do
     * not remain reachable through the backing array.
     */
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                java.util.Arrays.fill(_elements,null);
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * @return true if the queue currently holds no elements
     */
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of elements currently in the queue
     */
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    /**
     * @param index position counted from the head of the queue
     * @return the element at that position
     * @throws IndexOutOfBoundsException if index is negative or not 
     * less than the current size
     */
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                checkIndex(index,_size.get()-1);
                return elementAt(physicalIndex(index));
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    
    /* ------------------------------------------------------------ */
    /**
     * Removes the element at the given position by shifting the 
     * elements between it and the tail one slot towards the head.
     * @param index position counted from the head of the queue
     * @return the element that was removed
     * @throws IndexOutOfBoundsException if index is negative or not 
     * less than the current size
     */
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                checkIndex(index,_size.get()-1);

                final int i = physicalIndex(index);
                final E old=elementAt(i);

                if (i<_tail)
                {
                    // the slot and the tail are in the same contiguous run:
                    // close the gap with the elements in (i,_tail)
                    System.arraycopy(_elements,i+1,_elements,i,_tail-i-1);
                    _tail--;
                }
                else
                {
                    // the run wraps: close the gap up to the end of the array...
                    System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
                    if (_tail>0)
                    {
                        // ...carry the first slot round into the last slot
                        // (the last valid index is _capacity-1)...
                        _elements[_capacity-1]=_elements[0];
                        // ...and shift the wrapped part down by one
                        System.arraycopy(_elements,1,_elements,0,_tail-1);
                        _tail--;
                    }
                    else
                        _tail=_capacity-1;
                }

                // the new tail slot is vacant: drop the stale reference
                _elements[_tail]=null;
                _size.decrementAndGet();

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the element at the given position.
     * @param index position counted from the head of the queue
     * @param e the new element
     * @return the element previously at that position
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is negative or not 
     * less than the current size
     */
    public E set(int index, E e)
    {
        if (e == null) 
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                checkIndex(index,_size.get()-1);

                final int i = physicalIndex(index);
                final E old=elementAt(i);
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    
    /* ------------------------------------------------------------ */
    /**
     * Inserts the element at the given position, shifting the elements
     * from that position to the tail one slot towards the tail.
     * @param index position counted from the head of the queue; may be
     * equal to the current size, in which case the element is appended
     * @param e the element to insert
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is negative or greater
     * than the current size
     * @throws IllegalStateException if the limit has been reached or the
     * array is full and cannot grow
     */
    public void add(int index, E e)
    {
        if (e == null) 
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int size=_size.get();
                checkIndex(index,size);

                if (index==size)
                {
                    // plain append; both locks are reentrant.
                    // Do not silently drop the element if the queue is full.
                    if (!offer(e))
                        throw new IllegalStateException("full");
                    return;
                }

                if (size>=_limit)
                    throw new IllegalStateException("full");

                // size>0 here, so head==tail means the array is full
                if (_tail==_head && !grow())
                    throw new IllegalStateException("full");

                // computed after grow(), which resets the head to 0
                final int i = physicalIndex(index);

                _size.incrementAndGet();
                _tail=(_tail+1)%_capacity;

                if (i<_tail)
                {
                    // slot and tail in the same contiguous run: open a gap at i
                    System.arraycopy(_elements,i,_elements,i+1,_tail-i);
                    _elements[i]=e;
                }
                else
                {
                    if (_tail>0)
                    {
                        // shift the wrapped part up and carry the last slot round
                        System.arraycopy(_elements,0,_elements,1,_tail);
                        _elements[0]=_elements[_capacity-1];
                    }

                    System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
                    _elements[i]=e;
                }
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes and returns the head element and signals another waiter
     * if elements remain. The caller must hold the head lock and must 
     * have checked that the queue is not empty.
     */
    private E dequeue()
    {
        final int head=_head;
        final E e = elementAt(head);
        _elements[head]=null;
        _head=(head+1)%_capacity;

        if (_size.decrementAndGet()>0)
            _notEmpty.signal();
        return e;
    }

    /* ------------------------------------------------------------ */
    /** Reads an array slot as an element; only E instances are ever stored. */
    @SuppressWarnings("unchecked")
    private E elementAt(int i)
    {
        return (E)_elements[i];
    }

    /* ------------------------------------------------------------ */
    /** Maps a position counted from the head onto an index of the circular array. */
    private int physicalIndex(int index)
    {
        int i = _head+index;
        if (i>=_capacity)
            i-=_capacity;
        return i;
    }

    /* ------------------------------------------------------------ */
    /** Throws IndexOutOfBoundsException unless 0 &lt;= index &lt;= max. */
    private void checkIndex(int index, int max)
    {
        if (index<0 || index>max)
            throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the array with one that is larger by the growth increment,
     * copying the queued elements so that the head is at index 0.
     * @return false if this queue is not allowed to grow
     */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int head=_head;
                final int tail=_tail;
                final int new_tail;

                Object[] elements=new Object[_capacity+_growCapacity];

                if (head<tail)
                {
                    // single contiguous run
                    new_tail=tail-head;
                    System.arraycopy(_elements,head,elements,0,new_tail);
                }
                else if (head>tail || _size.get()>0)
                {
                    // wrapped (or completely full): copy head..end, then start..tail
                    new_tail=_capacity+tail-head;
                    int cut=_capacity-head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    // empty
                    new_tail=0;
                }

                _elements=elements;
                _capacity=_elements.length;
                _head=0;
                _tail=new_tail; 
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }

    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public int drainTo(Collection<? super E> c)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element to the tail of the queue. This implementation
     * does not wait for space to become available.
     * @param o the element to add
     * @throws IllegalStateException if the queue is full
     * @throws NullPointerException if o is null
     */
    public void put(E o) throws InterruptedException
    {
        if (!add(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of free slots in the currently allocated array
     * (the capacity minus the size); growth and the limit are not 
     * taken into account
     */
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                return getCapacity()-size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
}