1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.annotation.ManagedAttribute;
36 import org.eclipse.jetty.util.annotation.ManagedObject;
37 import org.eclipse.jetty.util.annotation.ManagedOperation;
38 import org.eclipse.jetty.util.annotation.Name;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
45
46 @ManagedObject("A thread pool with no max bound by default")
47 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
48 {
49 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
50
51 private final AtomicInteger _threadsStarted = new AtomicInteger();
52 private final AtomicInteger _threadsIdle = new AtomicInteger();
53 private final AtomicLong _lastShrink = new AtomicLong();
54 private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
55 private final Object _joinLock = new Object();
56 private final BlockingQueue<Runnable> _jobs;
57 private String _name = "qtp" + hashCode();
58 private int _idleTimeout;
59 private int _maxThreads;
60 private int _minThreads;
61 private int _priority = Thread.NORM_PRIORITY;
62 private boolean _daemon = false;
63 private boolean _detailedDump = false;
64
/**
 * Creates a pool with at most 200 threads, at least 8 threads and a 60000 ms idle timeout.
 */
public QueuedThreadPool()
{
    this(200);
}

/**
 * Creates a pool with at least 8 threads and a 60000 ms idle timeout.
 *
 * @param maxThreads the maximum number of threads
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads)
{
    this(maxThreads, 8);
}

/**
 * Creates a pool with a 60000 ms idle timeout.
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
{
    this(maxThreads, minThreads, 60000);
}

/**
 * Creates a pool that allocates its own job queue.
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 * @param idleTimeout the idle timeout in milliseconds
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
{
    this(maxThreads, minThreads, idleTimeout, null);
}

/**
 * Creates a pool with every setting supplied. The stop timeout is initialised to 5000 ms.
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 * @param idleTimeout the idle timeout in milliseconds
 * @param queue the job queue to use, or {@code null} to have a {@link BlockingArrayQueue} created
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
{
    // min is applied before max, so that setMaxThreads() has the final say when max < min.
    setMinThreads(minThreads);
    setMaxThreads(maxThreads);
    setIdleTimeout(idleTimeout);
    setStopTimeout(5000);

    if (queue==null)
    {
        // Never size the queue from a zero (or negative) minThreads: an initial capacity and
        // growth increment of 0 would presumably leave the queue unable to accept any job.
        int capacity = Math.max(_minThreads, 8);
        queue=new BlockingArrayQueue<>(capacity, capacity);
    }
    _jobs=queue;
}
97
98 @Override
99 protected void doStart() throws Exception
100 {
101 super.doStart();
102 _threadsStarted.set(0);
103
104 startThreads(_minThreads);
105 }
106
107 @Override
108 protected void doStop() throws Exception
109 {
110 super.doStop();
111
112 long timeout = getStopTimeout();
113 BlockingQueue<Runnable> jobs = getQueue();
114
115
116 if (timeout <= 0)
117 jobs.clear();
118
119
120 Runnable noop = new Runnable()
121 {
122 @Override
123 public void run()
124 {
125 }
126 };
127 for (int i = _threadsStarted.get(); i-- > 0; )
128 jobs.offer(noop);
129
130
131 long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
132 for (Thread thread : _threads)
133 {
134 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
135 if (canwait > 0)
136 thread.join(canwait);
137 }
138
139
140
141
142 if (_threadsStarted.get() > 0)
143 for (Thread thread : _threads)
144 thread.interrupt();
145
146
147 stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
148 for (Thread thread : _threads)
149 {
150 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
151 if (canwait > 0)
152 thread.join(canwait);
153 }
154
155 Thread.yield();
156 int size = _threads.size();
157 if (size > 0)
158 {
159 Thread.yield();
160
161 if (LOG.isDebugEnabled())
162 {
163 for (Thread unstopped : _threads)
164 {
165 StringBuilder dmp = new StringBuilder();
166 for (StackTraceElement element : unstopped.getStackTrace())
167 {
168 dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
169 }
170 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
171 }
172 }
173 else
174 {
175 for (Thread unstopped : _threads)
176 LOG.warn("{} Couldn't stop {}",this,unstopped);
177 }
178 }
179
180 synchronized (_joinLock)
181 {
182 _joinLock.notifyAll();
183 }
184 }
185
186
187
188
189 public void setDaemon(boolean daemon)
190 {
191 _daemon = daemon;
192 }
193
194
195
196
197
198
199
200
201
202
203 public void setIdleTimeout(int idleTimeout)
204 {
205 _idleTimeout = idleTimeout;
206 }
207
208
209
210
211
212
213
214
215 @Override
216 public void setMaxThreads(int maxThreads)
217 {
218 _maxThreads = maxThreads;
219 if (_minThreads > _maxThreads)
220 _minThreads = _maxThreads;
221 }
222
223
224
225
226
227
228
229
230 @Override
231 public void setMinThreads(int minThreads)
232 {
233 _minThreads = minThreads;
234
235 if (_minThreads > _maxThreads)
236 _maxThreads = _minThreads;
237
238 int threads = _threadsStarted.get();
239 if (isStarted() && threads < _minThreads)
240 startThreads(_minThreads - threads);
241 }
242
243
244
245
246 public void setName(String name)
247 {
248 if (isRunning())
249 throw new IllegalStateException("started");
250 _name = name;
251 }
252
253
254
255
256
257
258 public void setThreadsPriority(int priority)
259 {
260 _priority = priority;
261 }
262
263
264
265
266
267
268
269
270 @ManagedAttribute("maximum time a thread may be idle in ms")
271 public int getIdleTimeout()
272 {
273 return _idleTimeout;
274 }
275
276
277
278
279
280
281
282
283 @Override
284 @ManagedAttribute("maximum number of threads in the pool")
285 public int getMaxThreads()
286 {
287 return _maxThreads;
288 }
289
290
291
292
293
294
295
296
297 @Override
298 @ManagedAttribute("minimum number of threads in the pool")
299 public int getMinThreads()
300 {
301 return _minThreads;
302 }
303
304
305
306
307 @ManagedAttribute("name of the thread pool")
308 public String getName()
309 {
310 return _name;
311 }
312
313
314
315
316
317
318 @ManagedAttribute("priority of threads in the pool")
319 public int getThreadsPriority()
320 {
321 return _priority;
322 }
323
324
325
326
327
328
329 @ManagedAttribute("Size of the job queue")
330 public int getQueueSize()
331 {
332 return _jobs.size();
333 }
334
335
336
337
338 @ManagedAttribute("thead pool using a daemon thread")
339 public boolean isDaemon()
340 {
341 return _daemon;
342 }
343
344 public boolean isDetailedDump()
345 {
346 return _detailedDump;
347 }
348
349 public void setDetailedDump(boolean detailedDump)
350 {
351 _detailedDump = detailedDump;
352 }
353
354 @Override
355 public void execute(Runnable job)
356 {
357 if (!isRunning() || !_jobs.offer(job))
358 {
359 LOG.warn("{} rejected {}", this, job);
360 throw new RejectedExecutionException(job.toString());
361 }
362 }
363
364
365
366
367 @Override
368 public void join() throws InterruptedException
369 {
370 synchronized (_joinLock)
371 {
372 while (isRunning())
373 _joinLock.wait();
374 }
375
376 while (isStopping())
377 Thread.sleep(1);
378 }
379
380
381
382
383 @Override
384 @ManagedAttribute("total number of threads currently in the pool")
385 public int getThreads()
386 {
387 return _threadsStarted.get();
388 }
389
390
391
392
393 @Override
394 @ManagedAttribute("total number of idle threads in the pool")
395 public int getIdleThreads()
396 {
397 return _threadsIdle.get();
398 }
399
400
401
402
403 @Override
404 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
405 public boolean isLowOnThreads()
406 {
407 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
408 }
409
410 private boolean startThreads(int threadsToStart)
411 {
412 while (threadsToStart > 0 && isRunning())
413 {
414 int threads = _threadsStarted.get();
415 if (threads >= _maxThreads)
416 return false;
417
418 if (!_threadsStarted.compareAndSet(threads, threads + 1))
419 continue;
420
421 boolean started = false;
422 try
423 {
424 Thread thread = newThread(_runnable);
425 thread.setDaemon(isDaemon());
426 thread.setPriority(getThreadsPriority());
427 thread.setName(_name + "-" + thread.getId());
428 _threads.add(thread);
429
430 thread.start();
431 started = true;
432 }
433 finally
434 {
435 if (!started)
436 _threadsStarted.decrementAndGet();
437 }
438 if (started)
439 threadsToStart--;
440 }
441 return true;
442 }
443
444 protected Thread newThread(Runnable runnable)
445 {
446 return new Thread(runnable);
447 }
448
449
450 @Override
451 @ManagedOperation("dump thread state")
452 public String dump()
453 {
454 return ContainerLifeCycle.dump(this);
455 }
456
457 @Override
458 public void dump(Appendable out, String indent) throws IOException
459 {
460 List<Object> dump = new ArrayList<>(getMaxThreads());
461 for (final Thread thread : _threads)
462 {
463 final StackTraceElement[] trace = thread.getStackTrace();
464 boolean inIdleJobPoll = false;
465 for (StackTraceElement t : trace)
466 {
467 if ("idleJobPoll".equals(t.getMethodName()))
468 {
469 inIdleJobPoll = true;
470 break;
471 }
472 }
473 final boolean idle = inIdleJobPoll;
474
475 if (isDetailedDump())
476 {
477 dump.add(new Dumpable()
478 {
479 @Override
480 public void dump(Appendable out, String indent) throws IOException
481 {
482 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
483 if (thread.getPriority()!=Thread.NORM_PRIORITY)
484 out.append(" prio="+thread.getPriority());
485 out.append('\n');
486 if (!idle)
487 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
488 }
489
490 @Override
491 public String dump()
492 {
493 return null;
494 }
495 });
496 }
497 else
498 {
499 int p=thread.getPriority();
500 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
501 }
502 }
503
504 ContainerLifeCycle.dumpObject(out, this);
505 ContainerLifeCycle.dump(out, indent, dump);
506 }
507
508 @Override
509 public String toString()
510 {
511 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
512 }
513
514 private Runnable idleJobPoll() throws InterruptedException
515 {
516 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
517 }
518
519 private Runnable _runnable = new Runnable()
520 {
521 @Override
522 public void run()
523 {
524 boolean shrink = false;
525 boolean ignore = false;
526 try
527 {
528 Runnable job = _jobs.poll();
529
530 if (job != null && _threadsIdle.get() == 0)
531 {
532 startThreads(1);
533 }
534
535 loop: while (isRunning())
536 {
537
538 while (job != null && isRunning())
539 {
540 runJob(job);
541 if (Thread.interrupted())
542 {
543 ignore=true;
544 break loop;
545 }
546 job = _jobs.poll();
547 }
548
549
550 try
551 {
552 _threadsIdle.incrementAndGet();
553
554 while (isRunning() && job == null)
555 {
556 if (_idleTimeout <= 0)
557 job = _jobs.take();
558 else
559 {
560
561 final int size = _threadsStarted.get();
562 if (size > _minThreads)
563 {
564 long last = _lastShrink.get();
565 long now = System.nanoTime();
566 if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
567 {
568 if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
569 {
570 shrink=true;
571 break loop;
572 }
573 }
574 }
575 job = idleJobPoll();
576 }
577 }
578 }
579 finally
580 {
581 if (_threadsIdle.decrementAndGet() == 0)
582 {
583 startThreads(1);
584 }
585 }
586 }
587 }
588 catch (InterruptedException e)
589 {
590 ignore=true;
591 LOG.ignore(e);
592 }
593 catch (Throwable e)
594 {
595 LOG.warn(e);
596 }
597 finally
598 {
599 if (!shrink && isRunning())
600 {
601 if (!ignore)
602 LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
603
604 if (_threadsStarted.decrementAndGet()<getMaxThreads())
605 startThreads(1);
606 }
607 _threads.remove(Thread.currentThread());
608 }
609 }
610 };
611
612
613
614
615
616
617
618 protected void runJob(Runnable job)
619 {
620 job.run();
621 }
622
623
624
625
626 protected BlockingQueue<Runnable> getQueue()
627 {
628 return _jobs;
629 }
630
631
632
633
634 public void setQueue(BlockingQueue<Runnable> queue)
635 {
636 throw new UnsupportedOperationException("Use constructor injection");
637 }
638
639
640
641
642
643 @ManagedOperation("interrupt a pool thread")
644 public boolean interruptThread(@Name("id") long id)
645 {
646 for (Thread thread : _threads)
647 {
648 if (thread.getId() == id)
649 {
650 thread.interrupt();
651 return true;
652 }
653 }
654 return false;
655 }
656
657
658
659
660
661 @ManagedOperation("dump a pool thread stack")
662 public String dumpThread(@Name("id") long id)
663 {
664 for (Thread thread : _threads)
665 {
666 if (thread.getId() == id)
667 {
668 StringBuilder buf = new StringBuilder();
669 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
670 for (StackTraceElement element : thread.getStackTrace())
671 buf.append(" at ").append(element.toString()).append('\n');
672 return buf.toString();
673 }
674 }
675 return null;
676 }
677
678
679 }