View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.util;
15  
16  import java.util.AbstractList;
17  import java.util.Collection;
18  import java.util.NoSuchElementException;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  
26  /* ------------------------------------------------------------ */
27  /** Queue backed by a circular array.
28   * 
29   * This queue is uses  a variant of the two lock queue algorithm to
30   * provide an efficient queue or list backed by a growable circular
31   * array.  This queue also has a partial implementation of 
32   * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and 
33   * {@link #poll(long, TimeUnit)} methods.  
34   * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
35   * able to grow and provides a blocking put call.
36   * <p>
37   * The queue has both a capacity (the size of the array currently allocated)
38   * and a limit (the maximum size that may be allocated), which defaults to 
39   * {@link Integer#MAX_VALUE}.
40   * 
41   * @param <E> The element type
42   */
43  public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
44  {
45      public final int DEFAULT_CAPACITY=128;
46      public final int DEFAULT_GROWTH=64;
47      private final int _limit;
48      private final AtomicInteger _size=new AtomicInteger();
49      private final int _growCapacity;
50      
51      private volatile int _capacity;
52      private Object[] _elements;
53      private int _head;
54      private int _tail;
55      
56      private final ReentrantLock _headLock = new ReentrantLock();
57      private final Condition _notEmpty = _headLock.newCondition();
58      private final ReentrantLock _tailLock = new ReentrantLock();
59  
60      /* ------------------------------------------------------------ */
61      /** Create a growing partially blocking Queue
62       * 
63       */
64      public BlockingArrayQueue()
65      {
66          _elements=new Object[DEFAULT_CAPACITY];
67          _growCapacity=DEFAULT_GROWTH;
68          _capacity=_elements.length;
69          _limit=Integer.MAX_VALUE;
70      }
71  
72      /* ------------------------------------------------------------ */
73      /** Create a fixed size partially blocking Queue
74       * @param limit The initial capacity and the limit.
75       */
76      public BlockingArrayQueue(int limit)
77      {
78          _elements=new Object[limit];
79          _capacity=_elements.length;
80          _growCapacity=-1;
81          _limit=limit;
82      }
83  
84      /* ------------------------------------------------------------ */
85      /** Create a growing partially blocking Queue.
86       * @param capacity Initial capacity
87       * @param growBy Incremental capacity.
88       */
89      public BlockingArrayQueue(int capacity,int growBy)
90      {
91          _elements=new Object[capacity];
92          _capacity=_elements.length;
93          _growCapacity=growBy;
94          _limit=Integer.MAX_VALUE;
95      }
96  
97      /* ------------------------------------------------------------ */
98      /** Create a growing limited partially blocking Queue.
99       * @param capacity Initial capacity
100      * @param growBy Incremental capacity.
101      * @param limit maximum capacity.
102      */
103     public BlockingArrayQueue(int capacity,int growBy,int limit)
104     {
105         if (capacity>limit)
106             throw new IllegalArgumentException();
107         
108         _elements=new Object[capacity];
109         _capacity=_elements.length;
110         _growCapacity=growBy;
111         _limit=limit;
112     }
113 
114     /* ------------------------------------------------------------ */
115     public int getCapacity()
116     {
117         return _capacity;
118     }
119 
120     /* ------------------------------------------------------------ */
121     public int getLimit()
122     {
123         return _limit;
124     }
125     
126     /* ------------------------------------------------------------ */
127     @Override
128     public boolean add(E e)
129     {
130         return offer(e);
131     }
132     
133     /* ------------------------------------------------------------ */
134     public E element()
135     {
136         E e = peek();
137         if (e==null)
138             throw new NoSuchElementException();
139         return e;
140     }
141     
142     /* ------------------------------------------------------------ */
143     @SuppressWarnings("unchecked")
144     public E peek()
145     {
146         if (_size.get() == 0)
147             return null;
148         
149         E e = null;
150         _headLock.lock(); // Size cannot shrink
151         try 
152         {
153             if (_size.get() > 0) 
154                 e = (E)_elements[_head];
155         } 
156         finally 
157         {
158             _headLock.unlock();
159         }
160         
161         return e;
162     }
163 
164     /* ------------------------------------------------------------ */
165     public boolean offer(E e)
166     {
167         if (e == null) 
168             throw new NullPointerException();
169         
170         boolean not_empty=false;
171         _tailLock.lock();  // size cannot grow... only shrink
172         try 
173         {
174             if (_size.get() >= _limit) 
175                 return false;
176             
177             // should we expand array?
178             if (_size.get()==_capacity)
179             {
180                 _headLock.lock();   // Need to grow array
181                 try
182                 {
183                     if (!grow())
184                         return false;
185                 }
186                 finally
187                 {
188                     _headLock.unlock();
189                 }
190             }
191 
192             // add the element
193             _elements[_tail]=e;
194             _tail=(_tail+1)%_capacity;
195 
196             not_empty=0==_size.getAndIncrement();
197             
198         } 
199         finally 
200         {
201             _tailLock.unlock();
202         }
203         
204         if (not_empty)
205         {
206             _headLock.lock();
207             try
208             {
209                 _notEmpty.signal();
210             }
211             finally
212             {
213                 _headLock.unlock();
214             }
215         }  
216 
217         return true;
218     }
219 
220 
221     /* ------------------------------------------------------------ */
222     @SuppressWarnings("unchecked")
223     public E poll()
224     {
225         if (_size.get() == 0)
226             return null;
227         
228         E e = null;
229         _headLock.lock(); // Size cannot shrink
230         try 
231         {
232             if (_size.get() > 0) 
233             {
234                 final int head=_head;
235                 e = (E)_elements[head];
236                 _elements[head]=null;
237                 _head=(head+1)%_capacity;
238                 
239                 if (_size.decrementAndGet()>0)
240                     _notEmpty.signal();
241             }
242         } 
243         finally 
244         {
245             _headLock.unlock();
246         }
247         
248         return e;
249     }
250 
251     /* ------------------------------------------------------------ */
252     /**
253      * Retrieves and removes the head of this queue, waiting
254      * if no elements are present on this queue.
255      * @return the head of this queue
256      * @throws InterruptedException if interrupted while waiting.
257      */
258     @SuppressWarnings("unchecked")
259     public E take() throws InterruptedException
260     {
261         E e = null;
262         _headLock.lockInterruptibly();  // Size cannot shrink
263         try 
264         {
265             try 
266             {
267                 while (_size.get() == 0)
268                 {
269                     _notEmpty.await();
270                 }
271             } 
272             catch (InterruptedException ie) 
273             {
274                 _notEmpty.signal();
275                 throw ie;
276             }
277 
278             final int head=_head;
279             e = (E)_elements[head];
280             _elements[head]=null;
281             _head=(head+1)%_capacity;
282 
283             if (_size.decrementAndGet()>0)
284                 _notEmpty.signal();
285         } 
286         finally 
287         {
288             _headLock.unlock();
289         }
290         
291         return e;
292     }
293 
294     /* ------------------------------------------------------------ */
295     /**
296      * Retrieves and removes the head of this queue, waiting
297      * if necessary up to the specified wait time if no elements are
298      * present on this queue.
299      * @param time how long to wait before giving up, in units of
300      * <tt>unit</tt>
301      * @param unit a <tt>TimeUnit</tt> determining how to interpret the
302      * <tt>timeout</tt> parameter
303      * @return the head of this queue, or <tt>null</tt> if the
304      * specified waiting time elapses before an element is present.
305      * @throws InterruptedException if interrupted while waiting.
306      */
307     @SuppressWarnings("unchecked")
308     public E poll(long time, TimeUnit unit) throws InterruptedException
309     {
310         
311         E e = null;
312 
313         long nanos = unit.toNanos(time);
314         
315         _headLock.lockInterruptibly(); // Size cannot shrink
316         try 
317         {    
318             try 
319             {
320                 while (_size.get() == 0)
321                 {
322                     if (nanos<=0)
323                         return null;
324                     nanos = _notEmpty.awaitNanos(nanos);
325                 }
326             } 
327             catch (InterruptedException ie) 
328             {
329                 _notEmpty.signal();
330                 throw ie;
331             }
332 
333             e = (E)_elements[_head];
334             _elements[_head]=null;
335             _head=(_head+1)%_capacity;
336 
337             if (_size.decrementAndGet()>0)
338                 _notEmpty.signal();
339         } 
340         finally 
341         {
342             _headLock.unlock();
343         }
344         
345         return e;
346     }
347 
348     /* ------------------------------------------------------------ */
349     public E remove()
350     {
351         E e=poll();
352         if (e==null)
353             throw new NoSuchElementException();
354         return e;
355     }
356 
357     /* ------------------------------------------------------------ */
358     @Override
359     public void clear()
360     {
361         _tailLock.lock();
362         try
363         {
364             _headLock.lock();
365             try
366             {
367                 _head=0;
368                 _tail=0;
369                 _size.set(0);
370             }
371             finally
372             {
373                 _headLock.unlock();
374             }
375         }
376         finally
377         {
378             _tailLock.unlock();
379         }
380     }
381 
382     /* ------------------------------------------------------------ */
383     @Override
384     public boolean isEmpty()
385     {
386         return _size.get()==0;
387     }
388 
389     /* ------------------------------------------------------------ */
390     @Override
391     public int size()
392     {
393         return _size.get();
394     }
395 
396     /* ------------------------------------------------------------ */
397     @SuppressWarnings("unchecked")
398     @Override
399     public E get(int index)
400     {
401         _tailLock.lock();
402         try
403         {
404             _headLock.lock();
405             try
406             {
407                 if (index<0 || index>=_size.get())
408                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
409                 int i = _head+index;
410                 if (i>=_capacity)
411                     i-=_capacity;
412                 return (E)_elements[i];
413             }
414             finally
415             {
416                 _headLock.unlock();
417             }
418         }
419         finally
420         {
421             _tailLock.unlock();
422         }
423     }
424     
425     /* ------------------------------------------------------------ */
426     @Override
427     public E remove(int index)
428     {
429         _tailLock.lock();
430         try
431         {
432             _headLock.lock();
433             try
434             {
435 
436                 if (index<0 || index>=_size.get())
437                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
438 
439                 int i = _head+index;
440                 if (i>=_capacity)
441                     i-=_capacity;
442                 @SuppressWarnings("unchecked")
443                 E old=(E)_elements[i];
444 
445                 if (i<_tail)
446                 {
447                     System.arraycopy(_elements,i+1,_elements,i,_tail-i);
448                     _tail--;
449                     _size.decrementAndGet();
450                 }
451                 else
452                 {
453                     System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
454                     if (_tail>0)
455                     {
456                         _elements[_capacity]=_elements[0];
457                         System.arraycopy(_elements,1,_elements,0,_tail-1);
458                         _tail--;
459                     }
460                     else
461                         _tail=_capacity-1;
462 
463                     _size.decrementAndGet();
464                 }
465 
466                 return old;
467             }
468             finally
469             {
470                 _headLock.unlock();
471             }
472         }
473         finally
474         {
475             _tailLock.unlock();
476         }
477     }
478 
479     /* ------------------------------------------------------------ */
480     @Override
481     public E set(int index, E e)
482     {
483         if (e == null) 
484             throw new NullPointerException();
485 
486         _tailLock.lock();
487         try
488         {
489             _headLock.lock();
490             try
491             {
492 
493                 if (index<0 || index>=_size.get())
494                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
495 
496                 int i = _head+index;
497                 if (i>=_capacity)
498                     i-=_capacity;
499                 @SuppressWarnings("unchecked")
500                 E old=(E)_elements[i];
501                 _elements[i]=e;
502                 return old;
503             }
504             finally
505             {
506                 _headLock.unlock();
507             }
508         }
509         finally
510         {
511             _tailLock.unlock();
512         }
513     }
514     
515     /* ------------------------------------------------------------ */
516     @Override
517     public void add(int index, E e)
518     {
519         if (e == null) 
520             throw new NullPointerException();
521 
522         _tailLock.lock();
523         try
524         {
525             _headLock.lock();
526             try
527             {
528 
529                 if (index<0 || index>_size.get())
530                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
531 
532                 if (index==_size.get())
533                 {
534                     add(e);
535                 }
536                 else
537                 {
538                     if (_tail==_head)
539                         if (!grow())
540                             throw new IllegalStateException("full");
541 
542                     int i = _head+index;
543                     if (i>=_capacity)
544                         i-=_capacity;
545 
546                     _size.incrementAndGet();
547                     _tail=(_tail+1)%_capacity;
548 
549 
550                     if (i<_tail)
551                     {
552                         System.arraycopy(_elements,i,_elements,i+1,_tail-i);
553                         _elements[i]=e;
554                     }
555                     else
556                     {
557                         if (_tail>0)
558                         {
559                             System.arraycopy(_elements,0,_elements,1,_tail);
560                             _elements[0]=_elements[_capacity-1];
561                         }
562 
563                         System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
564                         _elements[i]=e;
565                     }
566                 }
567             }
568             finally
569             {
570                 _headLock.unlock();
571             }
572         }
573         finally
574         {
575             _tailLock.unlock();
576         }
577     }
578 
579     /* ------------------------------------------------------------ */
580     private boolean grow()
581     {
582         if (_growCapacity<=0)
583             return false;
584 
585         _tailLock.lock();
586         try
587         {
588             _headLock.lock();
589             try
590             {
591                 final int head=_head;
592                 final int tail=_tail;
593                 final int new_tail;
594 
595                 Object[] elements=new Object[_capacity+_growCapacity];
596 
597                 if (head<tail)
598                 {
599                     new_tail=tail-head;
600                     System.arraycopy(_elements,head,elements,0,new_tail);
601                 }
602                 else if (head>tail || _size.get()>0)
603                 {
604                     new_tail=_capacity+tail-head;
605                     int cut=_capacity-head;
606                     System.arraycopy(_elements,head,elements,0,cut);
607                     System.arraycopy(_elements,0,elements,cut,tail);
608                 }
609                 else
610                 {
611                     new_tail=0;
612                 }
613 
614                 _elements=elements;
615                 _capacity=_elements.length;
616                 _head=0;
617                 _tail=new_tail; 
618                 return true;
619             }
620             finally
621             {
622                 _headLock.unlock();
623             }
624         }
625         finally
626         {
627             _tailLock.unlock();
628         }
629 
630     }
631 
632     /* ------------------------------------------------------------ */
633     public int drainTo(Collection<? super E> c)
634     {
635         throw new UnsupportedOperationException();
636     }
637 
638     /* ------------------------------------------------------------ */
639     public int drainTo(Collection<? super E> c, int maxElements)
640     {
641         throw new UnsupportedOperationException();
642     }
643 
644     /* ------------------------------------------------------------ */
645     public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
646     {
647         throw new UnsupportedOperationException();
648     }
649 
650     /* ------------------------------------------------------------ */
651     public void put(E o) throws InterruptedException
652     {
653         if (!add(o))
654             throw new IllegalStateException("full");
655     }
656 
657     /* ------------------------------------------------------------ */
658     public int remainingCapacity()
659     {
660         _tailLock.lock();
661         try
662         {
663             _headLock.lock();
664             try
665             {
666                 return getCapacity()-size();
667             }
668             finally
669             {
670                 _headLock.unlock();
671             }
672         }
673         finally
674         {
675             _tailLock.unlock();
676         }
677     }
678 }