1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicIntegerArray;
31 import java.util.concurrent.atomic.AtomicReference;
32 import java.util.concurrent.atomic.AtomicReferenceArray;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
49 {
50 public static final int DEFAULT_BLOCK_SIZE = 512;
51 public static final Object REMOVED_ELEMENT = new Object()
52 {
53 @Override
54 public String toString()
55 {
56 return "X";
57 }
58 };
59
60 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
61 private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
62
63 private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
64 private final int _blockSize;
65
66 public ConcurrentArrayQueue()
67 {
68 this(DEFAULT_BLOCK_SIZE);
69 }
70
71 public ConcurrentArrayQueue(int blockSize)
72 {
73 _blockSize = blockSize;
74 Block<T> block = newBlock();
75 _blocks.set(HEAD_OFFSET,block);
76 _blocks.set(TAIL_OFFSET,block);
77 }
78
79 public int getBlockSize()
80 {
81 return _blockSize;
82 }
83
84 protected Block<T> getHeadBlock()
85 {
86 return _blocks.get(HEAD_OFFSET);
87 }
88
89 protected Block<T> getTailBlock()
90 {
91 return _blocks.get(TAIL_OFFSET);
92 }
93
94 @Override
95 public boolean offer(T item)
96 {
97 item = Objects.requireNonNull(item);
98
99 final Block<T> initialTailBlock = getTailBlock();
100 Block<T> currentTailBlock = initialTailBlock;
101 int tail = currentTailBlock.tail();
102 while (true)
103 {
104 if (tail == getBlockSize())
105 {
106 Block<T> nextTailBlock = currentTailBlock.next();
107 if (nextTailBlock == null)
108 {
109 nextTailBlock = newBlock();
110 if (currentTailBlock.link(nextTailBlock))
111 {
112
113 currentTailBlock = nextTailBlock;
114 }
115 else
116 {
117
118 currentTailBlock = currentTailBlock.next();
119 }
120 }
121 else
122 {
123
124 currentTailBlock = nextTailBlock;
125 }
126 tail = currentTailBlock.tail();
127 }
128 else
129 {
130 if (currentTailBlock.peek(tail) == null)
131 {
132 if (currentTailBlock.store(tail, item))
133 {
134
135 break;
136 }
137 else
138 {
139
140 ++tail;
141 }
142 }
143 else
144 {
145
146 ++tail;
147 }
148 }
149 }
150
151 updateTailBlock(initialTailBlock, currentTailBlock);
152
153 return true;
154 }
155
156 private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
157 {
158
159 if (oldTailBlock != newTailBlock)
160 {
161
162
163
164 casTailBlock(oldTailBlock, newTailBlock);
165 }
166 }
167
168 protected boolean casTailBlock(Block<T> current, Block<T> update)
169 {
170 return _blocks.compareAndSet(TAIL_OFFSET,current,update);
171 }
172
173 @SuppressWarnings("unchecked")
174 @Override
175 public T poll()
176 {
177 final Block<T> initialHeadBlock = getHeadBlock();
178 Block<T> currentHeadBlock = initialHeadBlock;
179 int head = currentHeadBlock.head();
180 T result = null;
181 while (true)
182 {
183 if (head == getBlockSize())
184 {
185 Block<T> nextHeadBlock = currentHeadBlock.next();
186 if (nextHeadBlock == null)
187 {
188
189
190
191
192
193
194 break;
195 }
196 else
197 {
198
199 currentHeadBlock = nextHeadBlock;
200 head = currentHeadBlock.head();
201 }
202 }
203 else
204 {
205 Object element = currentHeadBlock.peek(head);
206 if (element == REMOVED_ELEMENT)
207 {
208
209 ++head;
210 }
211 else
212 {
213 result = (T)element;
214 if (result != null)
215 {
216 if (currentHeadBlock.remove(head, result, true))
217 {
218
219 break;
220 }
221 else
222 {
223
224 ++head;
225 }
226 }
227 else
228 {
229
230 break;
231 }
232 }
233 }
234 }
235
236 updateHeadBlock(initialHeadBlock, currentHeadBlock);
237
238 return result;
239 }
240
241 private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
242 {
243
244 if (oldHeadBlock != newHeadBlock)
245 {
246
247
248
249 casHeadBlock(oldHeadBlock, newHeadBlock);
250 }
251 }
252
253 protected boolean casHeadBlock(Block<T> current, Block<T> update)
254 {
255 return _blocks.compareAndSet(HEAD_OFFSET,current,update);
256 }
257
258 @Override
259 public T peek()
260 {
261 Block<T> currentHeadBlock = getHeadBlock();
262 int head = currentHeadBlock.head();
263 while (true)
264 {
265 if (head == getBlockSize())
266 {
267 Block<T> nextHeadBlock = currentHeadBlock.next();
268 if (nextHeadBlock == null)
269 {
270
271 return null;
272 }
273 else
274 {
275
276 currentHeadBlock = nextHeadBlock;
277 head = currentHeadBlock.head();
278 }
279 }
280 else
281 {
282 T element = currentHeadBlock.peek(head);
283 if (element == REMOVED_ELEMENT)
284 {
285
286 ++head;
287 }
288 else
289 {
290 return element;
291 }
292 }
293 }
294 }
295
296 @Override
297 public boolean remove(Object o)
298 {
299 Block<T> currentHeadBlock = getHeadBlock();
300 int head = currentHeadBlock.head();
301 boolean result = false;
302 while (true)
303 {
304 if (head == getBlockSize())
305 {
306 Block<T> nextHeadBlock = currentHeadBlock.next();
307 if (nextHeadBlock == null)
308 {
309
310 break;
311 }
312 else
313 {
314
315 currentHeadBlock = nextHeadBlock;
316 head = currentHeadBlock.head();
317 }
318 }
319 else
320 {
321 Object element = currentHeadBlock.peek(head);
322 if (element == REMOVED_ELEMENT)
323 {
324
325 ++head;
326 }
327 else
328 {
329 if (element == null)
330 {
331
332 break;
333 }
334 else
335 {
336 if (element.equals(o))
337 {
338
339 if (currentHeadBlock.remove(head, o, false))
340 {
341 result = true;
342 break;
343 }
344 else
345 {
346 ++head;
347 }
348 }
349 else
350 {
351
352 ++head;
353 }
354 }
355 }
356 }
357 }
358
359 return result;
360 }
361
362 @Override
363 public boolean removeAll(Collection<?> c)
364 {
365
366 return super.removeAll(c);
367 }
368
369 @Override
370 public boolean retainAll(Collection<?> c)
371 {
372
373 return super.retainAll(c);
374 }
375
376 @Override
377 public Iterator<T> iterator()
378 {
379 final List<Object[]> blocks = new ArrayList<>();
380 Block<T> currentHeadBlock = getHeadBlock();
381 while (currentHeadBlock != null)
382 {
383 Object[] elements = currentHeadBlock.arrayCopy();
384 blocks.add(elements);
385 currentHeadBlock = currentHeadBlock.next();
386 }
387 return new Iterator<T>()
388 {
389 private int blockIndex;
390 private int index;
391
392 @Override
393 public boolean hasNext()
394 {
395 while (true)
396 {
397 if (blockIndex == blocks.size())
398 return false;
399
400 Object element = blocks.get(blockIndex)[index];
401
402 if (element == null)
403 return false;
404
405 if (element != REMOVED_ELEMENT)
406 return true;
407
408 advance();
409 }
410 }
411
412 @Override
413 public T next()
414 {
415 while (true)
416 {
417 if (blockIndex == blocks.size())
418 throw new NoSuchElementException();
419
420 Object element = blocks.get(blockIndex)[index];
421
422 if (element == null)
423 throw new NoSuchElementException();
424
425 advance();
426
427 if (element != REMOVED_ELEMENT) {
428 @SuppressWarnings("unchecked")
429 T e = (T)element;
430 return e;
431 }
432 }
433 }
434
435 private void advance()
436 {
437 if (++index == getBlockSize())
438 {
439 index = 0;
440 ++blockIndex;
441 }
442 }
443
444 @Override
445 public void remove()
446 {
447 throw new UnsupportedOperationException();
448 }
449 };
450 }
451
452 @Override
453 public int size()
454 {
455 Block<T> currentHeadBlock = getHeadBlock();
456 int head = currentHeadBlock.head();
457 int size = 0;
458 while (true)
459 {
460 if (head == getBlockSize())
461 {
462 Block<T> nextHeadBlock = currentHeadBlock.next();
463 if (nextHeadBlock == null)
464 {
465 break;
466 }
467 else
468 {
469
470 currentHeadBlock = nextHeadBlock;
471 head = currentHeadBlock.head();
472 }
473 }
474 else
475 {
476 Object element = currentHeadBlock.peek(head);
477 if (element == REMOVED_ELEMENT)
478 {
479
480 ++head;
481 }
482 else if (element != null)
483 {
484 ++size;
485 ++head;
486 }
487 else
488 {
489 break;
490 }
491 }
492 }
493 return size;
494 }
495
496 protected Block<T> newBlock()
497 {
498 return new Block<>(getBlockSize());
499 }
500
501 protected int getBlockCount()
502 {
503 int result = 0;
504 Block<T> headBlock = getHeadBlock();
505 while (headBlock != null)
506 {
507 ++result;
508 headBlock = headBlock.next();
509 }
510 return result;
511 }
512
513 protected static final class Block<E>
514 {
515 private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
516 private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
517
518 private final AtomicReferenceArray<Object> elements;
519 private final AtomicReference<Block<E>> next = new AtomicReference<>();
520 private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
521
522 protected Block(int blockSize)
523 {
524 elements = new AtomicReferenceArray<>(blockSize);
525 }
526
527 @SuppressWarnings("unchecked")
528 public E peek(int index)
529 {
530 return (E)elements.get(index);
531 }
532
533 public boolean store(int index, E item)
534 {
535 boolean result = elements.compareAndSet(index, null, item);
536 if (result)
537 indexes.incrementAndGet(tailOffset);
538 return result;
539 }
540
541 public boolean remove(int index, Object item, boolean updateHead)
542 {
543 boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
544 if (result && updateHead)
545 indexes.incrementAndGet(headOffset);
546 return result;
547 }
548
549 public Block<E> next()
550 {
551 return next.get();
552 }
553
554 public boolean link(Block<E> nextBlock)
555 {
556 return next.compareAndSet(null, nextBlock);
557 }
558
559 public int head()
560 {
561 return indexes.get(headOffset);
562 }
563
564 public int tail()
565 {
566 return indexes.get(tailOffset);
567 }
568
569 public Object[] arrayCopy()
570 {
571 Object[] result = new Object[elements.length()];
572 for (int i = 0; i < result.length; ++i)
573 result[i] = elements.get(i);
574 return result;
575 }
576 }
577 }