1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.util;
15
16 import java.util.AbstractList;
17 import java.util.Collection;
18 import java.util.NoSuchElementException;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
44 {
45 public final int DEFAULT_CAPACITY=128;
46 public final int DEFAULT_GROWTH=64;
47 private final int _limit;
48 private final AtomicInteger _size=new AtomicInteger();
49 private final int _growCapacity;
50
51 private volatile int _capacity;
52 private Object[] _elements;
53 private int _head;
54 private int _tail;
55
56 private final ReentrantLock _headLock = new ReentrantLock();
57 private final Condition _notEmpty = _headLock.newCondition();
58 private final ReentrantLock _tailLock = new ReentrantLock();
59
60
61
62
63
64 public BlockingArrayQueue()
65 {
66 _elements=new Object[DEFAULT_CAPACITY];
67 _growCapacity=DEFAULT_GROWTH;
68 _capacity=_elements.length;
69 _limit=Integer.MAX_VALUE;
70 }
71
72
73
74
75
76 public BlockingArrayQueue(int limit)
77 {
78 _elements=new Object[limit];
79 _capacity=_elements.length;
80 _growCapacity=-1;
81 _limit=limit;
82 }
83
84
85
86
87
88
89 public BlockingArrayQueue(int capacity,int growBy)
90 {
91 _elements=new Object[capacity];
92 _capacity=_elements.length;
93 _growCapacity=growBy;
94 _limit=Integer.MAX_VALUE;
95 }
96
97
98
99
100
101
102
103 public BlockingArrayQueue(int capacity,int growBy,int limit)
104 {
105 if (capacity>limit)
106 throw new IllegalArgumentException();
107
108 _elements=new Object[capacity];
109 _capacity=_elements.length;
110 _growCapacity=growBy;
111 _limit=limit;
112 }
113
114
115 public int getCapacity()
116 {
117 return _capacity;
118 }
119
120
121 public int getLimit()
122 {
123 return _limit;
124 }
125
126
127 @Override
128 public boolean add(E e)
129 {
130 return offer(e);
131 }
132
133
134 public E element()
135 {
136 E e = peek();
137 if (e==null)
138 throw new NoSuchElementException();
139 return e;
140 }
141
142
143 public E peek()
144 {
145 if (_size.get() == 0)
146 return null;
147
148 E e = null;
149 _headLock.lock();
150 try
151 {
152 if (_size.get() > 0)
153 e = (E)_elements[_head];
154 }
155 finally
156 {
157 _headLock.unlock();
158 }
159
160 return e;
161 }
162
163
164 public boolean offer(E e)
165 {
166 if (e == null)
167 throw new NullPointerException();
168
169 boolean not_empty=false;
170 _tailLock.lock();
171 try
172 {
173 if (_size.get() >= _limit)
174 return false;
175
176
177 if (_size.get()==_capacity)
178 {
179 _headLock.lock();
180 try
181 {
182 if (!grow())
183 return false;
184 }
185 finally
186 {
187 _headLock.unlock();
188 }
189 }
190
191
192 _elements[_tail]=e;
193 _tail=(_tail+1)%_capacity;
194
195 not_empty=0==_size.getAndIncrement();
196
197 }
198 finally
199 {
200 _tailLock.unlock();
201 }
202
203 if (not_empty)
204 {
205 _headLock.lock();
206 try
207 {
208 _notEmpty.signal();
209 }
210 finally
211 {
212 _headLock.unlock();
213 }
214 }
215
216 return true;
217 }
218
219
220
221 public E poll()
222 {
223 if (_size.get() == 0)
224 return null;
225
226 E e = null;
227 _headLock.lock();
228 try
229 {
230 if (_size.get() > 0)
231 {
232 final int head=_head;
233 e = (E)_elements[head];
234 _elements[head]=null;
235 _head=(head+1)%_capacity;
236
237 if (_size.decrementAndGet()>0)
238 _notEmpty.signal();
239 }
240 }
241 finally
242 {
243 _headLock.unlock();
244 }
245
246 return e;
247 }
248
249
250
251
252
253
254
255
256 public E take() throws InterruptedException
257 {
258 E e = null;
259 _headLock.lockInterruptibly();
260 try
261 {
262 try
263 {
264 while (_size.get() == 0)
265 {
266 _notEmpty.await();
267 }
268 }
269 catch (InterruptedException ie)
270 {
271 _notEmpty.signal();
272 throw ie;
273 }
274
275 final int head=_head;
276 e = (E)_elements[head];
277 _elements[head]=null;
278 _head=(head+1)%_capacity;
279
280 if (_size.decrementAndGet()>0)
281 _notEmpty.signal();
282 }
283 finally
284 {
285 _headLock.unlock();
286 }
287
288 return e;
289 }
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304 public E poll(long time, TimeUnit unit) throws InterruptedException
305 {
306
307 E e = null;
308
309 long nanos = unit.toNanos(time);
310
311 _headLock.lockInterruptibly();
312 try
313 {
314 try
315 {
316 while (_size.get() == 0)
317 {
318 if (nanos<=0)
319 return null;
320 nanos = _notEmpty.awaitNanos(nanos);
321 }
322 }
323 catch (InterruptedException ie)
324 {
325 _notEmpty.signal();
326 throw ie;
327 }
328
329 e = (E)_elements[_head];
330 _elements[_head]=null;
331 _head=(_head+1)%_capacity;
332
333 if (_size.decrementAndGet()>0)
334 _notEmpty.signal();
335 }
336 finally
337 {
338 _headLock.unlock();
339 }
340
341 return e;
342 }
343
344
345 public E remove()
346 {
347 E e=poll();
348 if (e==null)
349 throw new NoSuchElementException();
350 return e;
351 }
352
353
354 @Override
355 public void clear()
356 {
357 _tailLock.lock();
358 try
359 {
360 _headLock.lock();
361 try
362 {
363 _head=0;
364 _tail=0;
365 _size.set(0);
366 }
367 finally
368 {
369 _headLock.unlock();
370 }
371 }
372 finally
373 {
374 _tailLock.unlock();
375 }
376 }
377
378
379 @Override
380 public boolean isEmpty()
381 {
382 return _size.get()==0;
383 }
384
385
386 @Override
387 public int size()
388 {
389 return _size.get();
390 }
391
392
393 @Override
394 public E get(int index)
395 {
396 _tailLock.lock();
397 try
398 {
399 _headLock.lock();
400 try
401 {
402 if (index<0 || index>=_size.get())
403 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
404 int i = _head+index;
405 if (i>=_capacity)
406 i-=_capacity;
407 return (E)_elements[i];
408 }
409 finally
410 {
411 _headLock.unlock();
412 }
413 }
414 finally
415 {
416 _tailLock.unlock();
417 }
418 }
419
420
421 @Override
422 public E remove(int index)
423 {
424 _tailLock.lock();
425 try
426 {
427 _headLock.lock();
428 try
429 {
430
431 if (index<0 || index>=_size.get())
432 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
433
434 int i = _head+index;
435 if (i>=_capacity)
436 i-=_capacity;
437 E old=(E)_elements[i];
438
439 if (i<_tail)
440 {
441 System.arraycopy(_elements,i+1,_elements,i,_tail-i);
442 _tail--;
443 _size.decrementAndGet();
444 }
445 else
446 {
447 System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
448 if (_tail>0)
449 {
450 _elements[_capacity]=_elements[0];
451 System.arraycopy(_elements,1,_elements,0,_tail-1);
452 _tail--;
453 }
454 else
455 _tail=_capacity-1;
456
457 _size.decrementAndGet();
458 }
459
460 return old;
461 }
462 finally
463 {
464 _headLock.unlock();
465 }
466 }
467 finally
468 {
469 _tailLock.unlock();
470 }
471 }
472
473
474 @Override
475 public E set(int index, E e)
476 {
477 if (e == null)
478 throw new NullPointerException();
479
480 _tailLock.lock();
481 try
482 {
483 _headLock.lock();
484 try
485 {
486
487 if (index<0 || index>=_size.get())
488 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
489
490 int i = _head+index;
491 if (i>=_capacity)
492 i-=_capacity;
493 E old=(E)_elements[i];
494 _elements[i]=e;
495 return old;
496 }
497 finally
498 {
499 _headLock.unlock();
500 }
501 }
502 finally
503 {
504 _tailLock.unlock();
505 }
506 }
507
508
509 @Override
510 public void add(int index, E e)
511 {
512 if (e == null)
513 throw new NullPointerException();
514
515 _tailLock.lock();
516 try
517 {
518 _headLock.lock();
519 try
520 {
521
522 if (index<0 || index>_size.get())
523 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
524
525 if (index==_size.get())
526 {
527 add(e);
528 }
529 else
530 {
531 if (_tail==_head)
532 if (!grow())
533 throw new IllegalStateException("full");
534
535 int i = _head+index;
536 if (i>=_capacity)
537 i-=_capacity;
538
539 _size.incrementAndGet();
540 _tail=(_tail+1)%_capacity;
541
542
543 if (i<_tail)
544 {
545 System.arraycopy(_elements,i,_elements,i+1,_tail-i);
546 _elements[i]=e;
547 }
548 else
549 {
550 if (_tail>0)
551 {
552 System.arraycopy(_elements,0,_elements,1,_tail);
553 _elements[0]=_elements[_capacity-1];
554 }
555
556 System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
557 _elements[i]=e;
558 }
559 }
560 }
561 finally
562 {
563 _headLock.unlock();
564 }
565 }
566 finally
567 {
568 _tailLock.unlock();
569 }
570 }
571
572
573 private boolean grow()
574 {
575 if (_growCapacity<=0)
576 return false;
577
578 _tailLock.lock();
579 try
580 {
581 _headLock.lock();
582 try
583 {
584 final int head=_head;
585 final int tail=_tail;
586 final int new_tail;
587
588 Object[] elements=new Object[_capacity+_growCapacity];
589
590 if (head<tail)
591 {
592 new_tail=tail-head;
593 System.arraycopy(_elements,head,elements,0,new_tail);
594 }
595 else if (head>tail || _size.get()>0)
596 {
597 new_tail=_capacity+tail-head;
598 int cut=_capacity-head;
599 System.arraycopy(_elements,head,elements,0,cut);
600 System.arraycopy(_elements,0,elements,cut,tail);
601 }
602 else
603 {
604 new_tail=0;
605 }
606
607 _elements=elements;
608 _capacity=_elements.length;
609 _head=0;
610 _tail=new_tail;
611 return true;
612 }
613 finally
614 {
615 _headLock.unlock();
616 }
617 }
618 finally
619 {
620 _tailLock.unlock();
621 }
622
623 }
624
625
626 public int drainTo(Collection<? super E> c)
627 {
628 throw new UnsupportedOperationException();
629 }
630
631
632 public int drainTo(Collection<? super E> c, int maxElements)
633 {
634 throw new UnsupportedOperationException();
635 }
636
637
638 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
639 {
640 throw new UnsupportedOperationException();
641 }
642
643
644 public void put(E o) throws InterruptedException
645 {
646 if (!add(o))
647 throw new IllegalStateException("full");
648 }
649
650
651 public int remainingCapacity()
652 {
653 _tailLock.lock();
654 try
655 {
656 _headLock.lock();
657 try
658 {
659 return getCapacity()-size();
660 }
661 finally
662 {
663 _headLock.unlock();
664 }
665 }
666 finally
667 {
668 _tailLock.unlock();
669 }
670 }
671 }