1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.util.thread;
16
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.eclipse.jetty.util.BlockingArrayQueue;
31 import org.eclipse.jetty.util.component.AbstractLifeCycle;
32 import org.eclipse.jetty.util.component.AggregateLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.component.LifeCycle;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
38
39 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
40 {
41 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
42
43 private final AtomicInteger _threadsStarted = new AtomicInteger();
44 private final AtomicInteger _threadsIdle = new AtomicInteger();
45 private final AtomicLong _lastShrink = new AtomicLong();
46 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
47 private final Object _joinLock = new Object();
48 private BlockingQueue<Runnable> _jobs;
49 private String _name;
50 private int _maxIdleTimeMs=60000;
51 private int _maxThreads=254;
52 private int _minThreads=8;
53 private int _maxQueued=-1;
54 private int _priority=Thread.NORM_PRIORITY;
55 private boolean _daemon=false;
56 private int _maxStopTime=100;
57 private boolean _detailedDump=false;
58
59
60
61
62 public QueuedThreadPool()
63 {
64 _name="qtp"+super.hashCode();
65 }
66
67
68
69
70 public QueuedThreadPool(int maxThreads)
71 {
72 this();
73 setMaxThreads(maxThreads);
74 }
75
76
77
78
79 public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
80 {
81 this();
82 _jobs=jobQ;
83 _jobs.clear();
84 }
85
86
87
88 @Override
89 protected void doStart() throws Exception
90 {
91 super.doStart();
92 _threadsStarted.set(0);
93
94 if (_jobs==null)
95 {
96 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
97 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
98 }
99
100 int threads=_threadsStarted.get();
101 while (isRunning() && threads<_minThreads)
102 {
103 startThread(threads);
104 threads=_threadsStarted.get();
105 }
106 }
107
108
109 @Override
110 protected void doStop() throws Exception
111 {
112 super.doStop();
113 long start=System.currentTimeMillis();
114
115
116 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
117 Thread.sleep(1);
118
119
120 _jobs.clear();
121 Runnable noop = new Runnable(){public void run(){}};
122 for (int i=_threadsIdle.get();i-->0;)
123 _jobs.offer(noop);
124 Thread.yield();
125
126
127 if (_threadsStarted.get()>0)
128 for (Thread thread : _threads)
129 thread.interrupt();
130
131
132 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
133 {
134 Thread.sleep(1);
135 }
136 Thread.yield();
137 int size=_threads.size();
138 if (size>0)
139 {
140 LOG.warn(size+" threads could not be stopped");
141
142 if (size==1 || LOG.isDebugEnabled())
143 {
144 for (Thread unstopped : _threads)
145 {
146 LOG.info("Couldn't stop "+unstopped);
147 for (StackTraceElement element : unstopped.getStackTrace())
148 {
149 LOG.info(" at "+element);
150 }
151 }
152 }
153 }
154
155 synchronized (_joinLock)
156 {
157 _joinLock.notifyAll();
158 }
159 }
160
161
162
163
164
165 public void setDaemon(boolean daemon)
166 {
167 _daemon=daemon;
168 }
169
170
171
172
173
174
175
176
177
178 public void setMaxIdleTimeMs(int maxIdleTimeMs)
179 {
180 _maxIdleTimeMs=maxIdleTimeMs;
181 }
182
183
184
185
186
187 public void setMaxStopTimeMs(int stopTimeMs)
188 {
189 _maxStopTime = stopTimeMs;
190 }
191
192
193
194
195
196
197
198 public void setMaxThreads(int maxThreads)
199 {
200 _maxThreads=maxThreads;
201 if (_minThreads>_maxThreads)
202 _minThreads=_maxThreads;
203 }
204
205
206
207
208
209
210
211 public void setMinThreads(int minThreads)
212 {
213 _minThreads=minThreads;
214
215 if (_minThreads>_maxThreads)
216 _maxThreads=_minThreads;
217
218 int threads=_threadsStarted.get();
219 while (isStarted() && threads<_minThreads)
220 {
221 startThread(threads);
222 threads=_threadsStarted.get();
223 }
224 }
225
226
227
228
229
230 public void setName(String name)
231 {
232 if (isRunning())
233 throw new IllegalStateException("started");
234 _name= name;
235 }
236
237
238
239
240
241 public void setThreadsPriority(int priority)
242 {
243 _priority=priority;
244 }
245
246
247
248
249
250 public int getMaxQueued()
251 {
252 return _maxQueued;
253 }
254
255
256
257
258
259 public void setMaxQueued(int max)
260 {
261 if (isRunning())
262 throw new IllegalStateException("started");
263 _maxQueued=max;
264 }
265
266
267
268
269
270
271
272 public int getMaxIdleTimeMs()
273 {
274 return _maxIdleTimeMs;
275 }
276
277
278
279
280
281 public int getMaxStopTimeMs()
282 {
283 return _maxStopTime;
284 }
285
286
287
288
289
290
291
292 public int getMaxThreads()
293 {
294 return _maxThreads;
295 }
296
297
298
299
300
301
302
303 public int getMinThreads()
304 {
305 return _minThreads;
306 }
307
308
309
310
311
312 public String getName()
313 {
314 return _name;
315 }
316
317
318
319
320
321 public int getThreadsPriority()
322 {
323 return _priority;
324 }
325
326
327
328
329
330 public boolean isDaemon()
331 {
332 return _daemon;
333 }
334
335
336 public boolean isDetailedDump()
337 {
338 return _detailedDump;
339 }
340
341
342 public void setDetailedDump(boolean detailedDump)
343 {
344 _detailedDump = detailedDump;
345 }
346
347
348 public boolean dispatch(Runnable job)
349 {
350 if (isRunning())
351 {
352 final int jobQ = _jobs.size();
353 final int idle = getIdleThreads();
354 if(_jobs.offer(job))
355 {
356
357 if (idle==0 || jobQ>idle)
358 {
359 int threads=_threadsStarted.get();
360 if (threads<_maxThreads)
361 startThread(threads);
362 }
363 return true;
364 }
365 }
366 LOG.debug("Dispatched {} to stopped {}",job,this);
367 return false;
368 }
369
370
371 public void execute(Runnable job)
372 {
373 if (!dispatch(job))
374 throw new RejectedExecutionException();
375 }
376
377
378
379
380
381 public void join() throws InterruptedException
382 {
383 synchronized (_joinLock)
384 {
385 while (isRunning())
386 _joinLock.wait();
387 }
388
389 while (isStopping())
390 Thread.sleep(1);
391 }
392
393
394
395
396
397 public int getThreads()
398 {
399 return _threadsStarted.get();
400 }
401
402
403
404
405
406 public int getIdleThreads()
407 {
408 return _threadsIdle.get();
409 }
410
411
412
413
414
415 public boolean isLowOnThreads()
416 {
417 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
418 }
419
420
421 private boolean startThread(int threads)
422 {
423 final int next=threads+1;
424 if (!_threadsStarted.compareAndSet(threads,next))
425 return false;
426
427 boolean started=false;
428 try
429 {
430 Thread thread=newThread(_runnable);
431 thread.setDaemon(_daemon);
432 thread.setPriority(_priority);
433 thread.setName(_name+"-"+thread.getId());
434 _threads.add(thread);
435
436 thread.start();
437 started=true;
438 }
439 finally
440 {
441 if (!started)
442 _threadsStarted.decrementAndGet();
443 }
444 return started;
445 }
446
447
448 protected Thread newThread(Runnable runnable)
449 {
450 return new Thread(runnable);
451 }
452
453
454
455 public String dump()
456 {
457 return AggregateLifeCycle.dump(this);
458 }
459
460
461 public void dump(Appendable out, String indent) throws IOException
462 {
463 List<Object> dump = new ArrayList<Object>(getMaxThreads());
464 for (final Thread thread: _threads)
465 {
466 final StackTraceElement[] trace=thread.getStackTrace();
467 boolean inIdleJobPoll=false;
468
469 if (trace != null)
470 {
471 for (StackTraceElement t : trace)
472 {
473 if ("idleJobPoll".equals(t.getMethodName()))
474 {
475 inIdleJobPoll = true;
476 break;
477 }
478 }
479 }
480 final boolean idle=inIdleJobPoll;
481
482 if (_detailedDump)
483 {
484 dump.add(new Dumpable()
485 {
486 public void dump(Appendable out, String indent) throws IOException
487 {
488 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
489 if (!idle)
490 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
491 }
492
493 public String dump()
494 {
495 return null;
496 }
497 });
498 }
499 else
500 {
501 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
502 }
503 }
504
505 AggregateLifeCycle.dumpObject(out,this);
506 AggregateLifeCycle.dump(out,indent,dump);
507
508 }
509
510
511
512 @Override
513 public String toString()
514 {
515 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
516 }
517
518
519 private Runnable idleJobPoll() throws InterruptedException
520 {
521 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
522 }
523
524
525 private Runnable _runnable = new Runnable()
526 {
527 public void run()
528 {
529 boolean shrink=false;
530 try
531 {
532 Runnable job=_jobs.poll();
533 while (isRunning())
534 {
535
536 while (job!=null && isRunning())
537 {
538 runJob(job);
539 job=_jobs.poll();
540 }
541
542
543 try
544 {
545 _threadsIdle.incrementAndGet();
546
547 while (isRunning() && job==null)
548 {
549 if (_maxIdleTimeMs<=0)
550 job=_jobs.take();
551 else
552 {
553
554 final int size=_threadsStarted.get();
555 if (size>_minThreads)
556 {
557 long last=_lastShrink.get();
558 long now=System.currentTimeMillis();
559 if (last==0 || (now-last)>_maxIdleTimeMs)
560 {
561 shrink=_lastShrink.compareAndSet(last,now) &&
562 _threadsStarted.compareAndSet(size,size-1);
563 if (shrink)
564 return;
565 }
566 }
567 job=idleJobPoll();
568 }
569 }
570 }
571 finally
572 {
573 _threadsIdle.decrementAndGet();
574 }
575 }
576 }
577 catch(InterruptedException e)
578 {
579 LOG.ignore(e);
580 }
581 catch(Exception e)
582 {
583 LOG.warn(e);
584 }
585 finally
586 {
587 if (!shrink)
588 _threadsStarted.decrementAndGet();
589 _threads.remove(Thread.currentThread());
590 }
591 }
592 };
593
594
595
596
597
598
599
600
601 protected void runJob(Runnable job)
602 {
603 job.run();
604 }
605
606
607
608
609
610 protected BlockingQueue<Runnable> getQueue()
611 {
612 return _jobs;
613 }
614
615
616
617
618
619
620
621 @Deprecated
622 public boolean stopThread(long id)
623 {
624 for (Thread thread: _threads)
625 {
626 if (thread.getId()==id)
627 {
628 thread.stop();
629 return true;
630 }
631 }
632 return false;
633 }
634
635
636
637
638
639
640 public boolean interruptThread(long id)
641 {
642 for (Thread thread: _threads)
643 {
644 if (thread.getId()==id)
645 {
646 thread.interrupt();
647 return true;
648 }
649 }
650 return false;
651 }
652
653
654
655
656
657
658 public String dumpThread(long id)
659 {
660 for (Thread thread: _threads)
661 {
662 if (thread.getId()==id)
663 {
664 StringBuilder buf = new StringBuilder();
665 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
666 for (StackTraceElement element : thread.getStackTrace())
667 buf.append(" at ").append(element.toString()).append('\n');
668 return buf.toString();
669 }
670 }
671 return null;
672 }
673 }