1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.eclipse.jetty.util.BlockingArrayQueue;
33 import org.eclipse.jetty.util.ConcurrentHashSet;
34 import org.eclipse.jetty.util.annotation.ManagedAttribute;
35 import org.eclipse.jetty.util.annotation.ManagedObject;
36 import org.eclipse.jetty.util.annotation.ManagedOperation;
37 import org.eclipse.jetty.util.annotation.Name;
38 import org.eclipse.jetty.util.component.AbstractLifeCycle;
39 import org.eclipse.jetty.util.component.ContainerLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.component.LifeCycle;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
45
46 @ManagedObject("A thread pool with no max bound by default")
47 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
48 {
49 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
50
51 private final AtomicInteger _threadsStarted = new AtomicInteger();
52 private final AtomicInteger _threadsIdle = new AtomicInteger();
53 private final AtomicLong _lastShrink = new AtomicLong();
54 private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<Thread>();
55 private final Object _joinLock = new Object();
56 private final BlockingQueue<Runnable> _jobs;
57 private final ThreadGroup _threadGroup;
58 private String _name = "qtp" + hashCode();
59 private int _idleTimeout;
60 private int _maxThreads;
61 private int _minThreads;
62 private int _priority = Thread.NORM_PRIORITY;
63 private boolean _daemon = false;
64 private boolean _detailedDump = false;
65
/**
 * Creates a pool limited to 200 threads; every other setting comes from the
 * constructor this one delegates to.
 */
public QueuedThreadPool()
{
    this(200);
}
70
/**
 * Creates a pool with the given maximum and a minimum of 8 threads.
 *
 * @param maxThreads the maximum number of threads
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads)
{
    this(maxThreads, 8);
}
75
/**
 * Creates a pool with the given thread bounds and an idle timeout of 60000 ms.
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
{
    this(maxThreads, minThreads, 60000);
}
80
/**
 * Creates a pool with the given bounds and idle timeout, letting the pool
 * create its own job queue (a null queue is passed on).
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 * @param idleTimeout the idle timeout in milliseconds
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
{
    this(maxThreads, minThreads, idleTimeout, null);
}
85
/**
 * Creates a pool with the given bounds, idle timeout and job queue, and no
 * explicit thread group (null is passed on).
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 * @param idleTimeout the idle timeout in milliseconds
 * @param queue the job queue to use, or null to have one created
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
{
    this(maxThreads, minThreads, idleTimeout, queue, null);
}
90
/**
 * Primary constructor: applies the thread bounds, the idle timeout and a stop
 * timeout of 5000, then fixes the job queue and the thread group.
 * <p>
 * When no queue is supplied a {@link BlockingArrayQueue} is created, passing
 * {@code max(minThreads, 8)} for both of its constructor arguments. The
 * minimum used is the value left after the setters have reconciled it with
 * the maximum.
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 * @param idleTimeout the idle timeout in milliseconds
 * @param queue the job queue to use, or null to have one created
 * @param threadGroup the thread group for pool threads; may be null
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
    setMinThreads(minThreads);
    setMaxThreads(maxThreads);
    setIdleTimeout(idleTimeout);
    setStopTimeout(5000);

    if (queue != null)
    {
        _jobs = queue;
    }
    else
    {
        int capacity = Math.max(_minThreads, 8);
        _jobs = new BlockingArrayQueue<>(capacity, capacity);
    }
    _threadGroup = threadGroup;
}
106
107 @Override
108 protected void doStart() throws Exception
109 {
110 super.doStart();
111 _threadsStarted.set(0);
112
113 startThreads(_minThreads);
114 }
115
116 @Override
117 protected void doStop() throws Exception
118 {
119 super.doStop();
120
121 long timeout = getStopTimeout();
122 BlockingQueue<Runnable> jobs = getQueue();
123
124
125 if (timeout <= 0)
126 jobs.clear();
127
128
129 Runnable noop = new Runnable()
130 {
131 @Override
132 public void run()
133 {
134 }
135 };
136 for (int i = _threadsStarted.get(); i-- > 0; )
137 jobs.offer(noop);
138
139
140 long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
141 for (Thread thread : _threads)
142 {
143 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
144 if (canwait > 0)
145 thread.join(canwait);
146 }
147
148
149
150
151 if (_threadsStarted.get() > 0)
152 for (Thread thread : _threads)
153 thread.interrupt();
154
155
156 stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
157 for (Thread thread : _threads)
158 {
159 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
160 if (canwait > 0)
161 thread.join(canwait);
162 }
163
164 Thread.yield();
165 int size = _threads.size();
166 if (size > 0)
167 {
168 Thread.yield();
169
170 if (LOG.isDebugEnabled())
171 {
172 for (Thread unstopped : _threads)
173 {
174 StringBuilder dmp = new StringBuilder();
175 for (StackTraceElement element : unstopped.getStackTrace())
176 {
177 dmp.append(System.lineSeparator()).append("\tat ").append(element);
178 }
179 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
180 }
181 }
182 else
183 {
184 for (Thread unstopped : _threads)
185 LOG.warn("{} Couldn't stop {}",this,unstopped);
186 }
187 }
188
189 synchronized (_joinLock)
190 {
191 _joinLock.notifyAll();
192 }
193 }
194
195
196
197
198
199
200
201 public void setDaemon(boolean daemon)
202 {
203 _daemon = daemon;
204 }
205
206
207
208
209
210
211
212
213
214
215 public void setIdleTimeout(int idleTimeout)
216 {
217 _idleTimeout = idleTimeout;
218 }
219
220
221
222
223
224
225
226
227 @Override
228 public void setMaxThreads(int maxThreads)
229 {
230 _maxThreads = maxThreads;
231 if (_minThreads > _maxThreads)
232 _minThreads = _maxThreads;
233 }
234
235
236
237
238
239
240
241
242 @Override
243 public void setMinThreads(int minThreads)
244 {
245 _minThreads = minThreads;
246
247 if (_minThreads > _maxThreads)
248 _maxThreads = _minThreads;
249
250 int threads = _threadsStarted.get();
251 if (isStarted() && threads < _minThreads)
252 startThreads(_minThreads - threads);
253 }
254
255
256
257
258 public void setName(String name)
259 {
260 if (isRunning())
261 throw new IllegalStateException("started");
262 _name = name;
263 }
264
265
266
267
268
269
270 public void setThreadsPriority(int priority)
271 {
272 _priority = priority;
273 }
274
275
276
277
278
279
280
281
282 @ManagedAttribute("maximum time a thread may be idle in ms")
283 public int getIdleTimeout()
284 {
285 return _idleTimeout;
286 }
287
288
289
290
291
292
293
294
295 @Override
296 @ManagedAttribute("maximum number of threads in the pool")
297 public int getMaxThreads()
298 {
299 return _maxThreads;
300 }
301
302
303
304
305
306
307
308
309 @Override
310 @ManagedAttribute("minimum number of threads in the pool")
311 public int getMinThreads()
312 {
313 return _minThreads;
314 }
315
316
317
318
319 @ManagedAttribute("name of the thread pool")
320 public String getName()
321 {
322 return _name;
323 }
324
325
326
327
328
329
330 @ManagedAttribute("priority of threads in the pool")
331 public int getThreadsPriority()
332 {
333 return _priority;
334 }
335
336
337
338
339
340
341 @ManagedAttribute("Size of the job queue")
342 public int getQueueSize()
343 {
344 return _jobs.size();
345 }
346
347
348
349
350
351
352
353 @ManagedAttribute("thead pool using a daemon thread")
354 public boolean isDaemon()
355 {
356 return _daemon;
357 }
358
359 public boolean isDetailedDump()
360 {
361 return _detailedDump;
362 }
363
364 public void setDetailedDump(boolean detailedDump)
365 {
366 _detailedDump = detailedDump;
367 }
368
369 @Override
370 public void execute(Runnable job)
371 {
372 if (LOG.isDebugEnabled())
373 LOG.debug("queue {}",job);
374 if (!isRunning() || !_jobs.offer(job))
375 {
376 LOG.warn("{} rejected {}", this, job);
377 throw new RejectedExecutionException(job.toString());
378 }
379 else
380 {
381
382 if (getThreads() == 0)
383 startThreads(1);
384 }
385 }
386
387
388
389
390 @Override
391 public void join() throws InterruptedException
392 {
393 synchronized (_joinLock)
394 {
395 while (isRunning())
396 _joinLock.wait();
397 }
398
399 while (isStopping())
400 Thread.sleep(1);
401 }
402
403
404
405
406 @Override
407 @ManagedAttribute("total number of threads currently in the pool")
408 public int getThreads()
409 {
410 return _threadsStarted.get();
411 }
412
413
414
415
416 @Override
417 @ManagedAttribute("total number of idle threads in the pool")
418 public int getIdleThreads()
419 {
420 return _threadsIdle.get();
421 }
422
423
424
425
426 @ManagedAttribute("total number of busy threads in the pool")
427 public int getBusyThreads()
428 {
429 return getThreads() - getIdleThreads();
430 }
431
432
433
434
435 @Override
436 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
437 public boolean isLowOnThreads()
438 {
439 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
440 }
441
442 private boolean startThreads(int threadsToStart)
443 {
444 while (threadsToStart > 0 && isRunning())
445 {
446 int threads = _threadsStarted.get();
447 if (threads >= _maxThreads)
448 return false;
449
450 if (!_threadsStarted.compareAndSet(threads, threads + 1))
451 continue;
452
453 boolean started = false;
454 try
455 {
456 Thread thread = newThread(_runnable);
457 thread.setDaemon(isDaemon());
458 thread.setPriority(getThreadsPriority());
459 thread.setName(_name + "-" + thread.getId());
460 _threads.add(thread);
461
462 thread.start();
463 started = true;
464 --threadsToStart;
465 }
466 finally
467 {
468 if (!started)
469 _threadsStarted.decrementAndGet();
470 }
471 }
472 return true;
473 }
474
475 protected Thread newThread(Runnable runnable)
476 {
477 return new Thread(_threadGroup, runnable);
478 }
479
480 @Override
481 @ManagedOperation("dump thread state")
482 public String dump()
483 {
484 return ContainerLifeCycle.dump(this);
485 }
486
487 @Override
488 public void dump(Appendable out, String indent) throws IOException
489 {
490 List<Object> dump = new ArrayList<>(getMaxThreads());
491 for (final Thread thread : _threads)
492 {
493 final StackTraceElement[] trace = thread.getStackTrace();
494 boolean inIdleJobPoll = false;
495 for (StackTraceElement t : trace)
496 {
497 if ("idleJobPoll".equals(t.getMethodName()))
498 {
499 inIdleJobPoll = true;
500 break;
501 }
502 }
503 final boolean idle = inIdleJobPoll;
504
505 if (isDetailedDump())
506 {
507 dump.add(new Dumpable()
508 {
509 @Override
510 public void dump(Appendable out, String indent) throws IOException
511 {
512 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
513 if (thread.getPriority()!=Thread.NORM_PRIORITY)
514 out.append(" prio=").append(String.valueOf(thread.getPriority()));
515 out.append(System.lineSeparator());
516 if (!idle)
517 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
518 }
519
520 @Override
521 public String dump()
522 {
523 return null;
524 }
525 });
526 }
527 else
528 {
529 int p=thread.getPriority();
530 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
531 }
532 }
533
534 ContainerLifeCycle.dumpObject(out, this);
535 ContainerLifeCycle.dump(out, indent, dump);
536 }
537
538 @Override
539 public String toString()
540 {
541 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
542 }
543
544 private Runnable idleJobPoll() throws InterruptedException
545 {
546 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
547 }
548
549 private Runnable _runnable = new Runnable()
550 {
551 @Override
552 public void run()
553 {
554 boolean shrink = false;
555 boolean ignore = false;
556 try
557 {
558 Runnable job = _jobs.poll();
559
560 if (job != null && _threadsIdle.get() == 0)
561 {
562 startThreads(1);
563 }
564
565 loop: while (isRunning())
566 {
567
568 while (job != null && isRunning())
569 {
570 if (LOG.isDebugEnabled())
571 LOG.debug("run {}",job);
572 runJob(job);
573 if (LOG.isDebugEnabled())
574 LOG.debug("ran {}",job);
575 if (Thread.interrupted())
576 {
577 ignore=true;
578 break loop;
579 }
580 job = _jobs.poll();
581 }
582
583
584 try
585 {
586 _threadsIdle.incrementAndGet();
587
588 while (isRunning() && job == null)
589 {
590 if (_idleTimeout <= 0)
591 job = _jobs.take();
592 else
593 {
594
595 final int size = _threadsStarted.get();
596 if (size > _minThreads)
597 {
598 long last = _lastShrink.get();
599 long now = System.nanoTime();
600 if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
601 {
602 if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
603 {
604 shrink=true;
605 break loop;
606 }
607 }
608 }
609 job = idleJobPoll();
610 }
611 }
612 }
613 finally
614 {
615 if (_threadsIdle.decrementAndGet() == 0)
616 {
617 startThreads(1);
618 }
619 }
620 }
621 }
622 catch (InterruptedException e)
623 {
624 ignore=true;
625 LOG.ignore(e);
626 }
627 catch (Throwable e)
628 {
629 LOG.warn(e);
630 }
631 finally
632 {
633 if (!shrink && isRunning())
634 {
635 if (!ignore)
636 LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
637
638 if (_threadsStarted.decrementAndGet()<getMaxThreads())
639 startThreads(1);
640 }
641 _threads.remove(Thread.currentThread());
642 }
643 }
644 };
645
646
647
648
649
650
651
652 protected void runJob(Runnable job)
653 {
654 job.run();
655 }
656
657
658
659
660 protected BlockingQueue<Runnable> getQueue()
661 {
662 return _jobs;
663 }
664
665
666
667
668 public void setQueue(BlockingQueue<Runnable> queue)
669 {
670 throw new UnsupportedOperationException("Use constructor injection");
671 }
672
673
674
675
676
677 @ManagedOperation("interrupt a pool thread")
678 public boolean interruptThread(@Name("id") long id)
679 {
680 for (Thread thread : _threads)
681 {
682 if (thread.getId() == id)
683 {
684 thread.interrupt();
685 return true;
686 }
687 }
688 return false;
689 }
690
691
692
693
694
695 @ManagedOperation("dump a pool thread stack")
696 public String dumpThread(@Name("id") long id)
697 {
698 for (Thread thread : _threads)
699 {
700 if (thread.getId() == id)
701 {
702 StringBuilder buf = new StringBuilder();
703 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
704 buf.append(thread.getState()).append(":").append(System.lineSeparator());
705 for (StackTraceElement element : thread.getStackTrace())
706 buf.append(" at ").append(element.toString()).append(System.lineSeparator());
707 return buf.toString();
708 }
709 }
710 return null;
711 }
712 }