1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.concurrent.atomic.AtomicIntegerArray;
29 import java.util.concurrent.atomic.AtomicReference;
30 import java.util.concurrent.atomic.AtomicReferenceArray;
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
45 {
46 public static final int DEFAULT_BLOCK_SIZE = 512;
47 public static final Object REMOVED_ELEMENT = new Object()
48 {
49 @Override
50 public String toString()
51 {
52 return "X";
53 }
54 };
55
56 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
57 private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
58
59 private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
60 private final int _blockSize;
61
62 public ConcurrentArrayQueue()
63 {
64 this(DEFAULT_BLOCK_SIZE);
65 }
66
67 public ConcurrentArrayQueue(int blockSize)
68 {
69 _blockSize = blockSize;
70 Block<T> block = newBlock();
71 _blocks.set(HEAD_OFFSET,block);
72 _blocks.set(TAIL_OFFSET,block);
73 }
74
75 public int getBlockSize()
76 {
77 return _blockSize;
78 }
79
80 protected Block<T> getHeadBlock()
81 {
82 return _blocks.get(HEAD_OFFSET);
83 }
84
85 protected Block<T> getTailBlock()
86 {
87 return _blocks.get(TAIL_OFFSET);
88 }
89
90 @Override
91 public boolean offer(T item)
92 {
93 item = Objects.requireNonNull(item);
94
95 final Block<T> initialTailBlock = getTailBlock();
96 Block<T> currentTailBlock = initialTailBlock;
97 int tail = currentTailBlock.tail();
98 while (true)
99 {
100 if (tail == getBlockSize())
101 {
102 Block<T> nextTailBlock = currentTailBlock.next();
103 if (nextTailBlock == null)
104 {
105 nextTailBlock = newBlock();
106 if (currentTailBlock.link(nextTailBlock))
107 {
108
109 currentTailBlock = nextTailBlock;
110 }
111 else
112 {
113
114 currentTailBlock = currentTailBlock.next();
115 }
116 }
117 else
118 {
119
120 currentTailBlock = nextTailBlock;
121 }
122 tail = currentTailBlock.tail();
123 }
124 else
125 {
126 if (currentTailBlock.peek(tail) == null)
127 {
128 if (currentTailBlock.store(tail, item))
129 {
130
131 break;
132 }
133 else
134 {
135
136 ++tail;
137 }
138 }
139 else
140 {
141
142 ++tail;
143 }
144 }
145 }
146
147 updateTailBlock(initialTailBlock, currentTailBlock);
148
149 return true;
150 }
151
152 private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
153 {
154
155 if (oldTailBlock != newTailBlock)
156 {
157
158
159
160 casTailBlock(oldTailBlock, newTailBlock);
161 }
162 }
163
164 protected boolean casTailBlock(Block<T> current, Block<T> update)
165 {
166 return _blocks.compareAndSet(TAIL_OFFSET,current,update);
167 }
168
169 @SuppressWarnings("unchecked")
170 @Override
171 public T poll()
172 {
173 final Block<T> initialHeadBlock = getHeadBlock();
174 Block<T> currentHeadBlock = initialHeadBlock;
175 int head = currentHeadBlock.head();
176 T result = null;
177 while (true)
178 {
179 if (head == getBlockSize())
180 {
181 Block<T> nextHeadBlock = currentHeadBlock.next();
182 if (nextHeadBlock == null)
183 {
184
185
186
187
188
189
190 break;
191 }
192 else
193 {
194
195 currentHeadBlock = nextHeadBlock;
196 head = currentHeadBlock.head();
197 }
198 }
199 else
200 {
201 Object element = currentHeadBlock.peek(head);
202 if (element == REMOVED_ELEMENT)
203 {
204
205 ++head;
206 }
207 else
208 {
209 result = (T)element;
210 if (result != null)
211 {
212 if (currentHeadBlock.remove(head, result, true))
213 {
214
215 break;
216 }
217 else
218 {
219
220 ++head;
221 }
222 }
223 else
224 {
225
226 break;
227 }
228 }
229 }
230 }
231
232 updateHeadBlock(initialHeadBlock, currentHeadBlock);
233
234 return result;
235 }
236
237 private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
238 {
239
240 if (oldHeadBlock != newHeadBlock)
241 {
242
243
244
245 casHeadBlock(oldHeadBlock, newHeadBlock);
246 }
247 }
248
249 protected boolean casHeadBlock(Block<T> current, Block<T> update)
250 {
251 return _blocks.compareAndSet(HEAD_OFFSET,current,update);
252 }
253
254 @Override
255 public T peek()
256 {
257 Block<T> currentHeadBlock = getHeadBlock();
258 int head = currentHeadBlock.head();
259 while (true)
260 {
261 if (head == getBlockSize())
262 {
263 Block<T> nextHeadBlock = currentHeadBlock.next();
264 if (nextHeadBlock == null)
265 {
266
267 return null;
268 }
269 else
270 {
271
272 currentHeadBlock = nextHeadBlock;
273 head = currentHeadBlock.head();
274 }
275 }
276 else
277 {
278 T element = currentHeadBlock.peek(head);
279 if (element == REMOVED_ELEMENT)
280 {
281
282 ++head;
283 }
284 else
285 {
286 return element;
287 }
288 }
289 }
290 }
291
292 @Override
293 public boolean remove(Object o)
294 {
295 Block<T> currentHeadBlock = getHeadBlock();
296 int head = currentHeadBlock.head();
297 boolean result = false;
298 while (true)
299 {
300 if (head == getBlockSize())
301 {
302 Block<T> nextHeadBlock = currentHeadBlock.next();
303 if (nextHeadBlock == null)
304 {
305
306 break;
307 }
308 else
309 {
310
311 currentHeadBlock = nextHeadBlock;
312 head = currentHeadBlock.head();
313 }
314 }
315 else
316 {
317 Object element = currentHeadBlock.peek(head);
318 if (element == REMOVED_ELEMENT)
319 {
320
321 ++head;
322 }
323 else
324 {
325 if (element == null)
326 {
327
328 break;
329 }
330 else
331 {
332 if (element.equals(o))
333 {
334
335 if (currentHeadBlock.remove(head, o, false))
336 {
337 result = true;
338 break;
339 }
340 else
341 {
342 ++head;
343 }
344 }
345 else
346 {
347
348 ++head;
349 }
350 }
351 }
352 }
353 }
354
355 return result;
356 }
357
358 @Override
359 public boolean removeAll(Collection<?> c)
360 {
361
362 return super.removeAll(c);
363 }
364
365 @Override
366 public boolean retainAll(Collection<?> c)
367 {
368
369 return super.retainAll(c);
370 }
371
372 @Override
373 public Iterator<T> iterator()
374 {
375 final List<Object[]> blocks = new ArrayList<>();
376 Block<T> currentHeadBlock = getHeadBlock();
377 while (currentHeadBlock != null)
378 {
379 Object[] elements = currentHeadBlock.arrayCopy();
380 blocks.add(elements);
381 currentHeadBlock = currentHeadBlock.next();
382 }
383 return new Iterator<T>()
384 {
385 private int blockIndex;
386 private int index;
387
388 @Override
389 public boolean hasNext()
390 {
391 while (true)
392 {
393 if (blockIndex == blocks.size())
394 return false;
395
396 Object element = blocks.get(blockIndex)[index];
397
398 if (element == null)
399 return false;
400
401 if (element != REMOVED_ELEMENT)
402 return true;
403
404 advance();
405 }
406 }
407
408 @Override
409 public T next()
410 {
411 while (true)
412 {
413 if (blockIndex == blocks.size())
414 throw new NoSuchElementException();
415
416 Object element = blocks.get(blockIndex)[index];
417
418 if (element == null)
419 throw new NoSuchElementException();
420
421 advance();
422
423 if (element != REMOVED_ELEMENT) {
424 @SuppressWarnings("unchecked")
425 T e = (T)element;
426 return e;
427 }
428 }
429 }
430
431 private void advance()
432 {
433 if (++index == getBlockSize())
434 {
435 index = 0;
436 ++blockIndex;
437 }
438 }
439
440 @Override
441 public void remove()
442 {
443 throw new UnsupportedOperationException();
444 }
445 };
446 }
447
448 @Override
449 public int size()
450 {
451 Block<T> currentHeadBlock = getHeadBlock();
452 int head = currentHeadBlock.head();
453 int size = 0;
454 while (true)
455 {
456 if (head == getBlockSize())
457 {
458 Block<T> nextHeadBlock = currentHeadBlock.next();
459 if (nextHeadBlock == null)
460 {
461 break;
462 }
463 else
464 {
465
466 currentHeadBlock = nextHeadBlock;
467 head = currentHeadBlock.head();
468 }
469 }
470 else
471 {
472 Object element = currentHeadBlock.peek(head);
473 if (element == REMOVED_ELEMENT)
474 {
475
476 ++head;
477 }
478 else if (element != null)
479 {
480 ++size;
481 ++head;
482 }
483 else
484 {
485 break;
486 }
487 }
488 }
489 return size;
490 }
491
492 protected Block<T> newBlock()
493 {
494 return new Block<>(getBlockSize());
495 }
496
497 protected int getBlockCount()
498 {
499 int result = 0;
500 Block<T> headBlock = getHeadBlock();
501 while (headBlock != null)
502 {
503 ++result;
504 headBlock = headBlock.next();
505 }
506 return result;
507 }
508
509 protected static final class Block<E>
510 {
511 private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
512 private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
513
514 private final AtomicReferenceArray<Object> elements;
515 private final AtomicReference<Block<E>> next = new AtomicReference<>();
516 private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
517
518 protected Block(int blockSize)
519 {
520 elements = new AtomicReferenceArray<>(blockSize);
521 }
522
523 @SuppressWarnings("unchecked")
524 public E peek(int index)
525 {
526 return (E)elements.get(index);
527 }
528
529 public boolean store(int index, E item)
530 {
531 boolean result = elements.compareAndSet(index, null, item);
532 if (result)
533 indexes.incrementAndGet(tailOffset);
534 return result;
535 }
536
537 public boolean remove(int index, Object item, boolean updateHead)
538 {
539 boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
540 if (result && updateHead)
541 indexes.incrementAndGet(headOffset);
542 return result;
543 }
544
545 public Block<E> next()
546 {
547 return next.get();
548 }
549
550 public boolean link(Block<E> nextBlock)
551 {
552 return next.compareAndSet(null, nextBlock);
553 }
554
555 public int head()
556 {
557 return indexes.get(headOffset);
558 }
559
560 public int tail()
561 {
562 return indexes.get(tailOffset);
563 }
564
565 public Object[] arrayCopy()
566 {
567 Object[] result = new Object[elements.length()];
568 for (int i = 0; i < result.length; ++i)
569 result[i] = elements.get(i);
570 return result;
571 }
572 }
573 }