1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.annotation.ManagedAttribute;
36 import org.eclipse.jetty.util.annotation.ManagedObject;
37 import org.eclipse.jetty.util.annotation.ManagedOperation;
38 import org.eclipse.jetty.util.annotation.Name;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.component.LifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46
47 @ManagedObject("A thread pool with no max bound by default")
48 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49 {
50 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
51
52 private final AtomicInteger _threadsStarted = new AtomicInteger();
53 private final AtomicInteger _threadsIdle = new AtomicInteger();
54 private final AtomicLong _lastShrink = new AtomicLong();
55 private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
56 private final Object _joinLock = new Object();
57 private final BlockingQueue<Runnable> _jobs;
58 private String _name = "qtp" + hashCode();
59 private int _idleTimeout;
60 private int _maxThreads;
61 private int _minThreads;
62 private int _priority = Thread.NORM_PRIORITY;
63 private boolean _daemon = false;
64 private boolean _detailedDump = false;
65
66 public QueuedThreadPool()
67 {
68 this(200);
69 }
70
71 public QueuedThreadPool(@Name("maxThreads") int maxThreads)
72 {
73 this(maxThreads, 8);
74 }
75
76 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
77 {
78 this(maxThreads, minThreads, 60000);
79 }
80
81 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
82 {
83 this(maxThreads, minThreads, idleTimeout, null);
84 }
85
86 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
87 {
88 setMinThreads(minThreads);
89 setMaxThreads(maxThreads);
90 setIdleTimeout(idleTimeout);
91 setStopTimeout(5000);
92
93 if (queue==null)
94 queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
95 _jobs=queue;
96
97 }
98
99 @Override
100 protected void doStart() throws Exception
101 {
102 super.doStart();
103 _threadsStarted.set(0);
104
105 startThreads(_minThreads);
106 }
107
108 @Override
109 protected void doStop() throws Exception
110 {
111 super.doStop();
112
113 long timeout = getStopTimeout();
114 BlockingQueue<Runnable> jobs = getQueue();
115
116
117 if (timeout <= 0)
118 jobs.clear();
119
120
121 Runnable noop = new Runnable()
122 {
123 @Override
124 public void run()
125 {
126 }
127 };
128 for (int i = _threadsStarted.get(); i-- > 0; )
129 jobs.offer(noop);
130
131
132 long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
133 for (Thread thread : _threads)
134 {
135 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
136 if (canwait > 0)
137 thread.join(canwait);
138 }
139
140
141
142
143 if (_threadsStarted.get() > 0)
144 for (Thread thread : _threads)
145 thread.interrupt();
146
147
148 stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
149 for (Thread thread : _threads)
150 {
151 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
152 if (canwait > 0)
153 thread.join(canwait);
154 }
155
156 Thread.yield();
157 int size = _threads.size();
158 if (size > 0)
159 {
160 Thread.yield();
161
162 if (LOG.isDebugEnabled())
163 {
164 for (Thread unstopped : _threads)
165 {
166 StringBuilder dmp = new StringBuilder();
167 for (StackTraceElement element : unstopped.getStackTrace())
168 {
169 dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
170 }
171 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
172 }
173 }
174 else
175 {
176 for (Thread unstopped : _threads)
177 LOG.warn("{} Couldn't stop {}",this,unstopped);
178 }
179 }
180
181 synchronized (_joinLock)
182 {
183 _joinLock.notifyAll();
184 }
185 }
186
187
188
189
190 public void setDaemon(boolean daemon)
191 {
192 _daemon = daemon;
193 }
194
195
196
197
198
199
200
201
202
203
204 public void setIdleTimeout(int idleTimeout)
205 {
206 _idleTimeout = idleTimeout;
207 }
208
209
210
211
212
213
214
215
216 @Override
217 public void setMaxThreads(int maxThreads)
218 {
219 _maxThreads = maxThreads;
220 if (_minThreads > _maxThreads)
221 _minThreads = _maxThreads;
222 }
223
224
225
226
227
228
229
230
231 @Override
232 public void setMinThreads(int minThreads)
233 {
234 _minThreads = minThreads;
235
236 if (_minThreads > _maxThreads)
237 _maxThreads = _minThreads;
238
239 int threads = _threadsStarted.get();
240 if (isStarted() && threads < _minThreads)
241 startThreads(_minThreads - threads);
242 }
243
244
245
246
247 public void setName(String name)
248 {
249 if (isRunning())
250 throw new IllegalStateException("started");
251 _name = name;
252 }
253
254
255
256
257
258
259 public void setThreadsPriority(int priority)
260 {
261 _priority = priority;
262 }
263
264
265
266
267
268
269
270
271 @ManagedAttribute("maximum time a thread may be idle in ms")
272 public int getIdleTimeout()
273 {
274 return _idleTimeout;
275 }
276
277
278
279
280
281
282
283
284 @Override
285 @ManagedAttribute("maximum number of threads in the pool")
286 public int getMaxThreads()
287 {
288 return _maxThreads;
289 }
290
291
292
293
294
295
296
297
298 @Override
299 @ManagedAttribute("minimum number of threads in the pool")
300 public int getMinThreads()
301 {
302 return _minThreads;
303 }
304
305
306
307
308 @ManagedAttribute("name of the thread pool")
309 public String getName()
310 {
311 return _name;
312 }
313
314
315
316
317
318
319 @ManagedAttribute("priority of threads in the pool")
320 public int getThreadsPriority()
321 {
322 return _priority;
323 }
324
325
326
327
328 @ManagedAttribute("thead pool using a daemon thread")
329 public boolean isDaemon()
330 {
331 return _daemon;
332 }
333
334 public boolean isDetailedDump()
335 {
336 return _detailedDump;
337 }
338
339 public void setDetailedDump(boolean detailedDump)
340 {
341 _detailedDump = detailedDump;
342 }
343
344 @Override
345 public void execute(Runnable job)
346 {
347 if (!isRunning() || !_jobs.offer(job))
348 {
349 LOG.warn("{} rejected {}", this, job);
350 throw new RejectedExecutionException(job.toString());
351 }
352 }
353
354
355
356
357 @Override
358 public void join() throws InterruptedException
359 {
360 synchronized (_joinLock)
361 {
362 while (isRunning())
363 _joinLock.wait();
364 }
365
366 while (isStopping())
367 Thread.sleep(1);
368 }
369
370
371
372
373 @Override
374 @ManagedAttribute("total number of threads currently in the pool")
375 public int getThreads()
376 {
377 return _threadsStarted.get();
378 }
379
380
381
382
383 @Override
384 @ManagedAttribute("total number of idle threads in the pool")
385 public int getIdleThreads()
386 {
387 return _threadsIdle.get();
388 }
389
390
391
392
393 @Override
394 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
395 public boolean isLowOnThreads()
396 {
397 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
398 }
399
400 private boolean startThreads(int threadsToStart)
401 {
402 while (threadsToStart > 0)
403 {
404 int threads = _threadsStarted.get();
405 if (threads >= _maxThreads)
406 return false;
407
408 if (!_threadsStarted.compareAndSet(threads, threads + 1))
409 continue;
410
411 boolean started = false;
412 try
413 {
414 Thread thread = newThread(_runnable);
415 thread.setDaemon(isDaemon());
416 thread.setPriority(getThreadsPriority());
417 thread.setName(_name + "-" + thread.getId());
418 _threads.add(thread);
419
420 thread.start();
421 started = true;
422 }
423 finally
424 {
425 if (!started)
426 _threadsStarted.decrementAndGet();
427 }
428 if (started)
429 threadsToStart--;
430 }
431 return true;
432 }
433
434 protected Thread newThread(Runnable runnable)
435 {
436 return new Thread(runnable);
437 }
438
439
440 @Override
441 @ManagedOperation("dump thread state")
442 public String dump()
443 {
444 return ContainerLifeCycle.dump(this);
445 }
446
447 @Override
448 public void dump(Appendable out, String indent) throws IOException
449 {
450 List<Object> dump = new ArrayList<>(getMaxThreads());
451 for (final Thread thread : _threads)
452 {
453 final StackTraceElement[] trace = thread.getStackTrace();
454 boolean inIdleJobPoll = false;
455 for (StackTraceElement t : trace)
456 {
457 if ("idleJobPoll".equals(t.getMethodName()))
458 {
459 inIdleJobPoll = true;
460 break;
461 }
462 }
463 final boolean idle = inIdleJobPoll;
464
465 if (isDetailedDump())
466 {
467 dump.add(new Dumpable()
468 {
469 @Override
470 public void dump(Appendable out, String indent) throws IOException
471 {
472 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
473 if (!idle)
474 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
475 }
476
477 @Override
478 public String dump()
479 {
480 return null;
481 }
482 });
483 }
484 else
485 {
486 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
487 }
488 }
489
490 ContainerLifeCycle.dumpObject(out, this);
491 ContainerLifeCycle.dump(out, indent, dump);
492 }
493
494 @Override
495 public String toString()
496 {
497 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
498 }
499
500 private Runnable idleJobPoll() throws InterruptedException
501 {
502 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
503 }
504
505 private Runnable _runnable = new Runnable()
506 {
507 @Override
508 public void run()
509 {
510 boolean shrink = false;
511 try
512 {
513 Runnable job = _jobs.poll();
514
515 if (job != null && _threadsIdle.get() == 0)
516 {
517 startThreads(1);
518 }
519
520 while (isRunning())
521 {
522
523 while (job != null && isRunning())
524 {
525 runJob(job);
526 job = _jobs.poll();
527 }
528
529
530 try
531 {
532 _threadsIdle.incrementAndGet();
533
534 while (isRunning() && job == null)
535 {
536 if (_idleTimeout <= 0)
537 job = _jobs.take();
538 else
539 {
540
541 final int size = _threadsStarted.get();
542 if (size > _minThreads)
543 {
544 long last = _lastShrink.get();
545 long now = System.nanoTime();
546 if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
547 {
548 shrink = _lastShrink.compareAndSet(last, now) &&
549 _threadsStarted.compareAndSet(size, size - 1);
550 if (shrink)
551 {
552 return;
553 }
554 }
555 }
556 job = idleJobPoll();
557 }
558 }
559 }
560 finally
561 {
562 if (_threadsIdle.decrementAndGet() == 0)
563 {
564 startThreads(1);
565 }
566 }
567 }
568 }
569 catch (InterruptedException e)
570 {
571 LOG.ignore(e);
572 }
573 catch (Throwable e)
574 {
575 LOG.warn(e);
576 }
577 finally
578 {
579 if (!shrink)
580 _threadsStarted.decrementAndGet();
581 _threads.remove(Thread.currentThread());
582 }
583 }
584 };
585
586
587
588
589
590
591
592 protected void runJob(Runnable job)
593 {
594 job.run();
595 }
596
597
598
599
600 protected BlockingQueue<Runnable> getQueue()
601 {
602 return _jobs;
603 }
604
605
606
607
608 public void setQueue(BlockingQueue<Runnable> queue)
609 {
610 throw new UnsupportedOperationException("Use constructor injection");
611 }
612
613
614
615
616
617 @ManagedOperation("interrupt a pool thread")
618 public boolean interruptThread(@Name("id") long id)
619 {
620 for (Thread thread : _threads)
621 {
622 if (thread.getId() == id)
623 {
624 thread.interrupt();
625 return true;
626 }
627 }
628 return false;
629 }
630
631
632
633
634
635 @ManagedOperation("dump a pool thread stack")
636 public String dumpThread(@Name("id") long id)
637 {
638 for (Thread thread : _threads)
639 {
640 if (thread.getId() == id)
641 {
642 StringBuilder buf = new StringBuilder();
643 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
644 for (StackTraceElement element : thread.getStackTrace())
645 buf.append(" at ").append(element.toString()).append('\n');
646 return buf.toString();
647 }
648 }
649 return null;
650 }
651 }