View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util;
20  
21  import java.util.AbstractQueue;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.NoSuchElementException;
27  import java.util.Objects;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.atomic.AtomicIntegerArray;
31  import java.util.concurrent.atomic.AtomicReference;
32  import java.util.concurrent.atomic.AtomicReferenceArray;
33  
/**
 * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
 * to store elements.
 * <p>
 * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
 * but producing less garbage because arrays are used to store elements rather than nodes.
 * <p>
 * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
 * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
 *
 * @param <T> the type of the elements held in this queue
 */
46  public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
47  {
    /** Block size used by the no-argument constructor. */
    public static final int DEFAULT_BLOCK_SIZE = 512;
    /**
     * Sentinel written into a block slot in place of an element that has been removed.
     * Traversals in this class skip slots that hold it; its {@code toString()} is {@code "X"}.
     */
    public static final Object REMOVED_ELEMENT = new Object()
    {
        @Override
        public String toString()
        {
            return "X";
        }
    };

    // Indexes into _blocks at which the head-block and tail-block references are stored.
    // NOTE(review): the spacing is derived from MemoryUtils.getIntegersPerCacheLine(), presumably
    // to keep the two references on different cache lines - confirm against MemoryUtils.
    private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
    private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;

    // Only the slots at HEAD_OFFSET and TAIL_OFFSET are ever read or written by this class.
    private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
    // Number of element slots in each block; fixed at construction.
    private final int _blockSize;
63  
64      public ConcurrentArrayQueue()
65      {
66          this(DEFAULT_BLOCK_SIZE);
67      }
68  
69      public ConcurrentArrayQueue(int blockSize)
70      {
71          _blockSize = blockSize;
72          Block<T> block = newBlock();
73          _blocks.set(HEAD_OFFSET,block);
74          _blocks.set(TAIL_OFFSET,block);
75      }
76  
    /**
     * @return the number of element slots in each block, as given at construction
     */
    public int getBlockSize()
    {
        return _blockSize;
    }
81  
    /**
     * @return the block currently referenced by the head block pointer
     */
    protected Block<T> getHeadBlock()
    {
        return _blocks.get(HEAD_OFFSET);
    }
86  
    /**
     * @return the block currently referenced by the tail block pointer
     */
    protected Block<T> getTailBlock()
    {
        return _blocks.get(TAIL_OFFSET);
    }
91  
92      @Override
93      public boolean offer(T item)
94      {
95          item = Objects.requireNonNull(item);
96  
97          final Block<T> initialTailBlock = getTailBlock();
98          Block<T> currentTailBlock = initialTailBlock;
99          int tail = currentTailBlock.tail();
100         while (true)
101         {
102             if (tail == getBlockSize())
103             {
104                 Block<T> nextTailBlock = currentTailBlock.next();
105                 if (nextTailBlock == null)
106                 {
107                     nextTailBlock = newBlock();
108                     if (currentTailBlock.link(nextTailBlock))
109                     {
110                         // Linking succeeded, loop
111                         currentTailBlock = nextTailBlock;
112                     }
113                     else
114                     {
115                         // Concurrent linking, use other block and loop
116                         currentTailBlock = currentTailBlock.next();
117                     }
118                 }
119                 else
120                 {
121                     // Not at last block, loop
122                     currentTailBlock = nextTailBlock;
123                 }
124                 tail = currentTailBlock.tail();
125             }
126             else
127             {
128                 if (currentTailBlock.peek(tail) == null)
129                 {
130                     if (currentTailBlock.store(tail, item))
131                     {
132                         // Item stored
133                         break;
134                     }
135                     else
136                     {
137                         // Concurrent store, try next index
138                         ++tail;
139                     }
140                 }
141                 else
142                 {
143                     // Not free, try next index
144                     ++tail;
145                 }
146             }
147         }
148 
149         updateTailBlock(initialTailBlock, currentTailBlock);
150 
151         return true;
152     }
153 
154     private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
155     {
156         // Update the tail block pointer if needs to
157         if (oldTailBlock != newTailBlock)
158         {
159             // The tail block pointer is allowed to lag behind.
160             // If this update fails, it means that other threads
161             // have filled this block and installed a new one.
162             casTailBlock(oldTailBlock, newTailBlock);
163         }
164     }
165 
    /**
     * Atomically replaces the tail block pointer with {@code update} if it currently
     * references {@code current}.
     *
     * @param current the expected tail block
     * @param update the new tail block
     * @return whether the pointer was replaced
     */
    protected boolean casTailBlock(Block<T> current, Block<T> update)
    {
        return _blocks.compareAndSet(TAIL_OFFSET,current,update);
    }
170 
171     @Override
172     public T poll()
173     {
174         final Block<T> initialHeadBlock = getHeadBlock();
175         Block<T> currentHeadBlock = initialHeadBlock;
176         int head = currentHeadBlock.head();
177         T result = null;
178         while (true)
179         {
180             if (head == getBlockSize())
181             {
182                 Block<T> nextHeadBlock = currentHeadBlock.next();
183                 if (nextHeadBlock == null)
184                 {
185                     // We could have read that the next head block was null
186                     // but another thread allocated a new block and stored a
187                     // new item. This thread could not detect this, but that
188                     // is ok, otherwise we would not be able to exit this loop.
189 
190                     // Queue is empty
191                     break;
192                 }
193                 else
194                 {
195                     // Use next block and loop
196                     currentHeadBlock = nextHeadBlock;
197                     head = currentHeadBlock.head();
198                 }
199             }
200             else
201             {
202                 Object element = currentHeadBlock.peek(head);
203                 if (element == REMOVED_ELEMENT)
204                 {
205                     // Already removed, try next index
206                     ++head;
207                 }
208                 else
209                 {
210                     result = (T)element;
211                     if (result != null)
212                     {
213                         if (currentHeadBlock.remove(head, result, true))
214                         {
215                             // Item removed
216                             break;
217                         }
218                         else
219                         {
220                             // Concurrent remove, try next index
221                             ++head;
222                         }
223                     }
224                     else
225                     {
226                         // Queue is empty
227                         break;
228                     }
229                 }
230             }
231         }
232 
233         updateHeadBlock(initialHeadBlock, currentHeadBlock);
234 
235         return result;
236     }
237 
238     private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
239     {
240         // Update the head block pointer if needs to
241         if (oldHeadBlock != newHeadBlock)
242         {
243             // The head block pointer lagged behind.
244             // If this update fails, it means that other threads
245             // have emptied this block and pointed to a new one.
246             casHeadBlock(oldHeadBlock, newHeadBlock);
247         }
248     }
249 
    /**
     * Atomically replaces the head block pointer with {@code update} if it currently
     * references {@code current}.
     *
     * @param current the expected head block
     * @param update the new head block
     * @return whether the pointer was replaced
     */
    protected boolean casHeadBlock(Block<T> current, Block<T> update)
    {
        return _blocks.compareAndSet(HEAD_OFFSET,current,update);
    }
254 
255     @Override
256     public T peek()
257     {
258         Block<T> currentHeadBlock = getHeadBlock();
259         int head = currentHeadBlock.head();
260         while (true)
261         {
262             if (head == getBlockSize())
263             {
264                 Block<T> nextHeadBlock = currentHeadBlock.next();
265                 if (nextHeadBlock == null)
266                 {
267                     // Queue is empty
268                     return null;
269                 }
270                 else
271                 {
272                     // Use next block and loop
273                     currentHeadBlock = nextHeadBlock;
274                     head = currentHeadBlock.head();
275                 }
276             }
277             else
278             {
279                 Object element = currentHeadBlock.peek(head);
280                 if (element == REMOVED_ELEMENT)
281                 {
282                     // Already removed, try next index
283                     ++head;
284                 }
285                 else
286                 {
287                     return (T)element;
288                 }
289             }
290         }
291     }
292 
293     @Override
294     public boolean remove(Object o)
295     {
296         Block<T> currentHeadBlock = getHeadBlock();
297         int head = currentHeadBlock.head();
298         boolean result = false;
299         while (true)
300         {
301             if (head == getBlockSize())
302             {
303                 Block<T> nextHeadBlock = currentHeadBlock.next();
304                 if (nextHeadBlock == null)
305                 {
306                     // Not found
307                     break;
308                 }
309                 else
310                 {
311                     // Use next block and loop
312                     currentHeadBlock = nextHeadBlock;
313                     head = currentHeadBlock.head();
314                 }
315             }
316             else
317             {
318                 Object element = currentHeadBlock.peek(head);
319                 if (element == REMOVED_ELEMENT)
320                 {
321                     // Removed, try next index
322                     ++head;
323                 }
324                 else
325                 {
326                     if (element == null)
327                     {
328                         // Not found
329                         break;
330                     }
331                     else
332                     {
333                         if (element.equals(o))
334                         {
335                             // Found
336                             if (currentHeadBlock.remove(head, o, false))
337                             {
338                                 result = true;
339                                 break;
340                             }
341                             else
342                             {
343                                 ++head;
344                             }
345                         }
346                         else
347                         {
348                             // Not the one we're looking for
349                             ++head;
350                         }
351                     }
352                 }
353             }
354         }
355 
356         return result;
357     }
358 
359     @Override
360     public boolean removeAll(Collection<?> c)
361     {
362         // TODO: super invocations are based on iterator.remove(), which throws
363         return super.removeAll(c);
364     }
365 
366     @Override
367     public boolean retainAll(Collection<?> c)
368     {
369         // TODO: super invocations are based on iterator.remove(), which throws
370         return super.retainAll(c);
371     }
372 
373     @Override
374     public Iterator<T> iterator()
375     {
376         final List<Object[]> blocks = new ArrayList<>();
377         Block<T> currentHeadBlock = getHeadBlock();
378         while (currentHeadBlock != null)
379         {
380             Object[] elements = currentHeadBlock.arrayCopy();
381             blocks.add(elements);
382             currentHeadBlock = currentHeadBlock.next();
383         }
384         return new Iterator<T>()
385         {
386             private int blockIndex;
387             private int index;
388 
389             @Override
390             public boolean hasNext()
391             {
392                 while (true)
393                 {
394                     if (blockIndex == blocks.size())
395                         return false;
396 
397                     Object element = blocks.get(blockIndex)[index];
398 
399                     if (element == null)
400                         return false;
401 
402                     if (element != REMOVED_ELEMENT)
403                         return true;
404 
405                     advance();
406                 }
407             }
408 
409             @Override
410             public T next()
411             {
412                 while (true)
413                 {
414                     if (blockIndex == blocks.size())
415                         throw new NoSuchElementException();
416 
417                     Object element = blocks.get(blockIndex)[index];
418 
419                     if (element == null)
420                         throw new NoSuchElementException();
421 
422                     advance();
423 
424                     if (element != REMOVED_ELEMENT)
425                         return (T)element;
426                 }
427             }
428 
429             private void advance()
430             {
431                 if (++index == getBlockSize())
432                 {
433                     index = 0;
434                     ++blockIndex;
435                 }
436             }
437 
438             @Override
439             public void remove()
440             {
441                 throw new UnsupportedOperationException();
442             }
443         };
444     }
445 
446     @Override
447     public int size()
448     {
449         Block<T> currentHeadBlock = getHeadBlock();
450         int head = currentHeadBlock.head();
451         int size = 0;
452         while (true)
453         {
454             if (head == getBlockSize())
455             {
456                 Block<T> nextHeadBlock = currentHeadBlock.next();
457                 if (nextHeadBlock == null)
458                 {
459                     break;
460                 }
461                 else
462                 {
463                     // Use next block and loop
464                     currentHeadBlock = nextHeadBlock;
465                     head = currentHeadBlock.head();
466                 }
467             }
468             else
469             {
470                 Object element = currentHeadBlock.peek(head);
471                 if (element == REMOVED_ELEMENT)
472                 {
473                     // Already removed, try next index
474                     ++head;
475                 }
476                 else if (element != null)
477                 {
478                     ++size;
479                     ++head;
480                 }
481                 else
482                 {
483                     break;
484                 }
485             }
486         }
487         return size;
488     }
489 
    /**
     * @return a newly allocated block with {@link #getBlockSize()} element slots
     */
    protected Block<T> newBlock()
    {
        return new Block<>(getBlockSize());
    }
494 
495     protected int getBlockCount()
496     {
497         int result = 0;
498         Block<T> headBlock = getHeadBlock();
499         while (headBlock != null)
500         {
501             ++result;
502             headBlock = headBlock.next();
503         }
504         return result;
505     }
506 
507     protected static final class Block<E>
508     {
509         private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
510         private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
511 
512         private final AtomicReferenceArray<Object> elements;
513         private final AtomicReference<Block<E>> next = new AtomicReference<>();
514         private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
515 
516         protected Block(int blockSize)
517         {
518             elements = new AtomicReferenceArray<>(blockSize);
519         }
520 
521         public Object peek(int index)
522         {
523             return elements.get(index);
524         }
525 
526         public boolean store(int index, E item)
527         {
528             boolean result = elements.compareAndSet(index, null, item);
529             if (result)
530                 indexes.incrementAndGet(tailOffset);
531             return result;
532         }
533 
534         public boolean remove(int index, Object item, boolean updateHead)
535         {
536             boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
537             if (result && updateHead)
538                 indexes.incrementAndGet(headOffset);
539             return result;
540         }
541 
542         public Block<E> next()
543         {
544             return next.get();
545         }
546 
547         public boolean link(Block<E> nextBlock)
548         {
549             return next.compareAndSet(null, nextBlock);
550         }
551 
552         public int head()
553         {
554             return indexes.get(headOffset);
555         }
556 
557         public int tail()
558         {
559             return indexes.get(tailOffset);
560         }
561 
562         public Object[] arrayCopy()
563         {
564             Object[] result = new Object[elements.length()];
565             for (int i = 0; i < result.length; ++i)
566                 result[i] = elements.get(i);
567             return result;
568         }
569     }
570 }