1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.annotation.ManagedAttribute;
35 import org.eclipse.jetty.util.annotation.ManagedObject;
36 import org.eclipse.jetty.util.annotation.ManagedOperation;
37 import org.eclipse.jetty.util.annotation.Name;
38 import org.eclipse.jetty.util.component.AbstractLifeCycle;
39 import org.eclipse.jetty.util.component.ContainerLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.component.LifeCycle;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
45
46 @ManagedObject("A thread pool with no max bound by default")
47 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
48 {
49 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
50
51 private final AtomicInteger _threadsStarted = new AtomicInteger();
52 private final AtomicInteger _threadsIdle = new AtomicInteger();
53 private final AtomicLong _lastShrink = new AtomicLong();
54 private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
55 private final Object _joinLock = new Object();
56 private final BlockingQueue<Runnable> _jobs;
57 private String _name = "qtp" + hashCode();
58 private int _idleTimeout;
59 private int _maxThreads;
60 private int _minThreads;
61 private int _priority = Thread.NORM_PRIORITY;
62 private boolean _daemon = false;
63 private boolean _detailedDump = false;
64
65 public QueuedThreadPool()
66 {
67 this(200);
68 }
69
70 public QueuedThreadPool(@Name("maxThreads") int maxThreads)
71 {
72 this(maxThreads, 8);
73 }
74
75 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
76 {
77 this(maxThreads, minThreads, 60000);
78 }
79
80 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
81 {
82 this(maxThreads, minThreads, idleTimeout, null);
83 }
84
85 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
86 {
87 setMinThreads(minThreads);
88 setMaxThreads(maxThreads);
89 setIdleTimeout(idleTimeout);
90 setStopTimeout(5000);
91
92 if (queue==null)
93 {
94 int capacity=Math.max(_minThreads, 8);
95 queue=new BlockingArrayQueue<>(capacity, capacity);
96 }
97 _jobs=queue;
98 }
99
100 @Override
101 protected void doStart() throws Exception
102 {
103 super.doStart();
104 _threadsStarted.set(0);
105
106 startThreads(_minThreads);
107 }
108
109 @Override
110 protected void doStop() throws Exception
111 {
112 super.doStop();
113
114 long timeout = getStopTimeout();
115 BlockingQueue<Runnable> jobs = getQueue();
116
117
118 if (timeout <= 0)
119 jobs.clear();
120
121
122 Runnable noop = new Runnable()
123 {
124 @Override
125 public void run()
126 {
127 }
128 };
129 for (int i = _threadsStarted.get(); i-- > 0; )
130 jobs.offer(noop);
131
132
133 long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
134 for (Thread thread : _threads)
135 {
136 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
137 if (canwait > 0)
138 thread.join(canwait);
139 }
140
141
142
143
144 if (_threadsStarted.get() > 0)
145 for (Thread thread : _threads)
146 thread.interrupt();
147
148
149 stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
150 for (Thread thread : _threads)
151 {
152 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
153 if (canwait > 0)
154 thread.join(canwait);
155 }
156
157 Thread.yield();
158 int size = _threads.size();
159 if (size > 0)
160 {
161 Thread.yield();
162
163 if (LOG.isDebugEnabled())
164 {
165 for (Thread unstopped : _threads)
166 {
167 StringBuilder dmp = new StringBuilder();
168 for (StackTraceElement element : unstopped.getStackTrace())
169 {
170 dmp.append(System.lineSeparator()).append("\tat ").append(element);
171 }
172 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
173 }
174 }
175 else
176 {
177 for (Thread unstopped : _threads)
178 LOG.warn("{} Couldn't stop {}",this,unstopped);
179 }
180 }
181
182 synchronized (_joinLock)
183 {
184 _joinLock.notifyAll();
185 }
186 }
187
188
189
190
191 public void setDaemon(boolean daemon)
192 {
193 _daemon = daemon;
194 }
195
196
197
198
199
200
201
202
203
204
205 public void setIdleTimeout(int idleTimeout)
206 {
207 _idleTimeout = idleTimeout;
208 }
209
210
211
212
213
214
215
216
217 @Override
218 public void setMaxThreads(int maxThreads)
219 {
220 _maxThreads = maxThreads;
221 if (_minThreads > _maxThreads)
222 _minThreads = _maxThreads;
223 }
224
225
226
227
228
229
230
231
232 @Override
233 public void setMinThreads(int minThreads)
234 {
235 _minThreads = minThreads;
236
237 if (_minThreads > _maxThreads)
238 _maxThreads = _minThreads;
239
240 int threads = _threadsStarted.get();
241 if (isStarted() && threads < _minThreads)
242 startThreads(_minThreads - threads);
243 }
244
245
246
247
248 public void setName(String name)
249 {
250 if (isRunning())
251 throw new IllegalStateException("started");
252 _name = name;
253 }
254
255
256
257
258
259
260 public void setThreadsPriority(int priority)
261 {
262 _priority = priority;
263 }
264
265
266
267
268
269
270
271
272 @ManagedAttribute("maximum time a thread may be idle in ms")
273 public int getIdleTimeout()
274 {
275 return _idleTimeout;
276 }
277
278
279
280
281
282
283
284
285 @Override
286 @ManagedAttribute("maximum number of threads in the pool")
287 public int getMaxThreads()
288 {
289 return _maxThreads;
290 }
291
292
293
294
295
296
297
298
299 @Override
300 @ManagedAttribute("minimum number of threads in the pool")
301 public int getMinThreads()
302 {
303 return _minThreads;
304 }
305
306
307
308
309 @ManagedAttribute("name of the thread pool")
310 public String getName()
311 {
312 return _name;
313 }
314
315
316
317
318
319
320 @ManagedAttribute("priority of threads in the pool")
321 public int getThreadsPriority()
322 {
323 return _priority;
324 }
325
326
327
328
329
330
331 @ManagedAttribute("Size of the job queue")
332 public int getQueueSize()
333 {
334 return _jobs.size();
335 }
336
337
338
339
340 @ManagedAttribute("thead pool using a daemon thread")
341 public boolean isDaemon()
342 {
343 return _daemon;
344 }
345
346 public boolean isDetailedDump()
347 {
348 return _detailedDump;
349 }
350
351 public void setDetailedDump(boolean detailedDump)
352 {
353 _detailedDump = detailedDump;
354 }
355
356 @Override
357 public void execute(Runnable job)
358 {
359 if (!isRunning() || !_jobs.offer(job))
360 {
361 LOG.warn("{} rejected {}", this, job);
362 throw new RejectedExecutionException(job.toString());
363 }
364 else
365 {
366
367 if (getThreads() == 0)
368 startThreads(1);
369 }
370 }
371
372
373
374
375 @Override
376 public void join() throws InterruptedException
377 {
378 synchronized (_joinLock)
379 {
380 while (isRunning())
381 _joinLock.wait();
382 }
383
384 while (isStopping())
385 Thread.sleep(1);
386 }
387
388
389
390
391 @Override
392 @ManagedAttribute("total number of threads currently in the pool")
393 public int getThreads()
394 {
395 return _threadsStarted.get();
396 }
397
398
399
400
401 @Override
402 @ManagedAttribute("total number of idle threads in the pool")
403 public int getIdleThreads()
404 {
405 return _threadsIdle.get();
406 }
407
408
409
410
411 @Override
412 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
413 public boolean isLowOnThreads()
414 {
415 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
416 }
417
418 private boolean startThreads(int threadsToStart)
419 {
420 while (threadsToStart > 0 && isRunning())
421 {
422 int threads = _threadsStarted.get();
423 if (threads >= _maxThreads)
424 return false;
425
426 if (!_threadsStarted.compareAndSet(threads, threads + 1))
427 continue;
428
429 boolean started = false;
430 try
431 {
432 Thread thread = newThread(_runnable);
433 thread.setDaemon(isDaemon());
434 thread.setPriority(getThreadsPriority());
435 thread.setName(_name + "-" + thread.getId());
436 _threads.add(thread);
437
438 thread.start();
439 started = true;
440 --threadsToStart;
441 }
442 finally
443 {
444 if (!started)
445 _threadsStarted.decrementAndGet();
446 }
447 }
448 return true;
449 }
450
451 protected Thread newThread(Runnable runnable)
452 {
453 return new Thread(runnable);
454 }
455
456 @Override
457 @ManagedOperation("dump thread state")
458 public String dump()
459 {
460 return ContainerLifeCycle.dump(this);
461 }
462
463 @Override
464 public void dump(Appendable out, String indent) throws IOException
465 {
466 List<Object> dump = new ArrayList<>(getMaxThreads());
467 for (final Thread thread : _threads)
468 {
469 final StackTraceElement[] trace = thread.getStackTrace();
470 boolean inIdleJobPoll = false;
471 for (StackTraceElement t : trace)
472 {
473 if ("idleJobPoll".equals(t.getMethodName()))
474 {
475 inIdleJobPoll = true;
476 break;
477 }
478 }
479 final boolean idle = inIdleJobPoll;
480
481 if (isDetailedDump())
482 {
483 dump.add(new Dumpable()
484 {
485 @Override
486 public void dump(Appendable out, String indent) throws IOException
487 {
488 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
489 if (thread.getPriority()!=Thread.NORM_PRIORITY)
490 out.append(" prio=").append(String.valueOf(thread.getPriority()));
491 out.append(System.lineSeparator());
492 if (!idle)
493 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
494 }
495
496 @Override
497 public String dump()
498 {
499 return null;
500 }
501 });
502 }
503 else
504 {
505 int p=thread.getPriority();
506 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
507 }
508 }
509
510 ContainerLifeCycle.dumpObject(out, this);
511 ContainerLifeCycle.dump(out, indent, dump);
512 }
513
514 @Override
515 public String toString()
516 {
517 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
518 }
519
520 private Runnable idleJobPoll() throws InterruptedException
521 {
522 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
523 }
524
525 private Runnable _runnable = new Runnable()
526 {
527 @Override
528 public void run()
529 {
530 boolean shrink = false;
531 boolean ignore = false;
532 try
533 {
534 Runnable job = _jobs.poll();
535
536 if (job != null && _threadsIdle.get() == 0)
537 {
538 startThreads(1);
539 }
540
541 loop: while (isRunning())
542 {
543
544 while (job != null && isRunning())
545 {
546 runJob(job);
547 if (Thread.interrupted())
548 {
549 ignore=true;
550 break loop;
551 }
552 job = _jobs.poll();
553 }
554
555
556 try
557 {
558 _threadsIdle.incrementAndGet();
559
560 while (isRunning() && job == null)
561 {
562 if (_idleTimeout <= 0)
563 job = _jobs.take();
564 else
565 {
566
567 final int size = _threadsStarted.get();
568 if (size > _minThreads)
569 {
570 long last = _lastShrink.get();
571 long now = System.nanoTime();
572 if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
573 {
574 if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
575 {
576 shrink=true;
577 break loop;
578 }
579 }
580 }
581 job = idleJobPoll();
582 }
583 }
584 }
585 finally
586 {
587 if (_threadsIdle.decrementAndGet() == 0)
588 {
589 startThreads(1);
590 }
591 }
592 }
593 }
594 catch (InterruptedException e)
595 {
596 ignore=true;
597 LOG.ignore(e);
598 }
599 catch (Throwable e)
600 {
601 LOG.warn(e);
602 }
603 finally
604 {
605 if (!shrink && isRunning())
606 {
607 if (!ignore)
608 LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
609
610 if (_threadsStarted.decrementAndGet()<getMaxThreads())
611 startThreads(1);
612 }
613 _threads.remove(Thread.currentThread());
614 }
615 }
616 };
617
618
619
620
621
622
623
624 protected void runJob(Runnable job)
625 {
626 job.run();
627 }
628
629
630
631
632 protected BlockingQueue<Runnable> getQueue()
633 {
634 return _jobs;
635 }
636
637
638
639
640 public void setQueue(BlockingQueue<Runnable> queue)
641 {
642 throw new UnsupportedOperationException("Use constructor injection");
643 }
644
645
646
647
648
649 @ManagedOperation("interrupt a pool thread")
650 public boolean interruptThread(@Name("id") long id)
651 {
652 for (Thread thread : _threads)
653 {
654 if (thread.getId() == id)
655 {
656 thread.interrupt();
657 return true;
658 }
659 }
660 return false;
661 }
662
663
664
665
666
667 @ManagedOperation("dump a pool thread stack")
668 public String dumpThread(@Name("id") long id)
669 {
670 for (Thread thread : _threads)
671 {
672 if (thread.getId() == id)
673 {
674 StringBuilder buf = new StringBuilder();
675 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
676 buf.append(thread.getState()).append(":").append(System.lineSeparator());
677 for (StackTraceElement element : thread.getStackTrace())
678 buf.append(" at ").append(element.toString()).append(System.lineSeparator());
679 return buf.toString();
680 }
681 }
682 return null;
683 }
684 }