1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.util;
15
16 import java.util.AbstractList;
17 import java.util.Collection;
18 import java.util.NoSuchElementException;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
44 {
45 public final int DEFAULT_CAPACITY=128;
46 public final int DEFAULT_GROWTH=64;
47 private final int _limit;
48 private final AtomicInteger _size=new AtomicInteger();
49 private final int _growCapacity;
50
51 private volatile int _capacity;
52 private Object[] _elements;
53
54 private final ReentrantLock _headLock = new ReentrantLock();
55 private final Condition _notEmpty = _headLock.newCondition();
56 private int _head;
57
58
59
60 private long _space0;
61 private long _space1;
62 private long _space2;
63 private long _space3;
64 private long _space4;
65 private long _space5;
66 private long _space6;
67 private long _space7;
68
69 private final ReentrantLock _tailLock = new ReentrantLock();
70 private int _tail;
71
72
73
74
75
76
77 public BlockingArrayQueue()
78 {
79 _elements=new Object[DEFAULT_CAPACITY];
80 _growCapacity=DEFAULT_GROWTH;
81 _capacity=_elements.length;
82 _limit=Integer.MAX_VALUE;
83 }
84
85
86
87
88
89 public BlockingArrayQueue(int limit)
90 {
91 _elements=new Object[limit];
92 _capacity=_elements.length;
93 _growCapacity=-1;
94 _limit=limit;
95 }
96
97
98
99
100
101
102 public BlockingArrayQueue(int capacity,int growBy)
103 {
104 _elements=new Object[capacity];
105 _capacity=_elements.length;
106 _growCapacity=growBy;
107 _limit=Integer.MAX_VALUE;
108 }
109
110
111
112
113
114
115
116 public BlockingArrayQueue(int capacity,int growBy,int limit)
117 {
118 if (capacity>limit)
119 throw new IllegalArgumentException();
120
121 _elements=new Object[capacity];
122 _capacity=_elements.length;
123 _growCapacity=growBy;
124 _limit=limit;
125 }
126
127
128 public int getCapacity()
129 {
130 return _capacity;
131 }
132
133
134 public int getLimit()
135 {
136 return _limit;
137 }
138
139
140 @Override
141 public boolean add(E e)
142 {
143 return offer(e);
144 }
145
146
147 public E element()
148 {
149 E e = peek();
150 if (e==null)
151 throw new NoSuchElementException();
152 return e;
153 }
154
155
156 @SuppressWarnings("unchecked")
157 public E peek()
158 {
159 if (_size.get() == 0)
160 return null;
161
162 E e = null;
163 _headLock.lock();
164 try
165 {
166 if (_size.get() > 0)
167 e = (E)_elements[_head];
168 }
169 finally
170 {
171 _headLock.unlock();
172 }
173
174 return e;
175 }
176
177
178 public boolean offer(E e)
179 {
180 if (e == null)
181 throw new NullPointerException();
182
183 boolean not_empty=false;
184 _tailLock.lock();
185 try
186 {
187 if (_size.get() >= _limit)
188 return false;
189
190
191 if (_size.get()==_capacity)
192 {
193 _headLock.lock();
194 try
195 {
196 if (!grow())
197 return false;
198 }
199 finally
200 {
201 _headLock.unlock();
202 }
203 }
204
205
206 _elements[_tail]=e;
207 _tail=(_tail+1)%_capacity;
208
209 not_empty=0==_size.getAndIncrement();
210
211 }
212 finally
213 {
214 _tailLock.unlock();
215 }
216
217 if (not_empty)
218 {
219 _headLock.lock();
220 try
221 {
222 _notEmpty.signal();
223 }
224 finally
225 {
226 _headLock.unlock();
227 }
228 }
229
230 return true;
231 }
232
233
234
235 @SuppressWarnings("unchecked")
236 public E poll()
237 {
238 if (_size.get() == 0)
239 return null;
240
241 E e = null;
242 _headLock.lock();
243 try
244 {
245 if (_size.get() > 0)
246 {
247 final int head=_head;
248 e = (E)_elements[head];
249 _elements[head]=null;
250 _head=(head+1)%_capacity;
251
252 if (_size.decrementAndGet()>0)
253 _notEmpty.signal();
254 }
255 }
256 finally
257 {
258 _headLock.unlock();
259 }
260
261 return e;
262 }
263
264
265
266
267
268
269
270
271 @SuppressWarnings("unchecked")
272 public E take() throws InterruptedException
273 {
274 E e = null;
275 _headLock.lockInterruptibly();
276 try
277 {
278 try
279 {
280 while (_size.get() == 0)
281 {
282 _notEmpty.await();
283 }
284 }
285 catch (InterruptedException ie)
286 {
287 _notEmpty.signal();
288 throw ie;
289 }
290
291 final int head=_head;
292 e = (E)_elements[head];
293 _elements[head]=null;
294 _head=(head+1)%_capacity;
295
296 if (_size.decrementAndGet()>0)
297 _notEmpty.signal();
298 }
299 finally
300 {
301 _headLock.unlock();
302 }
303
304 return e;
305 }
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320 @SuppressWarnings("unchecked")
321 public E poll(long time, TimeUnit unit) throws InterruptedException
322 {
323
324 E e = null;
325
326 long nanos = unit.toNanos(time);
327
328 _headLock.lockInterruptibly();
329 try
330 {
331 try
332 {
333 while (_size.get() == 0)
334 {
335 if (nanos<=0)
336 return null;
337 nanos = _notEmpty.awaitNanos(nanos);
338 }
339 }
340 catch (InterruptedException ie)
341 {
342 _notEmpty.signal();
343 throw ie;
344 }
345
346 e = (E)_elements[_head];
347 _elements[_head]=null;
348 _head=(_head+1)%_capacity;
349
350 if (_size.decrementAndGet()>0)
351 _notEmpty.signal();
352 }
353 finally
354 {
355 _headLock.unlock();
356 }
357
358 return e;
359 }
360
361
362 public E remove()
363 {
364 E e=poll();
365 if (e==null)
366 throw new NoSuchElementException();
367 return e;
368 }
369
370
371 @Override
372 public void clear()
373 {
374 _tailLock.lock();
375 try
376 {
377 _headLock.lock();
378 try
379 {
380 _head=0;
381 _tail=0;
382 _size.set(0);
383 }
384 finally
385 {
386 _headLock.unlock();
387 }
388 }
389 finally
390 {
391 _tailLock.unlock();
392 }
393 }
394
395
396 @Override
397 public boolean isEmpty()
398 {
399 return _size.get()==0;
400 }
401
402
403 @Override
404 public int size()
405 {
406 return _size.get();
407 }
408
409
410 @SuppressWarnings("unchecked")
411 @Override
412 public E get(int index)
413 {
414 _tailLock.lock();
415 try
416 {
417 _headLock.lock();
418 try
419 {
420 if (index<0 || index>=_size.get())
421 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
422 int i = _head+index;
423 if (i>=_capacity)
424 i-=_capacity;
425 return (E)_elements[i];
426 }
427 finally
428 {
429 _headLock.unlock();
430 }
431 }
432 finally
433 {
434 _tailLock.unlock();
435 }
436 }
437
438
439 @Override
440 public E remove(int index)
441 {
442 _tailLock.lock();
443 try
444 {
445 _headLock.lock();
446 try
447 {
448
449 if (index<0 || index>=_size.get())
450 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
451
452 int i = _head+index;
453 if (i>=_capacity)
454 i-=_capacity;
455 @SuppressWarnings("unchecked")
456 E old=(E)_elements[i];
457
458 if (i<_tail)
459 {
460 System.arraycopy(_elements,i+1,_elements,i,_tail-i);
461 _tail--;
462 _size.decrementAndGet();
463 }
464 else
465 {
466 System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
467 if (_tail>0)
468 {
469 _elements[_capacity]=_elements[0];
470 System.arraycopy(_elements,1,_elements,0,_tail-1);
471 _tail--;
472 }
473 else
474 _tail=_capacity-1;
475
476 _size.decrementAndGet();
477 }
478
479 return old;
480 }
481 finally
482 {
483 _headLock.unlock();
484 }
485 }
486 finally
487 {
488 _tailLock.unlock();
489 }
490 }
491
492
493 @Override
494 public E set(int index, E e)
495 {
496 if (e == null)
497 throw new NullPointerException();
498
499 _tailLock.lock();
500 try
501 {
502 _headLock.lock();
503 try
504 {
505
506 if (index<0 || index>=_size.get())
507 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
508
509 int i = _head+index;
510 if (i>=_capacity)
511 i-=_capacity;
512 @SuppressWarnings("unchecked")
513 E old=(E)_elements[i];
514 _elements[i]=e;
515 return old;
516 }
517 finally
518 {
519 _headLock.unlock();
520 }
521 }
522 finally
523 {
524 _tailLock.unlock();
525 }
526 }
527
528
529 @Override
530 public void add(int index, E e)
531 {
532 if (e == null)
533 throw new NullPointerException();
534
535 _tailLock.lock();
536 try
537 {
538 _headLock.lock();
539 try
540 {
541
542 if (index<0 || index>_size.get())
543 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
544
545 if (index==_size.get())
546 {
547 add(e);
548 }
549 else
550 {
551 if (_tail==_head)
552 if (!grow())
553 throw new IllegalStateException("full");
554
555 int i = _head+index;
556 if (i>=_capacity)
557 i-=_capacity;
558
559 _size.incrementAndGet();
560 _tail=(_tail+1)%_capacity;
561
562
563 if (i<_tail)
564 {
565 System.arraycopy(_elements,i,_elements,i+1,_tail-i);
566 _elements[i]=e;
567 }
568 else
569 {
570 if (_tail>0)
571 {
572 System.arraycopy(_elements,0,_elements,1,_tail);
573 _elements[0]=_elements[_capacity-1];
574 }
575
576 System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
577 _elements[i]=e;
578 }
579 }
580 }
581 finally
582 {
583 _headLock.unlock();
584 }
585 }
586 finally
587 {
588 _tailLock.unlock();
589 }
590 }
591
592
593 private boolean grow()
594 {
595 if (_growCapacity<=0)
596 return false;
597
598 _tailLock.lock();
599 try
600 {
601 _headLock.lock();
602 try
603 {
604 final int head=_head;
605 final int tail=_tail;
606 final int new_tail;
607
608 Object[] elements=new Object[_capacity+_growCapacity];
609
610 if (head<tail)
611 {
612 new_tail=tail-head;
613 System.arraycopy(_elements,head,elements,0,new_tail);
614 }
615 else if (head>tail || _size.get()>0)
616 {
617 new_tail=_capacity+tail-head;
618 int cut=_capacity-head;
619 System.arraycopy(_elements,head,elements,0,cut);
620 System.arraycopy(_elements,0,elements,cut,tail);
621 }
622 else
623 {
624 new_tail=0;
625 }
626
627 _elements=elements;
628 _capacity=_elements.length;
629 _head=0;
630 _tail=new_tail;
631 return true;
632 }
633 finally
634 {
635 _headLock.unlock();
636 }
637 }
638 finally
639 {
640 _tailLock.unlock();
641 }
642
643 }
644
645
646 public int drainTo(Collection<? super E> c)
647 {
648 throw new UnsupportedOperationException();
649 }
650
651
652 public int drainTo(Collection<? super E> c, int maxElements)
653 {
654 throw new UnsupportedOperationException();
655 }
656
657
658 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
659 {
660 throw new UnsupportedOperationException();
661 }
662
663
664 public void put(E o) throws InterruptedException
665 {
666 if (!add(o))
667 throw new IllegalStateException("full");
668 }
669
670
671 public int remainingCapacity()
672 {
673 _tailLock.lock();
674 try
675 {
676 _headLock.lock();
677 try
678 {
679 return getCapacity()-size();
680 }
681 finally
682 {
683 _headLock.unlock();
684 }
685 }
686 finally
687 {
688 _tailLock.unlock();
689 }
690 }
691
692
693
694 long sumOfSpace()
695 {
696
697 return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
698 }
699 }