View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.Collection;
22  import java.util.Objects;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.concurrent.atomic.AtomicLongArray;
27  import java.util.concurrent.locks.Condition;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  /**
32   * Common functionality for a blocking version of {@link ConcurrentArrayQueue}.
33   *
34   * @see Unbounded
35   * @see Bounded
36   * @param <E>
37   */
38  public abstract class ConcurrentArrayBlockingQueue<E> extends ConcurrentArrayQueue<E> implements BlockingQueue<E>
39  {
40      private final Lock _lock = new ReentrantLock();
41      private final Condition _consumer = _lock.newCondition();
42  
43      public ConcurrentArrayBlockingQueue(int blockSize)
44      {
45          super(blockSize);
46      }
47  
48      @Override
49      public E poll()
50      {
51          E result = super.poll();
52          if (result != null && decrementAndGetSize() > 0)
53              signalConsumer();
54          return result;
55      }
56  
57      @Override
58      public boolean remove(Object o)
59      {
60          boolean result = super.remove(o);
61          if (result && decrementAndGetSize() > 0)
62              signalConsumer();
63          return result;
64      }
65  
66      protected abstract int decrementAndGetSize();
67  
68      protected void signalConsumer()
69      {
70          final Lock lock = _lock;
71          lock.lock();
72          try
73          {
74              _consumer.signal();
75          }
76          finally
77          {
78              lock.unlock();
79          }
80      }
81  
82      @Override
83      public E take() throws InterruptedException
84      {
85          while (true)
86          {
87              E result = poll();
88              if (result != null)
89                  return result;
90  
91              final Lock lock = _lock;
92              lock.lockInterruptibly();
93              try
94              {
95                  if (size() == 0)
96                  {
97                      _consumer.await();
98                  }
99              }
100             finally
101             {
102                 lock.unlock();
103             }
104         }
105     }
106 
107     @Override
108     public E poll(long timeout, TimeUnit unit) throws InterruptedException
109     {
110         long nanos = unit.toNanos(timeout);
111         
112         while (true)
113         {
114             // TODO should reduce nanos if we spin here
115             
116             E result = poll();
117             if (result != null)
118                 return result;
119 
120             final Lock lock = _lock;
121             lock.lockInterruptibly();
122             try
123             {
124                 if (size() == 0)
125                 {
126                     if (nanos <= 0)
127                         return null;
128                     nanos = _consumer.awaitNanos(nanos);
129                 }
130             }
131             finally
132             {
133                 lock.unlock();
134             }
135         }
136     }
137 
138     @Override
139     public int drainTo(Collection<? super E> c)
140     {
141         return drainTo(c, Integer.MAX_VALUE);
142     }
143 
144     @Override
145     public int drainTo(Collection<? super E> c, int maxElements)
146     {
147         if (c == this)
148             throw new IllegalArgumentException();
149 
150         int added = 0;
151         while (added < maxElements)
152         {
153             E element = poll();
154             if (element == null)
155                 break;
156             c.add(element);
157             ++added;
158         }
159         return added;
160     }
161 
162     /**
163      * An unbounded, blocking version of {@link ConcurrentArrayQueue}.
164      *
165      * @param <E>
166      */
167     public static class Unbounded<E> extends ConcurrentArrayBlockingQueue<E>
168     {
169         private static final int SIZE_LEFT_OFFSET = MemoryUtils.getLongsPerCacheLine() - 1;
170         private static final int SIZE_RIGHT_OFFSET = SIZE_LEFT_OFFSET + MemoryUtils.getLongsPerCacheLine();
171         
172         private final AtomicLongArray _sizes = new AtomicLongArray(SIZE_RIGHT_OFFSET+1);
173 
174         public Unbounded()
175         {
176             this(DEFAULT_BLOCK_SIZE);
177         }
178 
179         public Unbounded(int blockSize)
180         {
181             super(blockSize);
182         }
183 
184         @Override
185         public boolean offer(E item)
186         {
187             boolean result = super.offer(item);
188             if (result && getAndIncrementSize() == 0)
189                 signalConsumer();
190             return result;
191         }
192 
193         private int getAndIncrementSize()
194         {
195             long sizeRight = _sizes.getAndIncrement(SIZE_RIGHT_OFFSET);
196             long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET);
197             return (int)(sizeRight - sizeLeft);
198         }
199 
200         @Override
201         protected int decrementAndGetSize()
202         {
203             long sizeLeft = _sizes.incrementAndGet(SIZE_LEFT_OFFSET);
204             long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET);
205             return (int)(sizeRight - sizeLeft);
206         }
207 
208         @Override
209         public int size()
210         {
211             long sizeLeft = _sizes.get(SIZE_LEFT_OFFSET);
212             long sizeRight = _sizes.get(SIZE_RIGHT_OFFSET);
213             return (int)(sizeRight - sizeLeft);
214         }
215 
216         @Override
217         public int remainingCapacity()
218         {
219             return Integer.MAX_VALUE;
220         }
221 
222         @Override
223         public void put(E element) throws InterruptedException
224         {
225             offer(element);
226         }
227 
228         @Override
229         public boolean offer(E element, long timeout, TimeUnit unit) throws InterruptedException
230         {
231             return offer(element);
232         }
233     }
234 
235     /**
236      * A bounded, blocking version of {@link ConcurrentArrayQueue}.
237      *
238      * @param <E>
239      */
240     public static class Bounded<E> extends ConcurrentArrayBlockingQueue<E>
241     {
242         private final AtomicInteger _size = new AtomicInteger();
243         private final Lock _lock = new ReentrantLock();
244         private final Condition _producer = _lock.newCondition();
245         private final int _capacity;
246 
247         public Bounded(int capacity)
248         {
249             this(DEFAULT_BLOCK_SIZE, capacity);
250         }
251 
252         public Bounded(int blockSize, int capacity)
253         {
254             super(blockSize);
255             this._capacity = capacity;
256         }
257 
258         @Override
259         public boolean offer(E item)
260         {
261             while (true)
262             {
263                 int size = size();
264                 int nextSize = size + 1;
265 
266                 if (nextSize > _capacity)
267                     return false;
268 
269                 if (_size.compareAndSet(size, nextSize))
270                 {
271                     if (super.offer(item))
272                     {
273                         if (size == 0)
274                             signalConsumer();
275                         return true;
276                     }
277                     else
278                     {
279                         decrementAndGetSize();
280                     }
281                 }
282             }
283         }
284 
285         @Override
286         public E poll()
287         {
288             E result = super.poll();
289             if (result != null)
290                 signalProducer();
291             return result;
292         }
293 
294         @Override
295         public boolean remove(Object o)
296         {
297             boolean result = super.remove(o);
298             if (result)
299                 signalProducer();
300             return result;
301         }
302 
303         @Override
304         protected int decrementAndGetSize()
305         {
306             return _size.decrementAndGet();
307         }
308 
309         @Override
310         public int size()
311         {
312             return _size.get();
313         }
314 
315         @Override
316         public int remainingCapacity()
317         {
318             return _capacity - size();
319         }
320 
321         @Override
322         public void put(E item) throws InterruptedException
323         {
324             item = Objects.requireNonNull(item);
325 
326             while (true)
327             {
328                 final Lock lock = _lock;
329                 lock.lockInterruptibly();
330                 try
331                 {
332                     if (size() == _capacity)
333                         _producer.await();
334                 }
335                 finally
336                 {
337                     lock.unlock();
338                 }
339                 if (offer(item))
340                     break;
341             }
342         }
343 
344         @Override
345         public boolean offer(E item, long timeout, TimeUnit unit) throws InterruptedException
346         {
347             item = Objects.requireNonNull(item);
348 
349             long nanos = unit.toNanos(timeout);
350             while (true)
351             {
352                 final Lock lock = _lock;
353                 lock.lockInterruptibly();
354                 try
355                 {
356                     if (size() == _capacity)
357                     {
358                         if (nanos <= 0)
359                             return false;
360                         nanos = _producer.awaitNanos(nanos);
361                     }
362                 }
363                 finally
364                 {
365                     lock.unlock();
366                 }
367                 if (offer(item))
368                     break;
369             }
370 
371             return true;
372         }
373 
374         @Override
375         public int drainTo(Collection<? super E> c, int maxElements)
376         {
377             int result = super.drainTo(c, maxElements);
378             if (result > 0)
379                 signalProducers();
380             return result;
381         }
382 
383         @Override
384         public void clear()
385         {
386             super.clear();
387             signalProducers();
388         }
389 
390         private void signalProducer()
391         {
392             final Lock lock = _lock;
393             lock.lock();
394             try
395             {
396                 _producer.signal();
397             }
398             finally
399             {
400                 lock.unlock();
401             }
402         }
403 
404         private void signalProducers()
405         {
406             final Lock lock = _lock;
407             lock.lock();
408             try
409             {
410                 _producer.signalAll();
411             }
412             finally
413             {
414                 lock.unlock();
415             }
416         }
417     }
418 }