View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.AbstractQueue;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.NoSuchElementException;
27  import java.util.Objects;
28  import java.util.concurrent.atomic.AtomicIntegerArray;
29  import java.util.concurrent.atomic.AtomicReference;
30  import java.util.concurrent.atomic.AtomicReferenceArray;
31  
32  /**
33   * A concurrent, unbounded implementation of {@link java.util.Queue} that uses singly-linked array blocks
34   * to store elements.
35   * <p>
36   * This class is a drop-in replacement for {@link java.util.concurrent.ConcurrentLinkedQueue}, with similar
37   * performance but producing less garbage because arrays are used to store elements rather than nodes.
38   * <p>
39   * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
40   * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
41   *
42   * @param <T> the type of the elements held in this queue
43   */
44  public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
45  {
    /** Number of element slots per block used by the no-argument constructor. */
    public static final int DEFAULT_BLOCK_SIZE = 512;
    /**
     * Sentinel written into a slot once its element has been removed; slots are
     * compared against it by identity. Its {@code toString()} returns "X".
     */
    public static final Object REMOVED_ELEMENT = new Object()
    {
        @Override
        public String toString()
        {
            return "X";
        }
    };

    // Indexes into _blocks of the head and tail block references. Both are derived from
    // MemoryUtils.getIntegersPerCacheLine(); presumably this spreads the two references
    // apart to limit false sharing - TODO confirm against MemoryUtils.
    private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
    private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;

    // Holds the head block reference at HEAD_OFFSET and the tail block reference at TAIL_OFFSET.
    private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
    // Number of element slots in every block created through newBlock().
    private final int _blockSize;
61  
62      public ConcurrentArrayQueue()
63      {
64          this(DEFAULT_BLOCK_SIZE);
65      }
66  
67      public ConcurrentArrayQueue(int blockSize)
68      {
69          _blockSize = blockSize;
70          Block<T> block = newBlock();
71          _blocks.set(HEAD_OFFSET,block);
72          _blocks.set(TAIL_OFFSET,block);
73      }
74  
75      public int getBlockSize()
76      {
77          return _blockSize;
78      }
79  
80      protected Block<T> getHeadBlock()
81      {
82          return _blocks.get(HEAD_OFFSET);
83      }
84  
85      protected Block<T> getTailBlock()
86      {
87          return _blocks.get(TAIL_OFFSET);
88      }
89  
90      @Override
91      public boolean offer(T item)
92      {
93          item = Objects.requireNonNull(item);
94  
95          final Block<T> initialTailBlock = getTailBlock();
96          Block<T> currentTailBlock = initialTailBlock;
97          int tail = currentTailBlock.tail();
98          while (true)
99          {
100             if (tail == getBlockSize())
101             {
102                 Block<T> nextTailBlock = currentTailBlock.next();
103                 if (nextTailBlock == null)
104                 {
105                     nextTailBlock = newBlock();
106                     if (currentTailBlock.link(nextTailBlock))
107                     {
108                         // Linking succeeded, loop
109                         currentTailBlock = nextTailBlock;
110                     }
111                     else
112                     {
113                         // Concurrent linking, use other block and loop
114                         currentTailBlock = currentTailBlock.next();
115                     }
116                 }
117                 else
118                 {
119                     // Not at last block, loop
120                     currentTailBlock = nextTailBlock;
121                 }
122                 tail = currentTailBlock.tail();
123             }
124             else
125             {
126                 if (currentTailBlock.peek(tail) == null)
127                 {
128                     if (currentTailBlock.store(tail, item))
129                     {
130                         // Item stored
131                         break;
132                     }
133                     else
134                     {
135                         // Concurrent store, try next index
136                         ++tail;
137                     }
138                 }
139                 else
140                 {
141                     // Not free, try next index
142                     ++tail;
143                 }
144             }
145         }
146 
147         updateTailBlock(initialTailBlock, currentTailBlock);
148 
149         return true;
150     }
151 
152     private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
153     {
154         // Update the tail block pointer if needs to
155         if (oldTailBlock != newTailBlock)
156         {
157             // The tail block pointer is allowed to lag behind.
158             // If this update fails, it means that other threads
159             // have filled this block and installed a new one.
160             casTailBlock(oldTailBlock, newTailBlock);
161         }
162     }
163 
164     protected boolean casTailBlock(Block<T> current, Block<T> update)
165     {
166         return _blocks.compareAndSet(TAIL_OFFSET,current,update);
167     }
168 
169     @SuppressWarnings("unchecked")
170     @Override
171     public T poll()
172     {
173         final Block<T> initialHeadBlock = getHeadBlock();
174         Block<T> currentHeadBlock = initialHeadBlock;
175         int head = currentHeadBlock.head();
176         T result = null;
177         while (true)
178         {
179             if (head == getBlockSize())
180             {
181                 Block<T> nextHeadBlock = currentHeadBlock.next();
182                 if (nextHeadBlock == null)
183                 {
184                     // We could have read that the next head block was null
185                     // but another thread allocated a new block and stored a
186                     // new item. This thread could not detect this, but that
187                     // is ok, otherwise we would not be able to exit this loop.
188 
189                     // Queue is empty
190                     break;
191                 }
192                 else
193                 {
194                     // Use next block and loop
195                     currentHeadBlock = nextHeadBlock;
196                     head = currentHeadBlock.head();
197                 }
198             }
199             else
200             {
201                 Object element = currentHeadBlock.peek(head);
202                 if (element == REMOVED_ELEMENT)
203                 {
204                     // Already removed, try next index
205                     ++head;
206                 }
207                 else
208                 {
209                     result = (T)element;
210                     if (result != null)
211                     {
212                         if (currentHeadBlock.remove(head, result, true))
213                         {
214                             // Item removed
215                             break;
216                         }
217                         else
218                         {
219                             // Concurrent remove, try next index
220                             ++head;
221                         }
222                     }
223                     else
224                     {
225                         // Queue is empty
226                         break;
227                     }
228                 }
229             }
230         }
231 
232         updateHeadBlock(initialHeadBlock, currentHeadBlock);
233 
234         return result;
235     }
236 
237     private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
238     {
239         // Update the head block pointer if needs to
240         if (oldHeadBlock != newHeadBlock)
241         {
242             // The head block pointer lagged behind.
243             // If this update fails, it means that other threads
244             // have emptied this block and pointed to a new one.
245             casHeadBlock(oldHeadBlock, newHeadBlock);
246         }
247     }
248 
249     protected boolean casHeadBlock(Block<T> current, Block<T> update)
250     {
251         return _blocks.compareAndSet(HEAD_OFFSET,current,update);
252     }
253 
254     @Override
255     public T peek()
256     {
257         Block<T> currentHeadBlock = getHeadBlock();
258         int head = currentHeadBlock.head();
259         while (true)
260         {
261             if (head == getBlockSize())
262             {
263                 Block<T> nextHeadBlock = currentHeadBlock.next();
264                 if (nextHeadBlock == null)
265                 {
266                     // Queue is empty
267                     return null;
268                 }
269                 else
270                 {
271                     // Use next block and loop
272                     currentHeadBlock = nextHeadBlock;
273                     head = currentHeadBlock.head();
274                 }
275             }
276             else
277             {
278                 T element = currentHeadBlock.peek(head);
279                 if (element == REMOVED_ELEMENT)
280                 {
281                     // Already removed, try next index
282                     ++head;
283                 }
284                 else
285                 {
286                     return element;
287                 }
288             }
289         }
290     }
291 
292     @Override
293     public boolean remove(Object o)
294     {
295         Block<T> currentHeadBlock = getHeadBlock();
296         int head = currentHeadBlock.head();
297         boolean result = false;
298         while (true)
299         {
300             if (head == getBlockSize())
301             {
302                 Block<T> nextHeadBlock = currentHeadBlock.next();
303                 if (nextHeadBlock == null)
304                 {
305                     // Not found
306                     break;
307                 }
308                 else
309                 {
310                     // Use next block and loop
311                     currentHeadBlock = nextHeadBlock;
312                     head = currentHeadBlock.head();
313                 }
314             }
315             else
316             {
317                 Object element = currentHeadBlock.peek(head);
318                 if (element == REMOVED_ELEMENT)
319                 {
320                     // Removed, try next index
321                     ++head;
322                 }
323                 else
324                 {
325                     if (element == null)
326                     {
327                         // Not found
328                         break;
329                     }
330                     else
331                     {
332                         if (element.equals(o))
333                         {
334                             // Found
335                             if (currentHeadBlock.remove(head, o, false))
336                             {
337                                 result = true;
338                                 break;
339                             }
340                             else
341                             {
342                                 ++head;
343                             }
344                         }
345                         else
346                         {
347                             // Not the one we're looking for
348                             ++head;
349                         }
350                     }
351                 }
352             }
353         }
354 
355         return result;
356     }
357 
358     @Override
359     public boolean removeAll(Collection<?> c)
360     {
361         // TODO: super invocations are based on iterator.remove(), which throws
362         return super.removeAll(c);
363     }
364 
365     @Override
366     public boolean retainAll(Collection<?> c)
367     {
368         // TODO: super invocations are based on iterator.remove(), which throws
369         return super.retainAll(c);
370     }
371 
372     @Override
373     public Iterator<T> iterator()
374     {
375         final List<Object[]> blocks = new ArrayList<>();
376         Block<T> currentHeadBlock = getHeadBlock();
377         while (currentHeadBlock != null)
378         {
379             Object[] elements = currentHeadBlock.arrayCopy();
380             blocks.add(elements);
381             currentHeadBlock = currentHeadBlock.next();
382         }
383         return new Iterator<T>()
384         {
385             private int blockIndex;
386             private int index;
387 
388             @Override
389             public boolean hasNext()
390             {
391                 while (true)
392                 {
393                     if (blockIndex == blocks.size())
394                         return false;
395 
396                     Object element = blocks.get(blockIndex)[index];
397 
398                     if (element == null)
399                         return false;
400 
401                     if (element != REMOVED_ELEMENT)
402                         return true;
403 
404                     advance();
405                 }
406             }
407 
408             @Override
409             public T next()
410             {
411                 while (true)
412                 {
413                     if (blockIndex == blocks.size())
414                         throw new NoSuchElementException();
415 
416                     Object element = blocks.get(blockIndex)[index];
417 
418                     if (element == null)
419                         throw new NoSuchElementException();
420 
421                     advance();
422 
423                     if (element != REMOVED_ELEMENT) {
424                         @SuppressWarnings("unchecked")
425                         T e = (T)element;
426                         return e;
427                     }
428                 }
429             }
430 
431             private void advance()
432             {
433                 if (++index == getBlockSize())
434                 {
435                     index = 0;
436                     ++blockIndex;
437                 }
438             }
439 
440             @Override
441             public void remove()
442             {
443                 throw new UnsupportedOperationException();
444             }
445         };
446     }
447 
448     @Override
449     public int size()
450     {
451         Block<T> currentHeadBlock = getHeadBlock();
452         int head = currentHeadBlock.head();
453         int size = 0;
454         while (true)
455         {
456             if (head == getBlockSize())
457             {
458                 Block<T> nextHeadBlock = currentHeadBlock.next();
459                 if (nextHeadBlock == null)
460                 {
461                     break;
462                 }
463                 else
464                 {
465                     // Use next block and loop
466                     currentHeadBlock = nextHeadBlock;
467                     head = currentHeadBlock.head();
468                 }
469             }
470             else
471             {
472                 Object element = currentHeadBlock.peek(head);
473                 if (element == REMOVED_ELEMENT)
474                 {
475                     // Already removed, try next index
476                     ++head;
477                 }
478                 else if (element != null)
479                 {
480                     ++size;
481                     ++head;
482                 }
483                 else
484                 {
485                     break;
486                 }
487             }
488         }
489         return size;
490     }
491 
492     protected Block<T> newBlock()
493     {
494         return new Block<>(getBlockSize());
495     }
496 
497     protected int getBlockCount()
498     {
499         int result = 0;
500         Block<T> headBlock = getHeadBlock();
501         while (headBlock != null)
502         {
503             ++result;
504             headBlock = headBlock.next();
505         }
506         return result;
507     }
508 
509     protected static final class Block<E>
510     {
511         private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
512         private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
513 
514         private final AtomicReferenceArray<Object> elements;
515         private final AtomicReference<Block<E>> next = new AtomicReference<>();
516         private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
517 
518         protected Block(int blockSize)
519         {
520             elements = new AtomicReferenceArray<>(blockSize);
521         }
522 
523         @SuppressWarnings("unchecked")
524         public E peek(int index)
525         {
526             return (E)elements.get(index);
527         }
528 
529         public boolean store(int index, E item)
530         {
531             boolean result = elements.compareAndSet(index, null, item);
532             if (result)
533                 indexes.incrementAndGet(tailOffset);
534             return result;
535         }
536 
537         public boolean remove(int index, Object item, boolean updateHead)
538         {
539             boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
540             if (result && updateHead)
541                 indexes.incrementAndGet(headOffset);
542             return result;
543         }
544 
545         public Block<E> next()
546         {
547             return next.get();
548         }
549 
550         public boolean link(Block<E> nextBlock)
551         {
552             return next.compareAndSet(null, nextBlock);
553         }
554 
555         public int head()
556         {
557             return indexes.get(headOffset);
558         }
559 
560         public int tail()
561         {
562             return indexes.get(tailOffset);
563         }
564 
565         public Object[] arrayCopy()
566         {
567             Object[] result = new Object[elements.length()];
568             for (int i = 0; i < result.length; ++i)
569                 result[i] = elements.get(i);
570             return result;
571         }
572     }
573 }