1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.annotation.ManagedAttribute;
36 import org.eclipse.jetty.util.annotation.ManagedObject;
37 import org.eclipse.jetty.util.annotation.ManagedOperation;
38 import org.eclipse.jetty.util.annotation.Name;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.component.LifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46
47 @ManagedObject("A thread pool with no max bound by default")
48 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49 {
50 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
51
52 private final AtomicInteger _threadsStarted = new AtomicInteger();
53 private final AtomicInteger _threadsIdle = new AtomicInteger();
54 private final AtomicLong _lastShrink = new AtomicLong();
55 private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
56 private final Object _joinLock = new Object();
57 private BlockingQueue<Runnable> _jobs;
58 private String _name = "qtp" + hashCode();
59 private int _idleTimeout;
60 private int _maxThreads;
61 private int _minThreads;
62 private int _priority = Thread.NORM_PRIORITY;
63 private boolean _daemon = false;
64 private boolean _detailedDump = false;
65
66 public QueuedThreadPool()
67 {
68 this(200);
69 }
70
71 public QueuedThreadPool(int maxThreads)
72 {
73 this(maxThreads, 8);
74 }
75
76 public QueuedThreadPool(int maxThreads, int minThreads)
77 {
78 this(maxThreads, minThreads, 60000);
79 }
80
81 public QueuedThreadPool(int maxThreads, int minThreads, int idleTimeout)
82 {
83 setMinThreads(minThreads);
84 setMaxThreads(maxThreads);
85 setIdleTimeout(idleTimeout);
86 setStopTimeout(5000);
87 }
88
89 @Override
90 protected void doStart() throws Exception
91 {
92 super.doStart();
93 _threadsStarted.set(0);
94
95 if (_jobs == null)
96 setQueue(new BlockingArrayQueue<Runnable>(_minThreads, _minThreads));
97
98 startThreads(_minThreads);
99 }
100
101 @Override
102 protected void doStop() throws Exception
103 {
104 super.doStop();
105
106 long timeout = getStopTimeout();
107 BlockingQueue<Runnable> jobs = getQueue();
108
109
110 if (timeout <= 0)
111 jobs.clear();
112
113
114 Runnable noop = new Runnable()
115 {
116 @Override
117 public void run()
118 {
119 }
120 };
121 for (int i = _threadsStarted.get(); i-- > 0; )
122 jobs.offer(noop);
123
124
125 long stopby = System.currentTimeMillis() + timeout / 2;
126 for (Thread thread : _threads)
127 {
128 long canwait = stopby - System.currentTimeMillis();
129 if (canwait > 0)
130 thread.join(canwait);
131 }
132
133
134
135
136 if (_threadsStarted.get() > 0)
137 for (Thread thread : _threads)
138 thread.interrupt();
139
140
141 stopby = System.currentTimeMillis() + timeout / 2;
142 for (Thread thread : _threads)
143 {
144 long canwait = stopby - System.currentTimeMillis();
145 if (canwait > 0)
146 thread.join(canwait);
147 }
148
149 Thread.yield();
150 int size = _threads.size();
151 if (size > 0)
152 {
153 LOG.warn("{} threads could not be stopped", size);
154
155 if ((size <= Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled())
156 {
157 for (Thread unstopped : _threads)
158 {
159 StringBuilder dmp = new StringBuilder();
160 for (StackTraceElement element : unstopped.getStackTrace())
161 {
162 dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
163 }
164 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
165 }
166 }
167 }
168
169 synchronized (_joinLock)
170 {
171 _joinLock.notifyAll();
172 }
173 }
174
175
176
177
178 public void setDaemon(boolean daemon)
179 {
180 _daemon = daemon;
181 }
182
183
184
185
186
187
188
189
190
191
192 public void setIdleTimeout(int idleTimeout)
193 {
194 _idleTimeout = idleTimeout;
195 }
196
197
198
199
200
201
202
203
204 @Override
205 public void setMaxThreads(int maxThreads)
206 {
207 _maxThreads = maxThreads;
208 if (_minThreads > _maxThreads)
209 _minThreads = _maxThreads;
210 }
211
212
213
214
215
216
217
218
219 @Override
220 public void setMinThreads(int minThreads)
221 {
222 _minThreads = minThreads;
223
224 if (_minThreads > _maxThreads)
225 _maxThreads = _minThreads;
226
227 int threads = _threadsStarted.get();
228 if (isStarted() && threads < _minThreads)
229 startThreads(_minThreads - threads);
230 }
231
232
233
234
235 public void setName(String name)
236 {
237 if (isRunning())
238 throw new IllegalStateException("started");
239 _name = name;
240 }
241
242
243
244
245
246
247 public void setThreadsPriority(int priority)
248 {
249 _priority = priority;
250 }
251
252
253
254
255
256
257
258
259 @ManagedAttribute("maximum time a thread may be idle in ms")
260 public int getIdleTimeout()
261 {
262 return _idleTimeout;
263 }
264
265
266
267
268
269
270
271
272 @Override
273 @ManagedAttribute("maximum number of threads in the pool")
274 public int getMaxThreads()
275 {
276 return _maxThreads;
277 }
278
279
280
281
282
283
284
285
286 @Override
287 @ManagedAttribute("minimum number of threads in the pool")
288 public int getMinThreads()
289 {
290 return _minThreads;
291 }
292
293
294
295
296 @ManagedAttribute("name of the thread pool")
297 public String getName()
298 {
299 return _name;
300 }
301
302
303
304
305
306
307 @ManagedAttribute("priority of threads in the pool")
308 public int getThreadsPriority()
309 {
310 return _priority;
311 }
312
313
314
315
316 @ManagedAttribute("thead pool using a daemon thread")
317 public boolean isDaemon()
318 {
319 return _daemon;
320 }
321
322 public boolean isDetailedDump()
323 {
324 return _detailedDump;
325 }
326
327 public void setDetailedDump(boolean detailedDump)
328 {
329 _detailedDump = detailedDump;
330 }
331
332 @Override
333 public boolean dispatch(Runnable job)
334 {
335 LOG.debug("{} dispatched {}", this, job);
336 return isRunning() && _jobs.offer(job);
337 }
338
339 @Override
340 public void execute(Runnable job)
341 {
342 if (!dispatch(job))
343 {
344 LOG.warn("{} rejected {}", this, job);
345 throw new RejectedExecutionException(job.toString());
346 }
347 }
348
349
350
351
352 @Override
353 public void join() throws InterruptedException
354 {
355 synchronized (_joinLock)
356 {
357 while (isRunning())
358 _joinLock.wait();
359 }
360
361 while (isStopping())
362 Thread.sleep(1);
363 }
364
365
366
367
368 @Override
369 @ManagedAttribute("total number of threads currently in the pool")
370 public int getThreads()
371 {
372 return _threadsStarted.get();
373 }
374
375
376
377
378 @Override
379 @ManagedAttribute("total number of idle threads in the pool")
380 public int getIdleThreads()
381 {
382 return _threadsIdle.get();
383 }
384
385
386
387
388 @Override
389 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
390 public boolean isLowOnThreads()
391 {
392 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
393 }
394
395 private boolean startThreads(int threadsToStart)
396 {
397 while (threadsToStart > 0)
398 {
399 int threads = _threadsStarted.get();
400 if (threads >= _maxThreads)
401 return false;
402
403 if (!_threadsStarted.compareAndSet(threads, threads + 1))
404 continue;
405
406 boolean started = false;
407 try
408 {
409 Thread thread = newThread(_runnable);
410 thread.setDaemon(isDaemon());
411 thread.setPriority(getThreadsPriority());
412 thread.setName(_name + "-" + thread.getId());
413 _threads.add(thread);
414
415 thread.start();
416 started = true;
417 }
418 finally
419 {
420 if (!started)
421 _threadsStarted.decrementAndGet();
422 }
423 if (started)
424 threadsToStart--;
425 }
426 return true;
427 }
428
429 protected Thread newThread(Runnable runnable)
430 {
431 return new Thread(runnable);
432 }
433
434
435 @Override
436 @ManagedOperation("dump thread state")
437 public String dump()
438 {
439 return ContainerLifeCycle.dump(this);
440 }
441
442 @Override
443 public void dump(Appendable out, String indent) throws IOException
444 {
445 List<Object> dump = new ArrayList<>(getMaxThreads());
446 for (final Thread thread : _threads)
447 {
448 final StackTraceElement[] trace = thread.getStackTrace();
449 boolean inIdleJobPoll = false;
450 for (StackTraceElement t : trace)
451 {
452 if ("idleJobPoll".equals(t.getMethodName()))
453 {
454 inIdleJobPoll = true;
455 break;
456 }
457 }
458 final boolean idle = inIdleJobPoll;
459
460 if (isDetailedDump())
461 {
462 dump.add(new Dumpable()
463 {
464 @Override
465 public void dump(Appendable out, String indent) throws IOException
466 {
467 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
468 if (!idle)
469 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
470 }
471
472 @Override
473 public String dump()
474 {
475 return null;
476 }
477 });
478 }
479 else
480 {
481 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
482 }
483 }
484
485 ContainerLifeCycle.dumpObject(out, this);
486 ContainerLifeCycle.dump(out, indent, dump);
487 }
488
489 @Override
490 public String toString()
491 {
492 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
493 }
494
495 private Runnable idleJobPoll() throws InterruptedException
496 {
497 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
498 }
499
500 private Runnable _runnable = new Runnable()
501 {
502 @Override
503 public void run()
504 {
505 boolean shrink = false;
506 try
507 {
508 Runnable job = _jobs.poll();
509
510 if (job != null && _threadsIdle.get() == 0)
511 {
512 startThreads(1);
513 }
514
515 while (isRunning())
516 {
517
518 while (job != null && isRunning())
519 {
520 runJob(job);
521 job = _jobs.poll();
522 }
523
524
525 try
526 {
527 _threadsIdle.incrementAndGet();
528
529 while (isRunning() && job == null)
530 {
531 if (_idleTimeout <= 0)
532 job = _jobs.take();
533 else
534 {
535
536 final int size = _threadsStarted.get();
537 if (size > _minThreads)
538 {
539 long last = _lastShrink.get();
540 long now = System.currentTimeMillis();
541 if (last == 0 || (now - last) > _idleTimeout)
542 {
543 shrink = _lastShrink.compareAndSet(last, now) &&
544 _threadsStarted.compareAndSet(size, size - 1);
545 if (shrink)
546 {
547 return;
548 }
549 }
550 }
551 job = idleJobPoll();
552 }
553 }
554 }
555 finally
556 {
557 if (_threadsIdle.decrementAndGet() == 0)
558 {
559 startThreads(1);
560 }
561 }
562 }
563 }
564 catch (InterruptedException e)
565 {
566 LOG.ignore(e);
567 }
568 catch (Throwable e)
569 {
570 LOG.warn(e);
571 }
572 finally
573 {
574 if (!shrink)
575 _threadsStarted.decrementAndGet();
576 _threads.remove(Thread.currentThread());
577 }
578 }
579 };
580
581
582
583
584
585
586
587 protected void runJob(Runnable job)
588 {
589 job.run();
590 }
591
592
593
594
595 protected BlockingQueue<Runnable> getQueue()
596 {
597 return _jobs;
598 }
599
600
601
602
603 public void setQueue(BlockingQueue<Runnable> queue)
604 {
605 _jobs = queue;
606 }
607
608
609
610
611
612 @ManagedOperation("interrupt a pool thread")
613 public boolean interruptThread(@Name("id") long id)
614 {
615 for (Thread thread : _threads)
616 {
617 if (thread.getId() == id)
618 {
619 thread.interrupt();
620 return true;
621 }
622 }
623 return false;
624 }
625
626
627
628
629
630 @ManagedOperation("dump a pool thread stack")
631 public String dumpThread(@Name("id") long id)
632 {
633 for (Thread thread : _threads)
634 {
635 if (thread.getId() == id)
636 {
637 StringBuilder buf = new StringBuilder();
638 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
639 for (StackTraceElement element : thread.getStackTrace())
640 buf.append(" at ").append(element.toString()).append('\n');
641 return buf.toString();
642 }
643 }
644 return null;
645 }
646 }