1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.annotation.ManagedAttribute;
36 import org.eclipse.jetty.util.annotation.ManagedObject;
37 import org.eclipse.jetty.util.annotation.ManagedOperation;
38 import org.eclipse.jetty.util.annotation.Name;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.component.LifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46
47 @ManagedObject("A thread pool with no max bound by default")
48 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49 {
// Logger used for all diagnostics emitted by this pool.
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);

// Number of worker threads accounted for. Incremented (by CAS) in startThreads before the
// thread is created, and decremented when a worker exits or elects to shrink the pool.
private final AtomicInteger _threadsStarted = new AtomicInteger();
// Number of workers currently waiting for a job (maintained by the worker loop in _runnable).
private final AtomicInteger _threadsIdle = new AtomicInteger();
// System.currentTimeMillis() timestamp of the last shrink decision; 0 until the first one.
private final AtomicLong _lastShrink = new AtomicLong();
// The worker threads: added in startThreads, removed by each worker as it exits.
private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
// Monitor that join() waits on and doStop() notifies.
private final Object _joinLock = new Object();
// Queue of submitted jobs; assigned once in the constructor.
private final BlockingQueue<Runnable> _jobs;
// Prefix used when naming worker threads (see startThreads); defaults to "qtp" + hashCode().
private String _name = "qtp" + hashCode();
// Maximum time, in milliseconds, a worker polls for a job before re-checking whether to shrink.
private int _idleTimeout;
// Upper bound on _threadsStarted enforced by startThreads.
private int _maxThreads;
// Number of threads started by doStart; workers do not shrink the pool below this.
private int _minThreads;
// Priority applied to each new worker thread.
private int _priority = Thread.NORM_PRIORITY;
// Whether new worker threads are created as daemon threads.
private boolean _daemon = false;
// Whether dump(Appendable,String) includes full stack traces for non-idle threads.
private boolean _detailedDump = false;
65
/**
 * Creates a pool with a maximum of 200 threads and the defaults applied by
 * {@link #QueuedThreadPool(int)} for everything else.
 */
public QueuedThreadPool()
{
    this(200);
}
70
/**
 * Creates a pool with the given maximum and a minimum of 8 threads.
 *
 * @param maxThreads the maximum number of threads
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads)
{
    this(maxThreads, 8);
}
75
/**
 * Creates a pool with the given thread bounds and an idle timeout of 60000 ms.
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
{
    this(maxThreads, minThreads, 60000);
}
80
/**
 * Creates a pool with the given thread bounds and idle timeout, using the default job queue
 * (a {@code null} queue is passed to the full constructor).
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 * @param idleTimeout the idle timeout in milliseconds
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
{
    this(maxThreads, minThreads, idleTimeout, null);
}
85
/**
 * Creates a pool with the given thread bounds, idle timeout and job queue.
 * <p>
 * The stop timeout is initialised to 5000 ms. The minimum is applied before the maximum, so
 * if {@code minThreads > maxThreads} the setters leave both bounds equal to {@code maxThreads}.
 *
 * @param maxThreads the maximum number of threads
 * @param minThreads the minimum number of threads
 * @param idleTimeout the idle timeout in milliseconds
 * @param queue the queue that holds submitted jobs, or {@code null} to have a
 *              {@link BlockingArrayQueue} created
 */
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
{
    setMinThreads(minThreads);
    setMaxThreads(maxThreads);
    setIdleTimeout(idleTimeout);
    setStopTimeout(5000);

    if (queue == null)
    {
        // Never size the default queue from a zero (or tiny) minThreads: a queue created with
        // zero capacity and zero growth cannot accept any job, so every execute() would be rejected.
        int capacity = Math.max(_minThreads, 8);
        queue = new BlockingArrayQueue<>(capacity, capacity);
    }
    _jobs = queue;
}
98
99 @Override
100 protected void doStart() throws Exception
101 {
102 super.doStart();
103 _threadsStarted.set(0);
104
105 startThreads(_minThreads);
106 }
107
108 @Override
109 protected void doStop() throws Exception
110 {
111 super.doStop();
112
113 long timeout = getStopTimeout();
114 BlockingQueue<Runnable> jobs = getQueue();
115
116
117 if (timeout <= 0)
118 jobs.clear();
119
120
121 Runnable noop = new Runnable()
122 {
123 @Override
124 public void run()
125 {
126 }
127 };
128 for (int i = _threadsStarted.get(); i-- > 0; )
129 jobs.offer(noop);
130
131
132 long stopby = System.currentTimeMillis() + timeout / 2;
133 for (Thread thread : _threads)
134 {
135 long canwait = stopby - System.currentTimeMillis();
136 if (canwait > 0)
137 thread.join(canwait);
138 }
139
140
141
142
143 if (_threadsStarted.get() > 0)
144 for (Thread thread : _threads)
145 thread.interrupt();
146
147
148 stopby = System.currentTimeMillis() + timeout / 2;
149 for (Thread thread : _threads)
150 {
151 long canwait = stopby - System.currentTimeMillis();
152 if (canwait > 0)
153 thread.join(canwait);
154 }
155
156 Thread.yield();
157 int size = _threads.size();
158 if (size > 0)
159 {
160 Thread.yield();
161
162 if (LOG.isDebugEnabled())
163 {
164 for (Thread unstopped : _threads)
165 {
166 StringBuilder dmp = new StringBuilder();
167 for (StackTraceElement element : unstopped.getStackTrace())
168 {
169 dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
170 }
171 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
172 }
173 }
174 else
175 {
176 for (Thread unstopped : _threads)
177 LOG.warn("{} Couldn't stop {}",this,unstopped);
178 }
179 }
180
181 synchronized (_joinLock)
182 {
183 _joinLock.notifyAll();
184 }
185 }
186
187
188
189
190 public void setDaemon(boolean daemon)
191 {
192 _daemon = daemon;
193 }
194
195
196
197
198
199
200
201
202
203
204 public void setIdleTimeout(int idleTimeout)
205 {
206 _idleTimeout = idleTimeout;
207 }
208
209
210
211
212
213
214
215
216 @Override
217 public void setMaxThreads(int maxThreads)
218 {
219 _maxThreads = maxThreads;
220 if (_minThreads > _maxThreads)
221 _minThreads = _maxThreads;
222 }
223
224
225
226
227
228
229
230
231 @Override
232 public void setMinThreads(int minThreads)
233 {
234 _minThreads = minThreads;
235
236 if (_minThreads > _maxThreads)
237 _maxThreads = _minThreads;
238
239 int threads = _threadsStarted.get();
240 if (isStarted() && threads < _minThreads)
241 startThreads(_minThreads - threads);
242 }
243
244
245
246
247 public void setName(String name)
248 {
249 if (isRunning())
250 throw new IllegalStateException("started");
251 _name = name;
252 }
253
254
255
256
257
258
259 public void setThreadsPriority(int priority)
260 {
261 _priority = priority;
262 }
263
264
265
266
267
268
269
270
271 @ManagedAttribute("maximum time a thread may be idle in ms")
272 public int getIdleTimeout()
273 {
274 return _idleTimeout;
275 }
276
277
278
279
280
281
282
283
284 @Override
285 @ManagedAttribute("maximum number of threads in the pool")
286 public int getMaxThreads()
287 {
288 return _maxThreads;
289 }
290
291
292
293
294
295
296
297
298 @Override
299 @ManagedAttribute("minimum number of threads in the pool")
300 public int getMinThreads()
301 {
302 return _minThreads;
303 }
304
305
306
307
308 @ManagedAttribute("name of the thread pool")
309 public String getName()
310 {
311 return _name;
312 }
313
314
315
316
317
318
319 @ManagedAttribute("priority of threads in the pool")
320 public int getThreadsPriority()
321 {
322 return _priority;
323 }
324
325
326
327
328 @ManagedAttribute("thead pool using a daemon thread")
329 public boolean isDaemon()
330 {
331 return _daemon;
332 }
333
334 public boolean isDetailedDump()
335 {
336 return _detailedDump;
337 }
338
339 public void setDetailedDump(boolean detailedDump)
340 {
341 _detailedDump = detailedDump;
342 }
343
344 @Override
345 public boolean dispatch(Runnable job)
346 {
347 LOG.debug("{} dispatched {}", this, job);
348 return isRunning() && _jobs.offer(job);
349 }
350
351 @Override
352 public void execute(Runnable job)
353 {
354 if (!dispatch(job))
355 {
356 LOG.warn("{} rejected {}", this, job);
357 throw new RejectedExecutionException(job.toString());
358 }
359 }
360
361
362
363
364 @Override
365 public void join() throws InterruptedException
366 {
367 synchronized (_joinLock)
368 {
369 while (isRunning())
370 _joinLock.wait();
371 }
372
373 while (isStopping())
374 Thread.sleep(1);
375 }
376
377
378
379
380 @Override
381 @ManagedAttribute("total number of threads currently in the pool")
382 public int getThreads()
383 {
384 return _threadsStarted.get();
385 }
386
387
388
389
390 @Override
391 @ManagedAttribute("total number of idle threads in the pool")
392 public int getIdleThreads()
393 {
394 return _threadsIdle.get();
395 }
396
397
398
399
400 @Override
401 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
402 public boolean isLowOnThreads()
403 {
404 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
405 }
406
407 private boolean startThreads(int threadsToStart)
408 {
409 while (threadsToStart > 0)
410 {
411 int threads = _threadsStarted.get();
412 if (threads >= _maxThreads)
413 return false;
414
415 if (!_threadsStarted.compareAndSet(threads, threads + 1))
416 continue;
417
418 boolean started = false;
419 try
420 {
421 Thread thread = newThread(_runnable);
422 thread.setDaemon(isDaemon());
423 thread.setPriority(getThreadsPriority());
424 thread.setName(_name + "-" + thread.getId());
425 _threads.add(thread);
426
427 thread.start();
428 started = true;
429 }
430 finally
431 {
432 if (!started)
433 _threadsStarted.decrementAndGet();
434 }
435 if (started)
436 threadsToStart--;
437 }
438 return true;
439 }
440
441 protected Thread newThread(Runnable runnable)
442 {
443 return new Thread(runnable);
444 }
445
446
447 @Override
448 @ManagedOperation("dump thread state")
449 public String dump()
450 {
451 return ContainerLifeCycle.dump(this);
452 }
453
454 @Override
455 public void dump(Appendable out, String indent) throws IOException
456 {
457 List<Object> dump = new ArrayList<>(getMaxThreads());
458 for (final Thread thread : _threads)
459 {
460 final StackTraceElement[] trace = thread.getStackTrace();
461 boolean inIdleJobPoll = false;
462 for (StackTraceElement t : trace)
463 {
464 if ("idleJobPoll".equals(t.getMethodName()))
465 {
466 inIdleJobPoll = true;
467 break;
468 }
469 }
470 final boolean idle = inIdleJobPoll;
471
472 if (isDetailedDump())
473 {
474 dump.add(new Dumpable()
475 {
476 @Override
477 public void dump(Appendable out, String indent) throws IOException
478 {
479 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
480 if (!idle)
481 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
482 }
483
484 @Override
485 public String dump()
486 {
487 return null;
488 }
489 });
490 }
491 else
492 {
493 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
494 }
495 }
496
497 ContainerLifeCycle.dumpObject(out, this);
498 ContainerLifeCycle.dump(out, indent, dump);
499 }
500
501 @Override
502 public String toString()
503 {
504 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
505 }
506
507 private Runnable idleJobPoll() throws InterruptedException
508 {
509 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
510 }
511
/**
 * The worker loop executed by every pool thread (startThreads passes this same instance to
 * each new thread). A worker alternates between running queued jobs and idling on the queue,
 * and may retire itself to shrink the pool when more than the minimum number of threads exist.
 */
private Runnable _runnable = new Runnable()
{
    @Override
    public void run()
    {
        // True once this worker has itself decremented _threadsStarted as part of shrinking,
        // so that the outer finally block does not decrement it a second time.
        boolean shrink = false;
        try
        {
            Runnable job = _jobs.poll();

            // This thread is about to be busy; if no thread is idle, ask for another one
            // (startThreads does nothing when the maximum has been reached).
            if (job != null && _threadsIdle.get() == 0)
            {
                startThreads(1);
            }

            while (isRunning())
            {
                // Job loop: run jobs back to back until the queue is empty or the pool stops.
                // NOTE(review): if the pool stops right after a job was polled, that job is
                // not run by this worker.
                while (job != null && isRunning())
                {
                    runJob(job);
                    job = _jobs.poll();
                }

                // Idle loop: count this thread as idle while it waits for the next job.
                try
                {
                    _threadsIdle.incrementAndGet();

                    while (isRunning() && job == null)
                    {
                        // Without an idle timeout, block until a job arrives; no shrinking.
                        if (_idleTimeout <= 0)
                            job = _jobs.take();
                        else
                        {
                            // Consider shrinking: only above the minimum, and only if no shrink
                            // has been recorded yet or the last one is older than the idle timeout.
                            final int size = _threadsStarted.get();
                            if (size > _minThreads)
                            {
                                long last = _lastShrink.get();
                                long now = System.currentTimeMillis();
                                if (last == 0 || (now - last) > _idleTimeout)
                                {
                                    // Both CAS operations must win for this thread to retire.
                                    // The second is only attempted if the first succeeds, so
                                    // _lastShrink may advance even when no thread exits.
                                    shrink = _lastShrink.compareAndSet(last, now) &&
                                             _threadsStarted.compareAndSet(size, size - 1);
                                    if (shrink)
                                    {
                                        // Exits run(); the enclosing finally blocks still execute.
                                        return;
                                    }
                                }
                            }
                            job = idleJobPoll();
                        }
                    }
                }
                finally
                {
                    // No longer idle (this also runs on the shrink return and on exceptions).
                    // If that leaves no idle thread, try to start a replacement.
                    if (_threadsIdle.decrementAndGet() == 0)
                    {
                        startThreads(1);
                    }
                }
            }
        }
        catch (InterruptedException e)
        {
            // Interruption ends the worker; it is logged through the logger's ignore channel.
            LOG.ignore(e);
        }
        catch (Throwable e)
        {
            // Anything else (including a failure thrown by a job) ends this worker with a warning.
            LOG.warn(e);
        }
        finally
        {
            // Release this thread's slot unless the shrink path already did, then deregister it.
            if (!shrink)
                _threadsStarted.decrementAndGet();
            _threads.remove(Thread.currentThread());
        }
    }
};
592
593
594
595
596
597
598
599 protected void runJob(Runnable job)
600 {
601 job.run();
602 }
603
604
605
606
607 protected BlockingQueue<Runnable> getQueue()
608 {
609 return _jobs;
610 }
611
612
613
614
615 public void setQueue(BlockingQueue<Runnable> queue)
616 {
617 throw new UnsupportedOperationException("Use constructor injection");
618 }
619
620
621
622
623
624 @ManagedOperation("interrupt a pool thread")
625 public boolean interruptThread(@Name("id") long id)
626 {
627 for (Thread thread : _threads)
628 {
629 if (thread.getId() == id)
630 {
631 thread.interrupt();
632 return true;
633 }
634 }
635 return false;
636 }
637
638
639
640
641
642 @ManagedOperation("dump a pool thread stack")
643 public String dumpThread(@Name("id") long id)
644 {
645 for (Thread thread : _threads)
646 {
647 if (thread.getId() == id)
648 {
649 StringBuilder buf = new StringBuilder();
650 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
651 for (StackTraceElement element : thread.getStackTrace())
652 buf.append(" at ").append(element.toString()).append('\n');
653 return buf.toString();
654 }
655 }
656 return null;
657 }
658 }