1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.ArrayBlockingQueue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.eclipse.jetty.util.BlockingArrayQueue;
36 import org.eclipse.jetty.util.component.AbstractLifeCycle;
37 import org.eclipse.jetty.util.component.AggregateLifeCycle;
38 import org.eclipse.jetty.util.component.Dumpable;
39 import org.eclipse.jetty.util.component.LifeCycle;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
43
44 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
45 {
46 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
47
48 private final AtomicInteger _threadsStarted = new AtomicInteger();
49 private final AtomicInteger _threadsIdle = new AtomicInteger();
50 private final AtomicLong _lastShrink = new AtomicLong();
51 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
52 private final Object _joinLock = new Object();
53 private BlockingQueue<Runnable> _jobs;
54 private String _name;
55 private int _maxIdleTimeMs=60000;
56 private int _maxThreads=254;
57 private int _minThreads=8;
58 private int _maxQueued=-1;
59 private int _priority=Thread.NORM_PRIORITY;
60 private boolean _daemon=false;
61 private int _maxStopTime=100;
62 private boolean _detailedDump=false;
63
64
65
66
67 public QueuedThreadPool()
68 {
69 _name="qtp"+super.hashCode();
70 }
71
72
73
74
/**
 * Creates a pool with the default settings and then applies the given
 * thread ceiling through {@link #setMaxThreads(int)}.
 *
 * @param maxThreads the maximum number of threads
 */
public QueuedThreadPool(int maxThreads)
{
    this();
    this.setMaxThreads(maxThreads);
}
80
81
82
83
/**
 * Creates a pool that uses the supplied queue for pending jobs.
 * Anything already in the queue is discarded.
 *
 * @param jobQ the job queue to use; it is cleared before being adopted
 */
public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
{
    this();
    jobQ.clear();
    _jobs = jobQ;
}
90
91
92
93 @Override
94 protected void doStart() throws Exception
95 {
96 super.doStart();
97 _threadsStarted.set(0);
98
99 if (_jobs==null)
100 {
101 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
102 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
103 }
104
105 int threads=_threadsStarted.get();
106 while (isRunning() && threads<_minThreads)
107 {
108 startThread(threads);
109 threads=_threadsStarted.get();
110 }
111 }
112
113
114 @Override
115 protected void doStop() throws Exception
116 {
117 super.doStop();
118 long start=System.currentTimeMillis();
119
120
121 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
122 Thread.sleep(1);
123
124
125 _jobs.clear();
126 Runnable noop = new Runnable(){public void run(){}};
127 for (int i=_threadsIdle.get();i-->0;)
128 _jobs.offer(noop);
129 Thread.yield();
130
131
132 if (_threadsStarted.get()>0)
133 for (Thread thread : _threads)
134 thread.interrupt();
135
136
137 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
138 {
139 Thread.sleep(1);
140 }
141 Thread.yield();
142 int size=_threads.size();
143 if (size>0)
144 {
145 LOG.warn(size+" threads could not be stopped");
146
147 if (size==1 || LOG.isDebugEnabled())
148 {
149 for (Thread unstopped : _threads)
150 {
151 LOG.info("Couldn't stop "+unstopped);
152 for (StackTraceElement element : unstopped.getStackTrace())
153 {
154 LOG.info(" at "+element);
155 }
156 }
157 }
158 }
159
160 synchronized (_joinLock)
161 {
162 _joinLock.notifyAll();
163 }
164 }
165
166
167
168
169
170 public void setDaemon(boolean daemon)
171 {
172 _daemon=daemon;
173 }
174
175
176
177
178
179
180
181
182
183 public void setMaxIdleTimeMs(int maxIdleTimeMs)
184 {
185 _maxIdleTimeMs=maxIdleTimeMs;
186 }
187
188
189
190
191
192 public void setMaxStopTimeMs(int stopTimeMs)
193 {
194 _maxStopTime = stopTimeMs;
195 }
196
197
198
199
200
201
202
203 public void setMaxThreads(int maxThreads)
204 {
205 _maxThreads=maxThreads;
206 if (_minThreads>_maxThreads)
207 _minThreads=_maxThreads;
208 }
209
210
211
212
213
214
215
216 public void setMinThreads(int minThreads)
217 {
218 _minThreads=minThreads;
219
220 if (_minThreads>_maxThreads)
221 _maxThreads=_minThreads;
222
223 int threads=_threadsStarted.get();
224 while (isStarted() && threads<_minThreads)
225 {
226 startThread(threads);
227 threads=_threadsStarted.get();
228 }
229 }
230
231
232
233
234
235 public void setName(String name)
236 {
237 if (isRunning())
238 throw new IllegalStateException("started");
239 _name= name;
240 }
241
242
243
244
245
246 public void setThreadsPriority(int priority)
247 {
248 _priority=priority;
249 }
250
251
252
253
254
255 public int getMaxQueued()
256 {
257 return _maxQueued;
258 }
259
260
261
262
263
264 public void setMaxQueued(int max)
265 {
266 if (isRunning())
267 throw new IllegalStateException("started");
268 _maxQueued=max;
269 }
270
271
272
273
274
275
276
277 public int getMaxIdleTimeMs()
278 {
279 return _maxIdleTimeMs;
280 }
281
282
283
284
285
286 public int getMaxStopTimeMs()
287 {
288 return _maxStopTime;
289 }
290
291
292
293
294
295
296
297 public int getMaxThreads()
298 {
299 return _maxThreads;
300 }
301
302
303
304
305
306
307
308 public int getMinThreads()
309 {
310 return _minThreads;
311 }
312
313
314
315
316
317 public String getName()
318 {
319 return _name;
320 }
321
322
323
324
325
326 public int getThreadsPriority()
327 {
328 return _priority;
329 }
330
331
332
333
334
335 public boolean isDaemon()
336 {
337 return _daemon;
338 }
339
340
341 public boolean isDetailedDump()
342 {
343 return _detailedDump;
344 }
345
346
347 public void setDetailedDump(boolean detailedDump)
348 {
349 _detailedDump = detailedDump;
350 }
351
352
353 public boolean dispatch(Runnable job)
354 {
355 if (isRunning())
356 {
357 final int jobQ = _jobs.size();
358 final int idle = getIdleThreads();
359 if(_jobs.offer(job))
360 {
361
362 if (idle==0 || jobQ>idle)
363 {
364 int threads=_threadsStarted.get();
365 if (threads<_maxThreads)
366 startThread(threads);
367 }
368 return true;
369 }
370 }
371 LOG.debug("Dispatched {} to stopped {}",job,this);
372 return false;
373 }
374
375
376 public void execute(Runnable job)
377 {
378 if (!dispatch(job))
379 throw new RejectedExecutionException();
380 }
381
382
383
384
385
386 public void join() throws InterruptedException
387 {
388 synchronized (_joinLock)
389 {
390 while (isRunning())
391 _joinLock.wait();
392 }
393
394 while (isStopping())
395 Thread.sleep(1);
396 }
397
398
399
400
401
402 public int getThreads()
403 {
404 return _threadsStarted.get();
405 }
406
407
408
409
410
411 public int getIdleThreads()
412 {
413 return _threadsIdle.get();
414 }
415
416
417
418
419
420 public boolean isLowOnThreads()
421 {
422 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
423 }
424
425
426 private boolean startThread(int threads)
427 {
428 final int next=threads+1;
429 if (!_threadsStarted.compareAndSet(threads,next))
430 return false;
431
432 boolean started=false;
433 try
434 {
435 Thread thread=newThread(_runnable);
436 thread.setDaemon(_daemon);
437 thread.setPriority(_priority);
438 thread.setName(_name+"-"+thread.getId());
439 _threads.add(thread);
440
441 thread.start();
442 started=true;
443 }
444 finally
445 {
446 if (!started)
447 _threadsStarted.decrementAndGet();
448 }
449 return started;
450 }
451
452
453 protected Thread newThread(Runnable runnable)
454 {
455 return new Thread(runnable);
456 }
457
458
459
460 public String dump()
461 {
462 return AggregateLifeCycle.dump(this);
463 }
464
465
466 public void dump(Appendable out, String indent) throws IOException
467 {
468 List<Object> dump = new ArrayList<Object>(getMaxThreads());
469 for (final Thread thread: _threads)
470 {
471 final StackTraceElement[] trace=thread.getStackTrace();
472 boolean inIdleJobPoll=false;
473
474 if (trace != null)
475 {
476 for (StackTraceElement t : trace)
477 {
478 if ("idleJobPoll".equals(t.getMethodName()))
479 {
480 inIdleJobPoll = true;
481 break;
482 }
483 }
484 }
485 final boolean idle=inIdleJobPoll;
486
487 if (_detailedDump)
488 {
489 dump.add(new Dumpable()
490 {
491 public void dump(Appendable out, String indent) throws IOException
492 {
493 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
494 if (!idle)
495 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
496 }
497
498 public String dump()
499 {
500 return null;
501 }
502 });
503 }
504 else
505 {
506 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
507 }
508 }
509
510 AggregateLifeCycle.dumpObject(out,this);
511 AggregateLifeCycle.dump(out,indent,dump);
512
513 }
514
515
516
517 @Override
518 public String toString()
519 {
520 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
521 }
522
523
524 private Runnable idleJobPoll() throws InterruptedException
525 {
526 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
527 }
528
529
530 private Runnable _runnable = new Runnable()
531 {
532 public void run()
533 {
534 boolean shrink=false;
535 try
536 {
537 Runnable job=_jobs.poll();
538 while (isRunning())
539 {
540
541 while (job!=null && isRunning())
542 {
543 runJob(job);
544 job=_jobs.poll();
545 }
546
547
548 try
549 {
550 _threadsIdle.incrementAndGet();
551
552 while (isRunning() && job==null)
553 {
554 if (_maxIdleTimeMs<=0)
555 job=_jobs.take();
556 else
557 {
558
559 final int size=_threadsStarted.get();
560 if (size>_minThreads)
561 {
562 long last=_lastShrink.get();
563 long now=System.currentTimeMillis();
564 if (last==0 || (now-last)>_maxIdleTimeMs)
565 {
566 shrink=_lastShrink.compareAndSet(last,now) &&
567 _threadsStarted.compareAndSet(size,size-1);
568 if (shrink)
569 return;
570 }
571 }
572 job=idleJobPoll();
573 }
574 }
575 }
576 finally
577 {
578 _threadsIdle.decrementAndGet();
579 }
580 }
581 }
582 catch(InterruptedException e)
583 {
584 LOG.ignore(e);
585 }
586 catch(Exception e)
587 {
588 LOG.warn(e);
589 }
590 finally
591 {
592 if (!shrink)
593 _threadsStarted.decrementAndGet();
594 _threads.remove(Thread.currentThread());
595 }
596 }
597 };
598
599
600
601
602
603
604
605
606 protected void runJob(Runnable job)
607 {
608 job.run();
609 }
610
611
612
613
614
615 protected BlockingQueue<Runnable> getQueue()
616 {
617 return _jobs;
618 }
619
620
621
622
623
624
625
626 @Deprecated
627 public boolean stopThread(long id)
628 {
629 for (Thread thread: _threads)
630 {
631 if (thread.getId()==id)
632 {
633 thread.stop();
634 return true;
635 }
636 }
637 return false;
638 }
639
640
641
642
643
644
645 public boolean interruptThread(long id)
646 {
647 for (Thread thread: _threads)
648 {
649 if (thread.getId()==id)
650 {
651 thread.interrupt();
652 return true;
653 }
654 }
655 return false;
656 }
657
658
659
660
661
662
663 public String dumpThread(long id)
664 {
665 for (Thread thread: _threads)
666 {
667 if (thread.getId()==id)
668 {
669 StringBuilder buf = new StringBuilder();
670 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
671 for (StackTraceElement element : thread.getStackTrace())
672 buf.append(" at ").append(element.toString()).append('\n');
673 return buf.toString();
674 }
675 }
676 return null;
677 }
678 }