1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.util.thread;
16
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.eclipse.jetty.util.BlockingArrayQueue;
31 import org.eclipse.jetty.util.component.AbstractLifeCycle;
32 import org.eclipse.jetty.util.component.AggregateLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.component.LifeCycle;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
38
39 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
40 {
41 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
42
43 private final AtomicInteger _threadsStarted = new AtomicInteger();
44 private final AtomicInteger _threadsIdle = new AtomicInteger();
45 private final AtomicLong _lastShrink = new AtomicLong();
46 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
47 private final Object _joinLock = new Object();
48 private BlockingQueue<Runnable> _jobs;
49 private String _name;
50 private int _maxIdleTimeMs=60000;
51 private int _maxThreads=254;
52 private int _minThreads=8;
53 private int _maxQueued=-1;
54 private int _priority=Thread.NORM_PRIORITY;
55 private boolean _daemon=false;
56 private int _maxStopTime=100;
57 private boolean _detailedDump=false;
58
59
60
61
62 public QueuedThreadPool()
63 {
64 _name="qtp"+super.hashCode();
65 }
66
67
68
69
70 public QueuedThreadPool(int maxThreads)
71 {
72 this();
73 setMaxThreads(maxThreads);
74 }
75
76
77
78
79 public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
80 {
81 this();
82 _jobs=jobQ;
83 _jobs.clear();
84 }
85
86
87
88 @Override
89 protected void doStart() throws Exception
90 {
91 super.doStart();
92 _threadsStarted.set(0);
93
94 if (_jobs==null)
95 {
96 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
97 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
98 }
99
100 int threads=_threadsStarted.get();
101 while (isRunning() && threads<_minThreads)
102 {
103 startThread(threads);
104 threads=_threadsStarted.get();
105 }
106 }
107
108
109 @Override
110 protected void doStop() throws Exception
111 {
112 super.doStop();
113 long start=System.currentTimeMillis();
114
115
116 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
117 Thread.sleep(1);
118
119
120 _jobs.clear();
121 Runnable noop = new Runnable(){public void run(){}};
122 for (int i=_threadsIdle.get();i-->0;)
123 _jobs.offer(noop);
124 Thread.yield();
125
126
127 if (_threadsStarted.get()>0)
128 for (Thread thread : _threads)
129 thread.interrupt();
130
131
132 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
133 {
134 Thread.sleep(1);
135 }
136 Thread.yield();
137 int size=_threads.size();
138 if (size>0)
139 {
140 LOG.warn(size+" threads could not be stopped");
141
142 if (size==1 || LOG.isDebugEnabled())
143 {
144 for (Thread unstopped : _threads)
145 {
146 LOG.info("Couldn't stop "+unstopped);
147 for (StackTraceElement element : unstopped.getStackTrace())
148 {
149 LOG.info(" at "+element);
150 }
151 }
152 }
153 }
154
155 synchronized (_joinLock)
156 {
157 _joinLock.notifyAll();
158 }
159 }
160
161
162
163
164
165 public void setDaemon(boolean daemon)
166 {
167 _daemon=daemon;
168 }
169
170
171
172
173
174
175
176
177
178 public void setMaxIdleTimeMs(int maxIdleTimeMs)
179 {
180 _maxIdleTimeMs=maxIdleTimeMs;
181 }
182
183
184
185
186
187 public void setMaxStopTimeMs(int stopTimeMs)
188 {
189 _maxStopTime = stopTimeMs;
190 }
191
192
193
194
195
196
197
198 public void setMaxThreads(int maxThreads)
199 {
200 _maxThreads=maxThreads;
201 if (_minThreads>_maxThreads)
202 _minThreads=_maxThreads;
203 }
204
205
206
207
208
209
210
211 public void setMinThreads(int minThreads)
212 {
213 _minThreads=minThreads;
214
215 if (_minThreads>_maxThreads)
216 _maxThreads=_minThreads;
217
218 int threads=_threadsStarted.get();
219 while (isStarted() && threads<_minThreads)
220 {
221 startThread(threads);
222 threads=_threadsStarted.get();
223 }
224 }
225
226
227
228
229
230 public void setName(String name)
231 {
232 if (isRunning())
233 throw new IllegalStateException("started");
234 _name= name;
235 }
236
237
238
239
240
241 public void setThreadsPriority(int priority)
242 {
243 _priority=priority;
244 }
245
246
247
248
249
250 public int getMaxQueued()
251 {
252 return _maxQueued;
253 }
254
255
256
257
258
259 public void setMaxQueued(int max)
260 {
261 if (isRunning())
262 throw new IllegalStateException("started");
263 _maxQueued=max;
264 }
265
266
267
268
269
270
271
272 public int getMaxIdleTimeMs()
273 {
274 return _maxIdleTimeMs;
275 }
276
277
278
279
280
281 public int getMaxStopTimeMs()
282 {
283 return _maxStopTime;
284 }
285
286
287
288
289
290
291
292 public int getMaxThreads()
293 {
294 return _maxThreads;
295 }
296
297
298
299
300
301
302
303 public int getMinThreads()
304 {
305 return _minThreads;
306 }
307
308
309
310
311
312 public String getName()
313 {
314 return _name;
315 }
316
317
318
319
320
321 public int getThreadsPriority()
322 {
323 return _priority;
324 }
325
326
327
328
329
330 public boolean isDaemon()
331 {
332 return _daemon;
333 }
334
335
336 public boolean isDetailedDump()
337 {
338 return _detailedDump;
339 }
340
341
342 public void setDetailedDump(boolean detailedDump)
343 {
344 _detailedDump = detailedDump;
345 }
346
347
348 public boolean dispatch(Runnable job)
349 {
350 if (isRunning())
351 {
352 final int jobQ = _jobs.size();
353 final int idle = getIdleThreads();
354 if(_jobs.offer(job))
355 {
356
357 if (idle==0 || jobQ>idle)
358 {
359 int threads=_threadsStarted.get();
360 if (threads<_maxThreads)
361 startThread(threads);
362 }
363 return true;
364 }
365 }
366 return false;
367 }
368
369
370 public void execute(Runnable job)
371 {
372 if (!dispatch(job))
373 throw new RejectedExecutionException();
374 }
375
376
377
378
379
380 public void join() throws InterruptedException
381 {
382 synchronized (_joinLock)
383 {
384 while (isRunning())
385 _joinLock.wait();
386 }
387
388 while (isStopping())
389 Thread.sleep(1);
390 }
391
392
393
394
395
396 public int getThreads()
397 {
398 return _threadsStarted.get();
399 }
400
401
402
403
404
405 public int getIdleThreads()
406 {
407 return _threadsIdle.get();
408 }
409
410
411
412
413
414 public boolean isLowOnThreads()
415 {
416 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
417 }
418
419
420 private boolean startThread(int threads)
421 {
422 final int next=threads+1;
423 if (!_threadsStarted.compareAndSet(threads,next))
424 return false;
425
426 boolean started=false;
427 try
428 {
429 Thread thread=newThread(_runnable);
430 thread.setDaemon(_daemon);
431 thread.setPriority(_priority);
432 thread.setName(_name+"-"+thread.getId());
433 _threads.add(thread);
434
435 thread.start();
436 started=true;
437 }
438 finally
439 {
440 if (!started)
441 _threadsStarted.decrementAndGet();
442 }
443 return started;
444 }
445
446
447 protected Thread newThread(Runnable runnable)
448 {
449 return new Thread(runnable);
450 }
451
452
453
454 public String dump()
455 {
456 return AggregateLifeCycle.dump(this);
457 }
458
459
460 public void dump(Appendable out, String indent) throws IOException
461 {
462 List<Object> dump = new ArrayList<Object>(getMaxThreads());
463 for (final Thread thread: _threads)
464 {
465 final StackTraceElement[] trace=thread.getStackTrace();
466 boolean inIdleJobPoll=false;
467 for (StackTraceElement t : trace)
468 {
469 if ("idleJobPoll".equals(t.getMethodName()))
470 {
471 inIdleJobPoll=true;
472 break;
473 }
474 }
475 final boolean idle=inIdleJobPoll;
476
477 if (_detailedDump)
478 {
479 dump.add(new Dumpable()
480 {
481 public void dump(Appendable out, String indent) throws IOException
482 {
483 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
484 if (!idle)
485 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
486 }
487
488 public String dump()
489 {
490 return null;
491 }
492 });
493 }
494 else
495 {
496 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
497 }
498 }
499
500 out.append(String.valueOf(this)).append("\n");
501 AggregateLifeCycle.dump(out,indent,dump);
502
503 }
504
505
506
507 @Override
508 public String toString()
509 {
510 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}#"+getState();
511 }
512
513
514 private Runnable idleJobPoll() throws InterruptedException
515 {
516 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
517 }
518
519
520 private Runnable _runnable = new Runnable()
521 {
522 public void run()
523 {
524 boolean shrink=false;
525 try
526 {
527 Runnable job=_jobs.poll();
528 while (isRunning())
529 {
530
531 while (job!=null && isRunning())
532 {
533 runJob(job);
534 job=_jobs.poll();
535 }
536
537
538 try
539 {
540 _threadsIdle.incrementAndGet();
541
542 while (isRunning() && job==null)
543 {
544 if (_maxIdleTimeMs<=0)
545 job=_jobs.take();
546 else
547 {
548
549 final int size=_threadsStarted.get();
550 if (size>_minThreads)
551 {
552 long last=_lastShrink.get();
553 long now=System.currentTimeMillis();
554 if (last==0 || (now-last)>_maxIdleTimeMs)
555 {
556 shrink=_lastShrink.compareAndSet(last,now) &&
557 _threadsStarted.compareAndSet(size,size-1);
558 if (shrink)
559 return;
560 }
561 }
562 job=idleJobPoll();
563 }
564 }
565 }
566 finally
567 {
568 _threadsIdle.decrementAndGet();
569 }
570 }
571 }
572 catch(InterruptedException e)
573 {
574 LOG.ignore(e);
575 }
576 catch(Exception e)
577 {
578 LOG.warn(e);
579 }
580 finally
581 {
582 if (!shrink)
583 _threadsStarted.decrementAndGet();
584 _threads.remove(Thread.currentThread());
585 }
586 }
587 };
588
589
590
591
592
593
594
595
596 protected void runJob(Runnable job)
597 {
598 job.run();
599 }
600
601
602
603
604
605 protected BlockingQueue<Runnable> getQueue()
606 {
607 return _jobs;
608 }
609
610
611
612
613
614
615
616 @Deprecated
617 public boolean stopThread(long id)
618 {
619 for (Thread thread: _threads)
620 {
621 if (thread.getId()==id)
622 {
623 thread.stop();
624 return true;
625 }
626 }
627 return false;
628 }
629
630
631
632
633
634
635 public boolean interruptThread(long id)
636 {
637 for (Thread thread: _threads)
638 {
639 if (thread.getId()==id)
640 {
641 thread.interrupt();
642 return true;
643 }
644 }
645 return false;
646 }
647
648
649
650
651
652
653 public String dumpThread(long id)
654 {
655 for (Thread thread: _threads)
656 {
657 if (thread.getId()==id)
658 {
659 StringBuilder buf = new StringBuilder();
660 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
661 for (StackTraceElement element : thread.getStackTrace())
662 buf.append(" at ").append(element.toString()).append('\n');
663 return buf.toString();
664 }
665 }
666 return null;
667 }
668 }