1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.util;
15
16 import java.util.AbstractList;
17 import java.util.Collection;
18 import java.util.NoSuchElementException;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
44 {
45 public final int DEFAULT_CAPACITY=128;
46 public final int DEFAULT_GROWTH=64;
47 private final int _limit;
48 private final AtomicInteger _size=new AtomicInteger();
49 private final int _growCapacity;
50
51 private volatile int _capacity;
52 private Object[] _elements;
53 private int _head;
54 private int _tail;
55
56 private final ReentrantLock _headLock = new ReentrantLock();
57 private final Condition _notEmpty = _headLock.newCondition();
58 private final ReentrantLock _tailLock = new ReentrantLock();
59
60
61
62
63
64 public BlockingArrayQueue()
65 {
66 _elements=new Object[DEFAULT_CAPACITY];
67 _growCapacity=DEFAULT_GROWTH;
68 _capacity=_elements.length;
69 _limit=Integer.MAX_VALUE;
70 }
71
72
73
74
75
76 public BlockingArrayQueue(int limit)
77 {
78 _elements=new Object[limit];
79 _capacity=_elements.length;
80 _growCapacity=-1;
81 _limit=limit;
82 }
83
84
85
86
87
88
89 public BlockingArrayQueue(int capacity,int growBy)
90 {
91 _elements=new Object[capacity];
92 _capacity=_elements.length;
93 _growCapacity=growBy;
94 _limit=Integer.MAX_VALUE;
95 }
96
97
98
99
100
101
102
103 public BlockingArrayQueue(int capacity,int growBy,int limit)
104 {
105 if (capacity>limit)
106 throw new IllegalArgumentException();
107
108 _elements=new Object[capacity];
109 _capacity=_elements.length;
110 _growCapacity=growBy;
111 _limit=limit;
112 }
113
114
115 public int getCapacity()
116 {
117 return _capacity;
118 }
119
120
121 public int getLimit()
122 {
123 return _limit;
124 }
125
126
127 public boolean add(E e)
128 {
129 return offer(e);
130 }
131
132
133 public E element()
134 {
135 E e = peek();
136 if (e==null)
137 throw new NoSuchElementException();
138 return e;
139 }
140
141
142 public E peek()
143 {
144 if (_size.get() == 0)
145 return null;
146
147 E e = null;
148 _headLock.lock();
149 try
150 {
151 if (_size.get() > 0)
152 e = (E)_elements[_head];
153 }
154 finally
155 {
156 _headLock.unlock();
157 }
158
159 return e;
160 }
161
162
163 public boolean offer(E e)
164 {
165 if (e == null)
166 throw new NullPointerException();
167
168 boolean not_empty=false;
169 _tailLock.lock();
170 try
171 {
172 if (_size.get() >= _limit)
173 return false;
174
175
176 if (_size.get()==_capacity)
177 {
178 _headLock.lock();
179 try
180 {
181 if (!grow())
182 return false;
183 }
184 finally
185 {
186 _headLock.unlock();
187 }
188 }
189
190
191 _elements[_tail]=e;
192 _tail=(_tail+1)%_capacity;
193
194 not_empty=0==_size.getAndIncrement();
195
196 }
197 finally
198 {
199 _tailLock.unlock();
200 }
201
202 if (not_empty)
203 {
204 _headLock.lock();
205 try
206 {
207 _notEmpty.signal();
208 }
209 finally
210 {
211 _headLock.unlock();
212 }
213 }
214
215 return true;
216 }
217
218
219
220 public E poll()
221 {
222 if (_size.get() == 0)
223 return null;
224
225 E e = null;
226 _headLock.lock();
227 try
228 {
229 if (_size.get() > 0)
230 {
231 final int head=_head;
232 e = (E)_elements[head];
233 _elements[head]=null;
234 _head=(head+1)%_capacity;
235
236 if (_size.decrementAndGet()>0)
237 _notEmpty.signal();
238 }
239 }
240 finally
241 {
242 _headLock.unlock();
243 }
244
245 return e;
246 }
247
248
249
250
251
252
253
254
255 public E take() throws InterruptedException
256 {
257 E e = null;
258 _headLock.lockInterruptibly();
259 try
260 {
261 try
262 {
263 while (_size.get() == 0)
264 {
265 _notEmpty.await();
266 }
267 }
268 catch (InterruptedException ie)
269 {
270 _notEmpty.signal();
271 throw ie;
272 }
273
274 final int head=_head;
275 e = (E)_elements[head];
276 _elements[head]=null;
277 _head=(head+1)%_capacity;
278
279 if (_size.decrementAndGet()>0)
280 _notEmpty.signal();
281 }
282 finally
283 {
284 _headLock.unlock();
285 }
286
287 return e;
288 }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303 public E poll(long time, TimeUnit unit) throws InterruptedException
304 {
305
306 E e = null;
307
308 long nanos = unit.toNanos(time);
309
310 _headLock.lockInterruptibly();
311 try
312 {
313 try
314 {
315 while (_size.get() == 0)
316 {
317 if (nanos<=0)
318 return null;
319 nanos = _notEmpty.awaitNanos(nanos);
320 }
321 }
322 catch (InterruptedException ie)
323 {
324 _notEmpty.signal();
325 throw ie;
326 }
327
328 e = (E)_elements[_head];
329 _elements[_head]=null;
330 _head=(_head+1)%_capacity;
331
332 if (_size.decrementAndGet()>0)
333 _notEmpty.signal();
334 }
335 finally
336 {
337 _headLock.unlock();
338 }
339
340 return e;
341 }
342
343
344 public E remove()
345 {
346 E e=poll();
347 if (e==null)
348 throw new NoSuchElementException();
349 return e;
350 }
351
352
353 public void clear()
354 {
355 _tailLock.lock();
356 try
357 {
358 _headLock.lock();
359 try
360 {
361 _head=0;
362 _tail=0;
363 _size.set(0);
364 }
365 finally
366 {
367 _headLock.unlock();
368 }
369 }
370 finally
371 {
372 _tailLock.unlock();
373 }
374 }
375
376
377 public boolean isEmpty()
378 {
379 return _size.get()==0;
380 }
381
382
383 public int size()
384 {
385 return _size.get();
386 }
387
388
389 public E get(int index)
390 {
391 _tailLock.lock();
392 try
393 {
394 _headLock.lock();
395 try
396 {
397 if (index<0 || index>=_size.get())
398 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
399 int i = _head+index;
400 if (i>=_capacity)
401 i-=_capacity;
402 return (E)_elements[i];
403 }
404 finally
405 {
406 _headLock.unlock();
407 }
408 }
409 finally
410 {
411 _tailLock.unlock();
412 }
413 }
414
415
416 public E remove(int index)
417 {
418 _tailLock.lock();
419 try
420 {
421 _headLock.lock();
422 try
423 {
424
425 if (index<0 || index>=_size.get())
426 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
427
428 int i = _head+index;
429 if (i>=_capacity)
430 i-=_capacity;
431 E old=(E)_elements[i];
432
433 if (i<_tail)
434 {
435 System.arraycopy(_elements,i+1,_elements,i,_tail-i);
436 _tail--;
437 _size.decrementAndGet();
438 }
439 else
440 {
441 System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
442 if (_tail>0)
443 {
444 _elements[_capacity]=_elements[0];
445 System.arraycopy(_elements,1,_elements,0,_tail-1);
446 _tail--;
447 }
448 else
449 _tail=_capacity-1;
450
451 _size.decrementAndGet();
452 }
453
454 return old;
455 }
456 finally
457 {
458 _headLock.unlock();
459 }
460 }
461 finally
462 {
463 _tailLock.unlock();
464 }
465 }
466
467
468 public E set(int index, E e)
469 {
470 if (e == null)
471 throw new NullPointerException();
472
473 _tailLock.lock();
474 try
475 {
476 _headLock.lock();
477 try
478 {
479
480 if (index<0 || index>=_size.get())
481 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
482
483 int i = _head+index;
484 if (i>=_capacity)
485 i-=_capacity;
486 E old=(E)_elements[i];
487 _elements[i]=e;
488 return old;
489 }
490 finally
491 {
492 _headLock.unlock();
493 }
494 }
495 finally
496 {
497 _tailLock.unlock();
498 }
499 }
500
501
502 public void add(int index, E e)
503 {
504 if (e == null)
505 throw new NullPointerException();
506
507 _tailLock.lock();
508 try
509 {
510 _headLock.lock();
511 try
512 {
513
514 if (index<0 || index>_size.get())
515 throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
516
517 if (index==_size.get())
518 {
519 add(e);
520 }
521 else
522 {
523 if (_tail==_head)
524 if (!grow())
525 throw new IllegalStateException("full");
526
527 int i = _head+index;
528 if (i>=_capacity)
529 i-=_capacity;
530
531 _size.incrementAndGet();
532 _tail=(_tail+1)%_capacity;
533
534
535 if (i<_tail)
536 {
537 System.arraycopy(_elements,i,_elements,i+1,_tail-i);
538 _elements[i]=e;
539 }
540 else
541 {
542 if (_tail>0)
543 {
544 System.arraycopy(_elements,0,_elements,1,_tail);
545 _elements[0]=_elements[_capacity-1];
546 }
547
548 System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
549 _elements[i]=e;
550 }
551 }
552 }
553 finally
554 {
555 _headLock.unlock();
556 }
557 }
558 finally
559 {
560 _tailLock.unlock();
561 }
562 }
563
564
565 private boolean grow()
566 {
567 if (_growCapacity<=0)
568 return false;
569
570 _tailLock.lock();
571 try
572 {
573 _headLock.lock();
574 try
575 {
576 final int head=_head;
577 final int tail=_tail;
578 final int new_tail;
579
580 Object[] elements=new Object[_capacity+_growCapacity];
581
582 if (head<tail)
583 {
584 new_tail=tail-head;
585 System.arraycopy(_elements,head,elements,0,new_tail);
586 }
587 else if (head>tail || _size.get()>0)
588 {
589 new_tail=_capacity+tail-head;
590 int cut=_capacity-head;
591 System.arraycopy(_elements,head,elements,0,cut);
592 System.arraycopy(_elements,0,elements,cut,tail);
593 }
594 else
595 {
596 new_tail=0;
597 }
598
599 _elements=elements;
600 _capacity=_elements.length;
601 _head=0;
602 _tail=new_tail;
603 return true;
604 }
605 finally
606 {
607 _headLock.unlock();
608 }
609 }
610 finally
611 {
612 _tailLock.unlock();
613 }
614
615 }
616
617
618 public int drainTo(Collection<? super E> c)
619 {
620 throw new UnsupportedOperationException();
621 }
622
623
624 public int drainTo(Collection<? super E> c, int maxElements)
625 {
626 throw new UnsupportedOperationException();
627 }
628
629
630 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
631 {
632 throw new UnsupportedOperationException();
633 }
634
635
636 public void put(E o) throws InterruptedException
637 {
638 if (!add(o))
639 throw new IllegalStateException("full");
640 }
641
642
643 public int remainingCapacity()
644 {
645 _tailLock.lock();
646 try
647 {
648 _headLock.lock();
649 try
650 {
651 return getCapacity()-size();
652 }
653 finally
654 {
655 _headLock.unlock();
656 }
657 }
658 finally
659 {
660 _tailLock.unlock();
661 }
662 }
663 }