View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.AbstractQueue;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.NoSuchElementException;
27  import java.util.Objects;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.atomic.AtomicIntegerArray;
31  import java.util.concurrent.atomic.AtomicReference;
32  import java.util.concurrent.atomic.AtomicReferenceArray;
33  
34  /**
35   * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
36   * to store elements.
37   * <p>
38   * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
39   * but producing less garbage because arrays are used to store elements rather than nodes.
40   * </p>
41   * <p>
42   * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
43   * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
44   * </p>
45   *
46   * @param <T> the Array entry type
47   */
48  public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
49  {
50      public static final int DEFAULT_BLOCK_SIZE = 512;
51      public static final Object REMOVED_ELEMENT = new Object()
52      {
53          @Override
54          public String toString()
55          {
56              return "X";
57          }
58      };
59  
60      private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
61      private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
62  
63      private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
64      private final int _blockSize;
65  
66      public ConcurrentArrayQueue()
67      {
68          this(DEFAULT_BLOCK_SIZE);
69      }
70  
71      public ConcurrentArrayQueue(int blockSize)
72      {
73          _blockSize = blockSize;
74          Block<T> block = newBlock();
75          _blocks.set(HEAD_OFFSET,block);
76          _blocks.set(TAIL_OFFSET,block);
77      }
78  
79      public int getBlockSize()
80      {
81          return _blockSize;
82      }
83  
84      protected Block<T> getHeadBlock()
85      {
86          return _blocks.get(HEAD_OFFSET);
87      }
88  
89      protected Block<T> getTailBlock()
90      {
91          return _blocks.get(TAIL_OFFSET);
92      }
93  
94      @Override
95      public boolean offer(T item)
96      {
97          item = Objects.requireNonNull(item);
98  
99          final Block<T> initialTailBlock = getTailBlock();
100         Block<T> currentTailBlock = initialTailBlock;
101         int tail = currentTailBlock.tail();
102         while (true)
103         {
104             if (tail == getBlockSize())
105             {
106                 Block<T> nextTailBlock = currentTailBlock.next();
107                 if (nextTailBlock == null)
108                 {
109                     nextTailBlock = newBlock();
110                     if (currentTailBlock.link(nextTailBlock))
111                     {
112                         // Linking succeeded, loop
113                         currentTailBlock = nextTailBlock;
114                     }
115                     else
116                     {
117                         // Concurrent linking, use other block and loop
118                         currentTailBlock = currentTailBlock.next();
119                     }
120                 }
121                 else
122                 {
123                     // Not at last block, loop
124                     currentTailBlock = nextTailBlock;
125                 }
126                 tail = currentTailBlock.tail();
127             }
128             else
129             {
130                 if (currentTailBlock.peek(tail) == null)
131                 {
132                     if (currentTailBlock.store(tail, item))
133                     {
134                         // Item stored
135                         break;
136                     }
137                     else
138                     {
139                         // Concurrent store, try next index
140                         ++tail;
141                     }
142                 }
143                 else
144                 {
145                     // Not free, try next index
146                     ++tail;
147                 }
148             }
149         }
150 
151         updateTailBlock(initialTailBlock, currentTailBlock);
152 
153         return true;
154     }
155 
156     private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
157     {
158         // Update the tail block pointer if needs to
159         if (oldTailBlock != newTailBlock)
160         {
161             // The tail block pointer is allowed to lag behind.
162             // If this update fails, it means that other threads
163             // have filled this block and installed a new one.
164             casTailBlock(oldTailBlock, newTailBlock);
165         }
166     }
167 
168     protected boolean casTailBlock(Block<T> current, Block<T> update)
169     {
170         return _blocks.compareAndSet(TAIL_OFFSET,current,update);
171     }
172 
173     @SuppressWarnings("unchecked")
174     @Override
175     public T poll()
176     {
177         final Block<T> initialHeadBlock = getHeadBlock();
178         Block<T> currentHeadBlock = initialHeadBlock;
179         int head = currentHeadBlock.head();
180         T result = null;
181         while (true)
182         {
183             if (head == getBlockSize())
184             {
185                 Block<T> nextHeadBlock = currentHeadBlock.next();
186                 if (nextHeadBlock == null)
187                 {
188                     // We could have read that the next head block was null
189                     // but another thread allocated a new block and stored a
190                     // new item. This thread could not detect this, but that
191                     // is ok, otherwise we would not be able to exit this loop.
192 
193                     // Queue is empty
194                     break;
195                 }
196                 else
197                 {
198                     // Use next block and loop
199                     currentHeadBlock = nextHeadBlock;
200                     head = currentHeadBlock.head();
201                 }
202             }
203             else
204             {
205                 Object element = currentHeadBlock.peek(head);
206                 if (element == REMOVED_ELEMENT)
207                 {
208                     // Already removed, try next index
209                     ++head;
210                 }
211                 else
212                 {
213                     result = (T)element;
214                     if (result != null)
215                     {
216                         if (currentHeadBlock.remove(head, result, true))
217                         {
218                             // Item removed
219                             break;
220                         }
221                         else
222                         {
223                             // Concurrent remove, try next index
224                             ++head;
225                         }
226                     }
227                     else
228                     {
229                         // Queue is empty
230                         break;
231                     }
232                 }
233             }
234         }
235 
236         updateHeadBlock(initialHeadBlock, currentHeadBlock);
237 
238         return result;
239     }
240 
241     private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
242     {
243         // Update the head block pointer if needs to
244         if (oldHeadBlock != newHeadBlock)
245         {
246             // The head block pointer lagged behind.
247             // If this update fails, it means that other threads
248             // have emptied this block and pointed to a new one.
249             casHeadBlock(oldHeadBlock, newHeadBlock);
250         }
251     }
252 
253     protected boolean casHeadBlock(Block<T> current, Block<T> update)
254     {
255         return _blocks.compareAndSet(HEAD_OFFSET,current,update);
256     }
257 
258     @Override
259     public T peek()
260     {
261         Block<T> currentHeadBlock = getHeadBlock();
262         int head = currentHeadBlock.head();
263         while (true)
264         {
265             if (head == getBlockSize())
266             {
267                 Block<T> nextHeadBlock = currentHeadBlock.next();
268                 if (nextHeadBlock == null)
269                 {
270                     // Queue is empty
271                     return null;
272                 }
273                 else
274                 {
275                     // Use next block and loop
276                     currentHeadBlock = nextHeadBlock;
277                     head = currentHeadBlock.head();
278                 }
279             }
280             else
281             {
282                 T element = currentHeadBlock.peek(head);
283                 if (element == REMOVED_ELEMENT)
284                 {
285                     // Already removed, try next index
286                     ++head;
287                 }
288                 else
289                 {
290                     return element;
291                 }
292             }
293         }
294     }
295 
296     @Override
297     public boolean remove(Object o)
298     {
299         Block<T> currentHeadBlock = getHeadBlock();
300         int head = currentHeadBlock.head();
301         boolean result = false;
302         while (true)
303         {
304             if (head == getBlockSize())
305             {
306                 Block<T> nextHeadBlock = currentHeadBlock.next();
307                 if (nextHeadBlock == null)
308                 {
309                     // Not found
310                     break;
311                 }
312                 else
313                 {
314                     // Use next block and loop
315                     currentHeadBlock = nextHeadBlock;
316                     head = currentHeadBlock.head();
317                 }
318             }
319             else
320             {
321                 Object element = currentHeadBlock.peek(head);
322                 if (element == REMOVED_ELEMENT)
323                 {
324                     // Removed, try next index
325                     ++head;
326                 }
327                 else
328                 {
329                     if (element == null)
330                     {
331                         // Not found
332                         break;
333                     }
334                     else
335                     {
336                         if (element.equals(o))
337                         {
338                             // Found
339                             if (currentHeadBlock.remove(head, o, false))
340                             {
341                                 result = true;
342                                 break;
343                             }
344                             else
345                             {
346                                 ++head;
347                             }
348                         }
349                         else
350                         {
351                             // Not the one we're looking for
352                             ++head;
353                         }
354                     }
355                 }
356             }
357         }
358 
359         return result;
360     }
361 
362     @Override
363     public boolean removeAll(Collection<?> c)
364     {
365         // TODO: super invocations are based on iterator.remove(), which throws
366         return super.removeAll(c);
367     }
368 
369     @Override
370     public boolean retainAll(Collection<?> c)
371     {
372         // TODO: super invocations are based on iterator.remove(), which throws
373         return super.retainAll(c);
374     }
375 
376     @Override
377     public Iterator<T> iterator()
378     {
379         final List<Object[]> blocks = new ArrayList<>();
380         Block<T> currentHeadBlock = getHeadBlock();
381         while (currentHeadBlock != null)
382         {
383             Object[] elements = currentHeadBlock.arrayCopy();
384             blocks.add(elements);
385             currentHeadBlock = currentHeadBlock.next();
386         }
387         return new Iterator<T>()
388         {
389             private int blockIndex;
390             private int index;
391 
392             @Override
393             public boolean hasNext()
394             {
395                 while (true)
396                 {
397                     if (blockIndex == blocks.size())
398                         return false;
399 
400                     Object element = blocks.get(blockIndex)[index];
401 
402                     if (element == null)
403                         return false;
404 
405                     if (element != REMOVED_ELEMENT)
406                         return true;
407 
408                     advance();
409                 }
410             }
411 
412             @Override
413             public T next()
414             {
415                 while (true)
416                 {
417                     if (blockIndex == blocks.size())
418                         throw new NoSuchElementException();
419 
420                     Object element = blocks.get(blockIndex)[index];
421 
422                     if (element == null)
423                         throw new NoSuchElementException();
424 
425                     advance();
426 
427                     if (element != REMOVED_ELEMENT) {
428                         @SuppressWarnings("unchecked")
429                         T e = (T)element;
430                         return e;
431                     }
432                 }
433             }
434 
435             private void advance()
436             {
437                 if (++index == getBlockSize())
438                 {
439                     index = 0;
440                     ++blockIndex;
441                 }
442             }
443 
444             @Override
445             public void remove()
446             {
447                 throw new UnsupportedOperationException();
448             }
449         };
450     }
451 
452     @Override
453     public int size()
454     {
455         Block<T> currentHeadBlock = getHeadBlock();
456         int head = currentHeadBlock.head();
457         int size = 0;
458         while (true)
459         {
460             if (head == getBlockSize())
461             {
462                 Block<T> nextHeadBlock = currentHeadBlock.next();
463                 if (nextHeadBlock == null)
464                 {
465                     break;
466                 }
467                 else
468                 {
469                     // Use next block and loop
470                     currentHeadBlock = nextHeadBlock;
471                     head = currentHeadBlock.head();
472                 }
473             }
474             else
475             {
476                 Object element = currentHeadBlock.peek(head);
477                 if (element == REMOVED_ELEMENT)
478                 {
479                     // Already removed, try next index
480                     ++head;
481                 }
482                 else if (element != null)
483                 {
484                     ++size;
485                     ++head;
486                 }
487                 else
488                 {
489                     break;
490                 }
491             }
492         }
493         return size;
494     }
495 
496     protected Block<T> newBlock()
497     {
498         return new Block<>(getBlockSize());
499     }
500 
501     protected int getBlockCount()
502     {
503         int result = 0;
504         Block<T> headBlock = getHeadBlock();
505         while (headBlock != null)
506         {
507             ++result;
508             headBlock = headBlock.next();
509         }
510         return result;
511     }
512 
513     protected static final class Block<E>
514     {
515         private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
516         private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
517 
518         private final AtomicReferenceArray<Object> elements;
519         private final AtomicReference<Block<E>> next = new AtomicReference<>();
520         private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
521 
522         protected Block(int blockSize)
523         {
524             elements = new AtomicReferenceArray<>(blockSize);
525         }
526 
527         @SuppressWarnings("unchecked")
528         public E peek(int index)
529         {
530             return (E)elements.get(index);
531         }
532 
533         public boolean store(int index, E item)
534         {
535             boolean result = elements.compareAndSet(index, null, item);
536             if (result)
537                 indexes.incrementAndGet(tailOffset);
538             return result;
539         }
540 
541         public boolean remove(int index, Object item, boolean updateHead)
542         {
543             boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
544             if (result && updateHead)
545                 indexes.incrementAndGet(headOffset);
546             return result;
547         }
548 
549         public Block<E> next()
550         {
551             return next.get();
552         }
553 
554         public boolean link(Block<E> nextBlock)
555         {
556             return next.compareAndSet(null, nextBlock);
557         }
558 
559         public int head()
560         {
561             return indexes.get(headOffset);
562         }
563 
564         public int tail()
565         {
566             return indexes.get(tailOffset);
567         }
568 
569         public Object[] arrayCopy()
570         {
571             Object[] result = new Object[elements.length()];
572             for (int i = 0; i < result.length; ++i)
573                 result[i] = elements.get(i);
574             return result;
575         }
576     }
577 }