1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.util.thread;
16
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.eclipse.jetty.util.BlockingArrayQueue;
31 import org.eclipse.jetty.util.component.AbstractLifeCycle;
32 import org.eclipse.jetty.util.component.AggregateLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.component.LifeCycle;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
38
39 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
40 {
41 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
42
43 private final AtomicInteger _threadsStarted = new AtomicInteger();
44 private final AtomicInteger _threadsIdle = new AtomicInteger();
45 private final AtomicLong _lastShrink = new AtomicLong();
46 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
47 private final Object _joinLock = new Object();
48 private BlockingQueue<Runnable> _jobs;
49 private String _name;
50 private int _maxIdleTimeMs=60000;
51 private int _maxThreads=254;
52 private int _minThreads=8;
53 private int _maxQueued=-1;
54 private int _priority=Thread.NORM_PRIORITY;
55 private boolean _daemon=false;
56 private int _maxStopTime=100;
57 private boolean _detailedDump=false;
58
59
60
61
62 public QueuedThreadPool()
63 {
64 _name="qtp"+super.hashCode();
65 }
66
67
68
69
/**
 * Creates a pool with the default name and the given thread ceiling.
 *
 * @param maxThreads the maximum number of threads, applied through
 *                   {@link #setMaxThreads(int)} (which may lower the minimum to match)
 */
public QueuedThreadPool(int maxThreads)
{
    this();
    this.setMaxThreads(maxThreads);
}
75
76
77
78
/**
 * Creates a pool that uses the supplied queue for its jobs.
 * Anything already in the queue is discarded.
 *
 * @param jobQ the queue to hold pending jobs; it is cleared by this constructor
 */
public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
{
    this();
    // Empty the caller's queue first, then adopt it.
    jobQ.clear();
    _jobs = jobQ;
}
85
86
87
88 @Override
89 protected void doStart() throws Exception
90 {
91 super.doStart();
92 _threadsStarted.set(0);
93
94 if (_jobs==null)
95 {
96 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
97 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
98 }
99
100 int threads=_threadsStarted.get();
101 while (isRunning() && threads<_minThreads)
102 {
103 startThread(threads);
104 threads=_threadsStarted.get();
105 }
106 }
107
108
109 @Override
110 protected void doStop() throws Exception
111 {
112 super.doStop();
113 long start=System.currentTimeMillis();
114
115
116 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
117 Thread.sleep(1);
118
119
120 _jobs.clear();
121 Runnable noop = new Runnable(){public void run(){}};
122 for (int i=_threadsIdle.get();i-->0;)
123 _jobs.offer(noop);
124 Thread.yield();
125
126
127 if (_threadsStarted.get()>0)
128 for (Thread thread : _threads)
129 thread.interrupt();
130
131
132 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
133 {
134 Thread.sleep(1);
135 }
136 Thread.yield();
137 int size=_threads.size();
138 if (size>0)
139 {
140 LOG.warn(size+" threads could not be stopped");
141
142 if (size==1 || LOG.isDebugEnabled())
143 {
144 for (Thread unstopped : _threads)
145 {
146 LOG.info("Couldn't stop "+unstopped);
147 for (StackTraceElement element : unstopped.getStackTrace())
148 {
149 LOG.info(" at "+element);
150 }
151 }
152 }
153 }
154
155 synchronized (_joinLock)
156 {
157 _joinLock.notifyAll();
158 }
159 }
160
161
162
163
164
165 public void setDaemon(boolean daemon)
166 {
167 _daemon=daemon;
168 }
169
170
171
172
173
174
175
176
177
178 public void setMaxIdleTimeMs(int maxIdleTimeMs)
179 {
180 _maxIdleTimeMs=maxIdleTimeMs;
181 }
182
183
184
185
186
187 public void setMaxStopTimeMs(int stopTimeMs)
188 {
189 _maxStopTime = stopTimeMs;
190 }
191
192
193
194
195
196
197
198 public void setMaxThreads(int maxThreads)
199 {
200 _maxThreads=maxThreads;
201 if (_minThreads>_maxThreads)
202 _minThreads=_maxThreads;
203 }
204
205
206
207
208
209
210
211 public void setMinThreads(int minThreads)
212 {
213 _minThreads=minThreads;
214
215 if (_minThreads>_maxThreads)
216 _maxThreads=_minThreads;
217
218 int threads=_threadsStarted.get();
219 while (isStarted() && threads<_minThreads)
220 {
221 startThread(threads);
222 threads=_threadsStarted.get();
223 }
224 }
225
226
227
228
229
230 public void setName(String name)
231 {
232 if (isRunning())
233 throw new IllegalStateException("started");
234 _name= name;
235 }
236
237
238
239
240
241 public void setThreadsPriority(int priority)
242 {
243 _priority=priority;
244 }
245
246
247
248
249
250 public int getMaxQueued()
251 {
252 return _maxQueued;
253 }
254
255
256
257
258
259 public void setMaxQueued(int max)
260 {
261 if (isRunning())
262 throw new IllegalStateException("started");
263 _maxQueued=max;
264 }
265
266
267
268
269
270
271
272 public int getMaxIdleTimeMs()
273 {
274 return _maxIdleTimeMs;
275 }
276
277
278
279
280
281 public int getMaxStopTimeMs()
282 {
283 return _maxStopTime;
284 }
285
286
287
288
289
290
291
292 public int getMaxThreads()
293 {
294 return _maxThreads;
295 }
296
297
298
299
300
301
302
303 public int getMinThreads()
304 {
305 return _minThreads;
306 }
307
308
309
310
311
312 public String getName()
313 {
314 return _name;
315 }
316
317
318
319
320
321 public int getThreadsPriority()
322 {
323 return _priority;
324 }
325
326
327
328
329
330 public boolean isDaemon()
331 {
332 return _daemon;
333 }
334
335
336 public boolean isDetailedDump()
337 {
338 return _detailedDump;
339 }
340
341
342 public void setDetailedDump(boolean detailedDump)
343 {
344 _detailedDump = detailedDump;
345 }
346
347
348 public boolean dispatch(Runnable job)
349 {
350 if (isRunning())
351 {
352 final int jobQ = _jobs.size();
353 final int idle = getIdleThreads();
354 if(_jobs.offer(job))
355 {
356
357 if (idle==0 || jobQ>idle)
358 {
359 int threads=_threadsStarted.get();
360 if (threads<_maxThreads)
361 startThread(threads);
362 }
363 return true;
364 }
365 }
366 LOG.debug("Dispatched {} to stopped {}",job,this);
367 return false;
368 }
369
370
371 public void execute(Runnable job)
372 {
373 if (!dispatch(job))
374 throw new RejectedExecutionException();
375 }
376
377
378
379
380
381 public void join() throws InterruptedException
382 {
383 synchronized (_joinLock)
384 {
385 while (isRunning())
386 _joinLock.wait();
387 }
388
389 while (isStopping())
390 Thread.sleep(1);
391 }
392
393
394
395
396
397 public int getThreads()
398 {
399 return _threadsStarted.get();
400 }
401
402
403
404
405
406 public int getIdleThreads()
407 {
408 return _threadsIdle.get();
409 }
410
411
412
413
414
415 public boolean isLowOnThreads()
416 {
417 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
418 }
419
420
421 private boolean startThread(int threads)
422 {
423 final int next=threads+1;
424 if (!_threadsStarted.compareAndSet(threads,next))
425 return false;
426
427 boolean started=false;
428 try
429 {
430 Thread thread=newThread(_runnable);
431 thread.setDaemon(_daemon);
432 thread.setPriority(_priority);
433 thread.setName(_name+"-"+thread.getId());
434 _threads.add(thread);
435
436 thread.start();
437 started=true;
438 }
439 finally
440 {
441 if (!started)
442 _threadsStarted.decrementAndGet();
443 }
444 return started;
445 }
446
447
448 protected Thread newThread(Runnable runnable)
449 {
450 return new Thread(runnable);
451 }
452
453
454
455 public String dump()
456 {
457 return AggregateLifeCycle.dump(this);
458 }
459
460
461 public void dump(Appendable out, String indent) throws IOException
462 {
463 List<Object> dump = new ArrayList<Object>(getMaxThreads());
464 for (final Thread thread: _threads)
465 {
466 final StackTraceElement[] trace=thread.getStackTrace();
467 boolean inIdleJobPoll=false;
468 for (StackTraceElement t : trace)
469 {
470 if ("idleJobPoll".equals(t.getMethodName()))
471 {
472 inIdleJobPoll=true;
473 break;
474 }
475 }
476 final boolean idle=inIdleJobPoll;
477
478 if (_detailedDump)
479 {
480 dump.add(new Dumpable()
481 {
482 public void dump(Appendable out, String indent) throws IOException
483 {
484 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
485 if (!idle)
486 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
487 }
488
489 public String dump()
490 {
491 return null;
492 }
493 });
494 }
495 else
496 {
497 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
498 }
499 }
500
501 AggregateLifeCycle.dumpObject(out,this);
502 AggregateLifeCycle.dump(out,indent,dump);
503
504 }
505
506
507
508 @Override
509 public String toString()
510 {
511 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
512 }
513
514
515 private Runnable idleJobPoll() throws InterruptedException
516 {
517 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
518 }
519
520
/**
 * The loop run by every pool thread: execute queued jobs while the pool is
 * running, wait for more when the queue is empty, and retire when the pool
 * holds more than the minimum number of threads and enough time has passed
 * since the last retirement. On every exit path the thread removes itself
 * from the thread registry and, unless it already did so while shrinking,
 * decrements the started-thread counter.
 */
private Runnable _runnable = new Runnable()
{
    public void run()
    {
        // True only once this thread has itself lowered _threadsStarted as
        // part of shrinking, so the outer finally must not lower it again.
        boolean shrink=false;
        try
        {
            Runnable job=_jobs.poll();
            while (isRunning())
            {
                // Busy phase: run jobs back to back for as long as the queue
                // supplies them. A job polled once the pool has stopped
                // running is not executed.
                while (job!=null && isRunning())
                {
                    runJob(job);
                    job=_jobs.poll();
                }

                // Idle phase: this thread counts as idle until a job arrives,
                // the pool stops running, or the thread retires.
                try
                {
                    _threadsIdle.incrementAndGet();

                    while (isRunning() && job==null)
                    {
                        // No idle timeout configured: block until a job arrives
                        // (or the thread is interrupted); never shrink.
                        if (_maxIdleTimeMs<=0)
                            job=_jobs.take();
                        else
                        {
                            // Above the minimum size: consider retiring this thread.
                            final int size=_threadsStarted.get();
                            if (size>_minThreads)
                            {
                                long last=_lastShrink.get();
                                long now=System.currentTimeMillis();
                                // At most one retirement per idle interval.
                                if (last==0 || (now-last)>_maxIdleTimeMs)
                                {
                                    // Retire only if this thread wins both updates:
                                    // the shrink timestamp and the thread count.
                                    // If the first succeeds and the second fails,
                                    // the timestamp stays advanced and the thread
                                    // carries on polling.
                                    shrink=_lastShrink.compareAndSet(last,now) &&
                                    _threadsStarted.compareAndSet(size,size-1);
                                    if (shrink)
                                        return;
                                }
                            }
                            // Timed wait for the next job; null on timeout.
                            job=idleJobPoll();
                        }
                    }
                }
                finally
                {
                    // Also runs on the shrink return and on exceptions above.
                    _threadsIdle.decrementAndGet();
                }
            }
        }
        catch(InterruptedException e)
        {
            // Interrupted while waiting on the queue: the thread simply exits.
            LOG.ignore(e);
        }
        catch(Exception e)
        {
            // An exception escaping a job ends this thread after being logged.
            LOG.warn(e);
        }
        finally
        {
            if (!shrink)
                _threadsStarted.decrementAndGet();
            _threads.remove(Thread.currentThread());
        }
    }
};
589
590
591
592
593
594
595
596
597 protected void runJob(Runnable job)
598 {
599 job.run();
600 }
601
602
603
604
605
606 protected BlockingQueue<Runnable> getQueue()
607 {
608 return _jobs;
609 }
610
611
612
613
614
615
616
617 @Deprecated
618 public boolean stopThread(long id)
619 {
620 for (Thread thread: _threads)
621 {
622 if (thread.getId()==id)
623 {
624 thread.stop();
625 return true;
626 }
627 }
628 return false;
629 }
630
631
632
633
634
635
636 public boolean interruptThread(long id)
637 {
638 for (Thread thread: _threads)
639 {
640 if (thread.getId()==id)
641 {
642 thread.interrupt();
643 return true;
644 }
645 }
646 return false;
647 }
648
649
650
651
652
653
654 public String dumpThread(long id)
655 {
656 for (Thread thread: _threads)
657 {
658 if (thread.getId()==id)
659 {
660 StringBuilder buf = new StringBuilder();
661 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
662 for (StackTraceElement element : thread.getStackTrace())
663 buf.append(" at ").append(element.toString()).append('\n');
664 return buf.toString();
665 }
666 }
667 return null;
668 }
669 }