View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.util;
15  
16  import java.util.AbstractList;
17  import java.util.Collection;
18  import java.util.NoSuchElementException;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  
26  /* ------------------------------------------------------------ */
27  /** Queue backed by a circular array.
28   * 
29   * This queue is uses  a variant of the two lock queue algorithm to
30   * provide an efficient queue or list backed by a growable circular
31   * array.  This queue also has a partial implementation of 
32   * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and 
33   * {@link #poll(long, TimeUnit)} methods.  
34   * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
35   * able to grow and provides a blocking put call.
36   * <p>
37   * The queue has both a capacity (the size of the array currently allocated)
38   * and a limit (the maximum size that may be allocated), which defaults to 
39   * {@link Integer#MAX_VALUE}.
40   * 
41   * @param <E> The element type
42   */
43  public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
44  {
45      public final int DEFAULT_CAPACITY=128;
46      public final int DEFAULT_GROWTH=64;
47      private final int _limit;
48      private final AtomicInteger _size=new AtomicInteger();
49      private final int _growCapacity;
50      
51      private volatile int _capacity;
52      private Object[] _elements;
53      private int _head;
54      private int _tail;
55      
56      private final ReentrantLock _headLock = new ReentrantLock();
57      private final Condition _notEmpty = _headLock.newCondition();
58      private final ReentrantLock _tailLock = new ReentrantLock();
59  
60      /* ------------------------------------------------------------ */
61      /** Create a growing partially blocking Queue
62       * 
63       */
64      public BlockingArrayQueue()
65      {
66          _elements=new Object[DEFAULT_CAPACITY];
67          _growCapacity=DEFAULT_GROWTH;
68          _capacity=_elements.length;
69          _limit=Integer.MAX_VALUE;
70      }
71  
72      /* ------------------------------------------------------------ */
73      /** Create a fixed size partially blocking Queue
74       * @param limit The initial capacity and the limit.
75       */
76      public BlockingArrayQueue(int limit)
77      {
78          _elements=new Object[limit];
79          _capacity=_elements.length;
80          _growCapacity=-1;
81          _limit=limit;
82      }
83  
84      /* ------------------------------------------------------------ */
85      /** Create a growing partially blocking Queue.
86       * @param capacity Initial capacity
87       * @param growBy Incremental capacity.
88       */
89      public BlockingArrayQueue(int capacity,int growBy)
90      {
91          _elements=new Object[capacity];
92          _capacity=_elements.length;
93          _growCapacity=growBy;
94          _limit=Integer.MAX_VALUE;
95      }
96  
97      /* ------------------------------------------------------------ */
98      /** Create a growing limited partially blocking Queue.
99       * @param capacity Initial capacity
100      * @param growBy Incremental capacity.
101      * @param limit maximum capacity.
102      */
103     public BlockingArrayQueue(int capacity,int growBy,int limit)
104     {
105         if (capacity>limit)
106             throw new IllegalArgumentException();
107         
108         _elements=new Object[capacity];
109         _capacity=_elements.length;
110         _growCapacity=growBy;
111         _limit=limit;
112     }
113 
114     /* ------------------------------------------------------------ */
115     public int getCapacity()
116     {
117         return _capacity;
118     }
119 
120     /* ------------------------------------------------------------ */
121     public int getLimit()
122     {
123         return _limit;
124     }
125     
126     /* ------------------------------------------------------------ */
127     @Override
128     public boolean add(E e)
129     {
130         return offer(e);
131     }
132     
133     /* ------------------------------------------------------------ */
134     public E element()
135     {
136         E e = peek();
137         if (e==null)
138             throw new NoSuchElementException();
139         return e;
140     }
141     
142     /* ------------------------------------------------------------ */
143     public E peek()
144     {
145         if (_size.get() == 0)
146             return null;
147         
148         E e = null;
149         _headLock.lock(); // Size cannot shrink
150         try 
151         {
152             if (_size.get() > 0) 
153                 e = (E)_elements[_head];
154         } 
155         finally 
156         {
157             _headLock.unlock();
158         }
159         
160         return e;
161     }
162 
163     /* ------------------------------------------------------------ */
164     public boolean offer(E e)
165     {
166         if (e == null) 
167             throw new NullPointerException();
168         
169         boolean not_empty=false;
170         _tailLock.lock();  // size cannot grow... only shrink
171         try 
172         {
173             if (_size.get() >= _limit) 
174                 return false;
175             
176             // should we expand array?
177             if (_size.get()==_capacity)
178             {
179                 _headLock.lock();   // Need to grow array
180                 try
181                 {
182                     if (!grow())
183                         return false;
184                 }
185                 finally
186                 {
187                     _headLock.unlock();
188                 }
189             }
190 
191             // add the element
192             _elements[_tail]=e;
193             _tail=(_tail+1)%_capacity;
194 
195             not_empty=0==_size.getAndIncrement();
196             
197         } 
198         finally 
199         {
200             _tailLock.unlock();
201         }
202         
203         if (not_empty)
204         {
205             _headLock.lock();
206             try
207             {
208                 _notEmpty.signal();
209             }
210             finally
211             {
212                 _headLock.unlock();
213             }
214         }  
215 
216         return true;
217     }
218 
219 
220     /* ------------------------------------------------------------ */
221     public E poll()
222     {
223         if (_size.get() == 0)
224             return null;
225         
226         E e = null;
227         _headLock.lock(); // Size cannot shrink
228         try 
229         {
230             if (_size.get() > 0) 
231             {
232                 final int head=_head;
233                 e = (E)_elements[head];
234                 _elements[head]=null;
235                 _head=(head+1)%_capacity;
236                 
237                 if (_size.decrementAndGet()>0)
238                     _notEmpty.signal();
239             }
240         } 
241         finally 
242         {
243             _headLock.unlock();
244         }
245         
246         return e;
247     }
248 
249     /* ------------------------------------------------------------ */
250     /**
251      * Retrieves and removes the head of this queue, waiting
252      * if no elements are present on this queue.
253      * @return the head of this queue
254      * @throws InterruptedException if interrupted while waiting.
255      */
256     public E take() throws InterruptedException
257     {
258         E e = null;
259         _headLock.lockInterruptibly();  // Size cannot shrink
260         try 
261         {
262             try 
263             {
264                 while (_size.get() == 0)
265                 {
266                     _notEmpty.await();
267                 }
268             } 
269             catch (InterruptedException ie) 
270             {
271                 _notEmpty.signal();
272                 throw ie;
273             }
274 
275             final int head=_head;
276             e = (E)_elements[head];
277             _elements[head]=null;
278             _head=(head+1)%_capacity;
279 
280             if (_size.decrementAndGet()>0)
281                 _notEmpty.signal();
282         } 
283         finally 
284         {
285             _headLock.unlock();
286         }
287         
288         return e;
289     }
290 
291     /* ------------------------------------------------------------ */
292     /**
293      * Retrieves and removes the head of this queue, waiting
294      * if necessary up to the specified wait time if no elements are
295      * present on this queue.
296      * @param time how long to wait before giving up, in units of
297      * <tt>unit</tt>
298      * @param unit a <tt>TimeUnit</tt> determining how to interpret the
299      * <tt>timeout</tt> parameter
300      * @return the head of this queue, or <tt>null</tt> if the
301      * specified waiting time elapses before an element is present.
302      * @throws InterruptedException if interrupted while waiting.
303      */
304     public E poll(long time, TimeUnit unit) throws InterruptedException
305     {
306         
307         E e = null;
308 
309         long nanos = unit.toNanos(time);
310         
311         _headLock.lockInterruptibly(); // Size cannot shrink
312         try 
313         {    
314             try 
315             {
316                 while (_size.get() == 0)
317                 {
318                     if (nanos<=0)
319                         return null;
320                     nanos = _notEmpty.awaitNanos(nanos);
321                 }
322             } 
323             catch (InterruptedException ie) 
324             {
325                 _notEmpty.signal();
326                 throw ie;
327             }
328 
329             e = (E)_elements[_head];
330             _elements[_head]=null;
331             _head=(_head+1)%_capacity;
332 
333             if (_size.decrementAndGet()>0)
334                 _notEmpty.signal();
335         } 
336         finally 
337         {
338             _headLock.unlock();
339         }
340         
341         return e;
342     }
343 
344     /* ------------------------------------------------------------ */
345     public E remove()
346     {
347         E e=poll();
348         if (e==null)
349             throw new NoSuchElementException();
350         return e;
351     }
352 
353     /* ------------------------------------------------------------ */
354     @Override
355     public void clear()
356     {
357         _tailLock.lock();
358         try
359         {
360             _headLock.lock();
361             try
362             {
363                 _head=0;
364                 _tail=0;
365                 _size.set(0);
366             }
367             finally
368             {
369                 _headLock.unlock();
370             }
371         }
372         finally
373         {
374             _tailLock.unlock();
375         }
376     }
377 
378     /* ------------------------------------------------------------ */
379     @Override
380     public boolean isEmpty()
381     {
382         return _size.get()==0;
383     }
384 
385     /* ------------------------------------------------------------ */
386     @Override
387     public int size()
388     {
389         return _size.get();
390     }
391 
392     /* ------------------------------------------------------------ */
393     @Override
394     public E get(int index)
395     {
396         _tailLock.lock();
397         try
398         {
399             _headLock.lock();
400             try
401             {
402                 if (index<0 || index>=_size.get())
403                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
404                 int i = _head+index;
405                 if (i>=_capacity)
406                     i-=_capacity;
407                 return (E)_elements[i];
408             }
409             finally
410             {
411                 _headLock.unlock();
412             }
413         }
414         finally
415         {
416             _tailLock.unlock();
417         }
418     }
419     
420     /* ------------------------------------------------------------ */
421     @Override
422     public E remove(int index)
423     {
424         _tailLock.lock();
425         try
426         {
427             _headLock.lock();
428             try
429             {
430 
431                 if (index<0 || index>=_size.get())
432                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
433 
434                 int i = _head+index;
435                 if (i>=_capacity)
436                     i-=_capacity;
437                 E old=(E)_elements[i];
438 
439                 if (i<_tail)
440                 {
441                     System.arraycopy(_elements,i+1,_elements,i,_tail-i);
442                     _tail--;
443                     _size.decrementAndGet();
444                 }
445                 else
446                 {
447                     System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
448                     if (_tail>0)
449                     {
450                         _elements[_capacity]=_elements[0];
451                         System.arraycopy(_elements,1,_elements,0,_tail-1);
452                         _tail--;
453                     }
454                     else
455                         _tail=_capacity-1;
456 
457                     _size.decrementAndGet();
458                 }
459 
460                 return old;
461             }
462             finally
463             {
464                 _headLock.unlock();
465             }
466         }
467         finally
468         {
469             _tailLock.unlock();
470         }
471     }
472 
473     /* ------------------------------------------------------------ */
474     @Override
475     public E set(int index, E e)
476     {
477         if (e == null) 
478             throw new NullPointerException();
479 
480         _tailLock.lock();
481         try
482         {
483             _headLock.lock();
484             try
485             {
486 
487                 if (index<0 || index>=_size.get())
488                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
489 
490                 int i = _head+index;
491                 if (i>=_capacity)
492                     i-=_capacity;
493                 E old=(E)_elements[i];
494                 _elements[i]=e;
495                 return old;
496             }
497             finally
498             {
499                 _headLock.unlock();
500             }
501         }
502         finally
503         {
504             _tailLock.unlock();
505         }
506     }
507     
508     /* ------------------------------------------------------------ */
509     @Override
510     public void add(int index, E e)
511     {
512         if (e == null) 
513             throw new NullPointerException();
514 
515         _tailLock.lock();
516         try
517         {
518             _headLock.lock();
519             try
520             {
521 
522                 if (index<0 || index>_size.get())
523                     throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
524 
525                 if (index==_size.get())
526                 {
527                     add(e);
528                 }
529                 else
530                 {
531                     if (_tail==_head)
532                         if (!grow())
533                             throw new IllegalStateException("full");
534 
535                     int i = _head+index;
536                     if (i>=_capacity)
537                         i-=_capacity;
538 
539                     _size.incrementAndGet();
540                     _tail=(_tail+1)%_capacity;
541 
542 
543                     if (i<_tail)
544                     {
545                         System.arraycopy(_elements,i,_elements,i+1,_tail-i);
546                         _elements[i]=e;
547                     }
548                     else
549                     {
550                         if (_tail>0)
551                         {
552                             System.arraycopy(_elements,0,_elements,1,_tail);
553                             _elements[0]=_elements[_capacity-1];
554                         }
555 
556                         System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
557                         _elements[i]=e;
558                     }
559                 }
560             }
561             finally
562             {
563                 _headLock.unlock();
564             }
565         }
566         finally
567         {
568             _tailLock.unlock();
569         }
570     }
571 
572     /* ------------------------------------------------------------ */
573     private boolean grow()
574     {
575         if (_growCapacity<=0)
576             return false;
577 
578         _tailLock.lock();
579         try
580         {
581             _headLock.lock();
582             try
583             {
584                 final int head=_head;
585                 final int tail=_tail;
586                 final int new_tail;
587 
588                 Object[] elements=new Object[_capacity+_growCapacity];
589 
590                 if (head<tail)
591                 {
592                     new_tail=tail-head;
593                     System.arraycopy(_elements,head,elements,0,new_tail);
594                 }
595                 else if (head>tail || _size.get()>0)
596                 {
597                     new_tail=_capacity+tail-head;
598                     int cut=_capacity-head;
599                     System.arraycopy(_elements,head,elements,0,cut);
600                     System.arraycopy(_elements,0,elements,cut,tail);
601                 }
602                 else
603                 {
604                     new_tail=0;
605                 }
606 
607                 _elements=elements;
608                 _capacity=_elements.length;
609                 _head=0;
610                 _tail=new_tail; 
611                 return true;
612             }
613             finally
614             {
615                 _headLock.unlock();
616             }
617         }
618         finally
619         {
620             _tailLock.unlock();
621         }
622 
623     }
624 
625     /* ------------------------------------------------------------ */
626     public int drainTo(Collection<? super E> c)
627     {
628         throw new UnsupportedOperationException();
629     }
630 
631     /* ------------------------------------------------------------ */
632     public int drainTo(Collection<? super E> c, int maxElements)
633     {
634         throw new UnsupportedOperationException();
635     }
636 
637     /* ------------------------------------------------------------ */
638     public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
639     {
640         throw new UnsupportedOperationException();
641     }
642 
643     /* ------------------------------------------------------------ */
644     public void put(E o) throws InterruptedException
645     {
646         if (!add(o))
647             throw new IllegalStateException("full");
648     }
649 
650     /* ------------------------------------------------------------ */
651     public int remainingCapacity()
652     {
653         _tailLock.lock();
654         try
655         {
656             _headLock.lock();
657             try
658             {
659                 return getCapacity()-size();
660             }
661             finally
662             {
663                 _headLock.unlock();
664             }
665         }
666         finally
667         {
668             _tailLock.unlock();
669         }
670     }
671 }