1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicIntegerArray;
31 import java.util.concurrent.atomic.AtomicReference;
32 import java.util.concurrent.atomic.AtomicReferenceArray;
33
34
35
36
37
38
39
40
41
42
43
44
45
46 public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
47 {
48 public static final int DEFAULT_BLOCK_SIZE = 512;
49 public static final Object REMOVED_ELEMENT = new Object()
50 {
51 @Override
52 public String toString()
53 {
54 return "X";
55 }
56 };
57
58 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
59 private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
60
61 private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
62 private final int _blockSize;
63
64 public ConcurrentArrayQueue()
65 {
66 this(DEFAULT_BLOCK_SIZE);
67 }
68
69 public ConcurrentArrayQueue(int blockSize)
70 {
71 _blockSize = blockSize;
72 Block<T> block = newBlock();
73 _blocks.set(HEAD_OFFSET,block);
74 _blocks.set(TAIL_OFFSET,block);
75 }
76
77 public int getBlockSize()
78 {
79 return _blockSize;
80 }
81
82 protected Block<T> getHeadBlock()
83 {
84 return _blocks.get(HEAD_OFFSET);
85 }
86
87 protected Block<T> getTailBlock()
88 {
89 return _blocks.get(TAIL_OFFSET);
90 }
91
92 @Override
93 public boolean offer(T item)
94 {
95 item = Objects.requireNonNull(item);
96
97 final Block<T> initialTailBlock = getTailBlock();
98 Block<T> currentTailBlock = initialTailBlock;
99 int tail = currentTailBlock.tail();
100 while (true)
101 {
102 if (tail == getBlockSize())
103 {
104 Block<T> nextTailBlock = currentTailBlock.next();
105 if (nextTailBlock == null)
106 {
107 nextTailBlock = newBlock();
108 if (currentTailBlock.link(nextTailBlock))
109 {
110
111 currentTailBlock = nextTailBlock;
112 }
113 else
114 {
115
116 currentTailBlock = currentTailBlock.next();
117 }
118 }
119 else
120 {
121
122 currentTailBlock = nextTailBlock;
123 }
124 tail = currentTailBlock.tail();
125 }
126 else
127 {
128 if (currentTailBlock.peek(tail) == null)
129 {
130 if (currentTailBlock.store(tail, item))
131 {
132
133 break;
134 }
135 else
136 {
137
138 ++tail;
139 }
140 }
141 else
142 {
143
144 ++tail;
145 }
146 }
147 }
148
149 updateTailBlock(initialTailBlock, currentTailBlock);
150
151 return true;
152 }
153
154 private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
155 {
156
157 if (oldTailBlock != newTailBlock)
158 {
159
160
161
162 casTailBlock(oldTailBlock, newTailBlock);
163 }
164 }
165
166 protected boolean casTailBlock(Block<T> current, Block<T> update)
167 {
168 return _blocks.compareAndSet(TAIL_OFFSET,current,update);
169 }
170
171 @Override
172 public T poll()
173 {
174 final Block<T> initialHeadBlock = getHeadBlock();
175 Block<T> currentHeadBlock = initialHeadBlock;
176 int head = currentHeadBlock.head();
177 T result = null;
178 while (true)
179 {
180 if (head == getBlockSize())
181 {
182 Block<T> nextHeadBlock = currentHeadBlock.next();
183 if (nextHeadBlock == null)
184 {
185
186
187
188
189
190
191 break;
192 }
193 else
194 {
195
196 currentHeadBlock = nextHeadBlock;
197 head = currentHeadBlock.head();
198 }
199 }
200 else
201 {
202 Object element = currentHeadBlock.peek(head);
203 if (element == REMOVED_ELEMENT)
204 {
205
206 ++head;
207 }
208 else
209 {
210 result = (T)element;
211 if (result != null)
212 {
213 if (currentHeadBlock.remove(head, result, true))
214 {
215
216 break;
217 }
218 else
219 {
220
221 ++head;
222 }
223 }
224 else
225 {
226
227 break;
228 }
229 }
230 }
231 }
232
233 updateHeadBlock(initialHeadBlock, currentHeadBlock);
234
235 return result;
236 }
237
238 private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
239 {
240
241 if (oldHeadBlock != newHeadBlock)
242 {
243
244
245
246 casHeadBlock(oldHeadBlock, newHeadBlock);
247 }
248 }
249
250 protected boolean casHeadBlock(Block<T> current, Block<T> update)
251 {
252 return _blocks.compareAndSet(HEAD_OFFSET,current,update);
253 }
254
255 @Override
256 public T peek()
257 {
258 Block<T> currentHeadBlock = getHeadBlock();
259 int head = currentHeadBlock.head();
260 while (true)
261 {
262 if (head == getBlockSize())
263 {
264 Block<T> nextHeadBlock = currentHeadBlock.next();
265 if (nextHeadBlock == null)
266 {
267
268 return null;
269 }
270 else
271 {
272
273 currentHeadBlock = nextHeadBlock;
274 head = currentHeadBlock.head();
275 }
276 }
277 else
278 {
279 Object element = currentHeadBlock.peek(head);
280 if (element == REMOVED_ELEMENT)
281 {
282
283 ++head;
284 }
285 else
286 {
287 return (T)element;
288 }
289 }
290 }
291 }
292
293 @Override
294 public boolean remove(Object o)
295 {
296 Block<T> currentHeadBlock = getHeadBlock();
297 int head = currentHeadBlock.head();
298 boolean result = false;
299 while (true)
300 {
301 if (head == getBlockSize())
302 {
303 Block<T> nextHeadBlock = currentHeadBlock.next();
304 if (nextHeadBlock == null)
305 {
306
307 break;
308 }
309 else
310 {
311
312 currentHeadBlock = nextHeadBlock;
313 head = currentHeadBlock.head();
314 }
315 }
316 else
317 {
318 Object element = currentHeadBlock.peek(head);
319 if (element == REMOVED_ELEMENT)
320 {
321
322 ++head;
323 }
324 else
325 {
326 if (element == null)
327 {
328
329 break;
330 }
331 else
332 {
333 if (element.equals(o))
334 {
335
336 if (currentHeadBlock.remove(head, o, false))
337 {
338 result = true;
339 break;
340 }
341 else
342 {
343 ++head;
344 }
345 }
346 else
347 {
348
349 ++head;
350 }
351 }
352 }
353 }
354 }
355
356 return result;
357 }
358
359 @Override
360 public boolean removeAll(Collection<?> c)
361 {
362
363 return super.removeAll(c);
364 }
365
366 @Override
367 public boolean retainAll(Collection<?> c)
368 {
369
370 return super.retainAll(c);
371 }
372
373 @Override
374 public Iterator<T> iterator()
375 {
376 final List<Object[]> blocks = new ArrayList<>();
377 Block<T> currentHeadBlock = getHeadBlock();
378 while (currentHeadBlock != null)
379 {
380 Object[] elements = currentHeadBlock.arrayCopy();
381 blocks.add(elements);
382 currentHeadBlock = currentHeadBlock.next();
383 }
384 return new Iterator<T>()
385 {
386 private int blockIndex;
387 private int index;
388
389 @Override
390 public boolean hasNext()
391 {
392 while (true)
393 {
394 if (blockIndex == blocks.size())
395 return false;
396
397 Object element = blocks.get(blockIndex)[index];
398
399 if (element == null)
400 return false;
401
402 if (element != REMOVED_ELEMENT)
403 return true;
404
405 advance();
406 }
407 }
408
409 @Override
410 public T next()
411 {
412 while (true)
413 {
414 if (blockIndex == blocks.size())
415 throw new NoSuchElementException();
416
417 Object element = blocks.get(blockIndex)[index];
418
419 if (element == null)
420 throw new NoSuchElementException();
421
422 advance();
423
424 if (element != REMOVED_ELEMENT)
425 return (T)element;
426 }
427 }
428
429 private void advance()
430 {
431 if (++index == getBlockSize())
432 {
433 index = 0;
434 ++blockIndex;
435 }
436 }
437
438 @Override
439 public void remove()
440 {
441 throw new UnsupportedOperationException();
442 }
443 };
444 }
445
446 @Override
447 public int size()
448 {
449 Block<T> currentHeadBlock = getHeadBlock();
450 int head = currentHeadBlock.head();
451 int size = 0;
452 while (true)
453 {
454 if (head == getBlockSize())
455 {
456 Block<T> nextHeadBlock = currentHeadBlock.next();
457 if (nextHeadBlock == null)
458 {
459 break;
460 }
461 else
462 {
463
464 currentHeadBlock = nextHeadBlock;
465 head = currentHeadBlock.head();
466 }
467 }
468 else
469 {
470 Object element = currentHeadBlock.peek(head);
471 if (element == REMOVED_ELEMENT)
472 {
473
474 ++head;
475 }
476 else if (element != null)
477 {
478 ++size;
479 ++head;
480 }
481 else
482 {
483 break;
484 }
485 }
486 }
487 return size;
488 }
489
490 protected Block<T> newBlock()
491 {
492 return new Block<>(getBlockSize());
493 }
494
495 protected int getBlockCount()
496 {
497 int result = 0;
498 Block<T> headBlock = getHeadBlock();
499 while (headBlock != null)
500 {
501 ++result;
502 headBlock = headBlock.next();
503 }
504 return result;
505 }
506
507 protected static final class Block<E>
508 {
509 private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
510 private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
511
512 private final AtomicReferenceArray<Object> elements;
513 private final AtomicReference<Block<E>> next = new AtomicReference<>();
514 private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
515
516 protected Block(int blockSize)
517 {
518 elements = new AtomicReferenceArray<>(blockSize);
519 }
520
521 public Object peek(int index)
522 {
523 return elements.get(index);
524 }
525
526 public boolean store(int index, E item)
527 {
528 boolean result = elements.compareAndSet(index, null, item);
529 if (result)
530 indexes.incrementAndGet(tailOffset);
531 return result;
532 }
533
534 public boolean remove(int index, Object item, boolean updateHead)
535 {
536 boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
537 if (result && updateHead)
538 indexes.incrementAndGet(headOffset);
539 return result;
540 }
541
542 public Block<E> next()
543 {
544 return next.get();
545 }
546
547 public boolean link(Block<E> nextBlock)
548 {
549 return next.compareAndSet(null, nextBlock);
550 }
551
552 public int head()
553 {
554 return indexes.get(headOffset);
555 }
556
557 public int tail()
558 {
559 return indexes.get(tailOffset);
560 }
561
562 public Object[] arrayCopy()
563 {
564 Object[] result = new Object[elements.length()];
565 for (int i = 0; i < result.length; ++i)
566 result[i] = elements.get(i);
567 return result;
568 }
569 }
570 }