1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.util.thread;
16
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.eclipse.jetty.util.BlockingArrayQueue;
31 import org.eclipse.jetty.util.component.AbstractLifeCycle;
32 import org.eclipse.jetty.util.component.AggregateLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.component.LifeCycle;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
37
38 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
39 {
40 private final AtomicInteger _threadsStarted = new AtomicInteger();
41 private final AtomicInteger _threadsIdle = new AtomicInteger();
42 private final AtomicLong _lastShrink = new AtomicLong();
43 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
44 private final Object _joinLock = new Object();
45 private BlockingQueue<Runnable> _jobs;
46 private String _name;
47 private int _maxIdleTimeMs=60000;
48 private int _maxThreads=254;
49 private int _minThreads=8;
50 private int _maxQueued=-1;
51 private int _priority=Thread.NORM_PRIORITY;
52 private boolean _daemon=false;
53 private int _maxStopTime=100;
54 private boolean _detailedDump=false;
55
56
57
58
59 public QueuedThreadPool()
60 {
61 _name="qtp"+super.hashCode();
62 }
63
64
65
66
67 public QueuedThreadPool(int maxThreads)
68 {
69 this();
70 setMaxThreads(maxThreads);
71 }
72
73
74
75
76 public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
77 {
78 this();
79 _jobs=jobQ;
80 _jobs.clear();
81 }
82
83
84
85 @Override
86 protected void doStart() throws Exception
87 {
88 super.doStart();
89 _threadsStarted.set(0);
90
91 if (_jobs==null)
92 {
93 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
94 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
95 }
96
97 int threads=_threadsStarted.get();
98 while (isRunning() && threads<_minThreads)
99 {
100 startThread(threads);
101 threads=_threadsStarted.get();
102 }
103 }
104
105
106 @Override
107 protected void doStop() throws Exception
108 {
109 super.doStop();
110 long start=System.currentTimeMillis();
111
112
113 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
114 Thread.sleep(1);
115
116
117 _jobs.clear();
118 Runnable noop = new Runnable(){public void run(){}};
119 for (int i=_threadsIdle.get();i-->0;)
120 _jobs.offer(noop);
121 Thread.yield();
122
123
124 if (_threadsStarted.get()>0)
125 for (Thread thread : _threads)
126 thread.interrupt();
127
128
129 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
130 {
131 Thread.sleep(1);
132 }
133 Thread.yield();
134 int size=_threads.size();
135 if (size>0)
136 {
137 Log.warn(size+" threads could not be stopped");
138
139 if (Log.isDebugEnabled())
140 {
141 for (Thread unstopped : _threads)
142 {
143 Log.debug("Couldn't stop "+unstopped);
144 for (StackTraceElement element : unstopped.getStackTrace())
145 {
146 Log.debug(" at "+element);
147 }
148 }
149 }
150 }
151
152 synchronized (_joinLock)
153 {
154 _joinLock.notifyAll();
155 }
156 }
157
158
159
160
161
162 public void setDaemon(boolean daemon)
163 {
164 _daemon=daemon;
165 }
166
167
168
169
170
171
172
173
174
175 public void setMaxIdleTimeMs(int maxIdleTimeMs)
176 {
177 _maxIdleTimeMs=maxIdleTimeMs;
178 }
179
180
181
182
183
184 public void setMaxStopTimeMs(int stopTimeMs)
185 {
186 _maxStopTime = stopTimeMs;
187 }
188
189
190
191
192
193
194
195 public void setMaxThreads(int maxThreads)
196 {
197 _maxThreads=maxThreads;
198 if (_minThreads>_maxThreads)
199 _minThreads=_maxThreads;
200 }
201
202
203
204
205
206
207
208 public void setMinThreads(int minThreads)
209 {
210 _minThreads=minThreads;
211
212 if (_minThreads>_maxThreads)
213 _maxThreads=_minThreads;
214
215 int threads=_threadsStarted.get();
216 while (isStarted() && threads<_minThreads)
217 {
218 startThread(threads);
219 threads=_threadsStarted.get();
220 }
221 }
222
223
224
225
226
227 public void setName(String name)
228 {
229 if (isRunning())
230 throw new IllegalStateException("started");
231 _name= name;
232 }
233
234
235
236
237
238 public void setThreadsPriority(int priority)
239 {
240 _priority=priority;
241 }
242
243
244
245
246
247 public int getMaxQueued()
248 {
249 return _maxQueued;
250 }
251
252
253
254
255
256 public void setMaxQueued(int max)
257 {
258 if (isRunning())
259 throw new IllegalStateException("started");
260 _maxQueued=max;
261 }
262
263
264
265
266
267
268
269 public int getMaxIdleTimeMs()
270 {
271 return _maxIdleTimeMs;
272 }
273
274
275
276
277
278 public int getMaxStopTimeMs()
279 {
280 return _maxStopTime;
281 }
282
283
284
285
286
287
288
289 public int getMaxThreads()
290 {
291 return _maxThreads;
292 }
293
294
295
296
297
298
299
300 public int getMinThreads()
301 {
302 return _minThreads;
303 }
304
305
306
307
308
309 public String getName()
310 {
311 return _name;
312 }
313
314
315
316
317
318 public int getThreadsPriority()
319 {
320 return _priority;
321 }
322
323
324
325
326
327 public boolean isDaemon()
328 {
329 return _daemon;
330 }
331
332
333 public boolean isDetailedDump()
334 {
335 return _detailedDump;
336 }
337
338
339 public void setDetailedDump(boolean detailedDump)
340 {
341 _detailedDump = detailedDump;
342 }
343
344
345 public boolean dispatch(Runnable job)
346 {
347 if (isRunning())
348 {
349 final int jobQ = _jobs.size();
350 final int idle = getIdleThreads();
351 if(_jobs.offer(job))
352 {
353
354 if (idle==0 || jobQ>idle)
355 {
356 int threads=_threadsStarted.get();
357 if (threads<_maxThreads)
358 startThread(threads);
359 }
360 return true;
361 }
362 }
363 return false;
364 }
365
366
367 public void execute(Runnable job)
368 {
369 if (!dispatch(job))
370 throw new RejectedExecutionException();
371 }
372
373
374
375
376
377 public void join() throws InterruptedException
378 {
379 synchronized (_joinLock)
380 {
381 while (isRunning())
382 _joinLock.wait();
383 }
384
385 while (isStopping())
386 Thread.sleep(1);
387 }
388
389
390
391
392
393 public int getThreads()
394 {
395 return _threadsStarted.get();
396 }
397
398
399
400
401
402 public int getIdleThreads()
403 {
404 return _threadsIdle.get();
405 }
406
407
408
409
410
411 public boolean isLowOnThreads()
412 {
413 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
414 }
415
416
417 private boolean startThread(int threads)
418 {
419 final int next=threads+1;
420 if (!_threadsStarted.compareAndSet(threads,next))
421 return false;
422
423 boolean started=false;
424 try
425 {
426 Thread thread=newThread(_runnable);
427 thread.setDaemon(_daemon);
428 thread.setPriority(_priority);
429 thread.setName(_name+"-"+thread.getId());
430 _threads.add(thread);
431
432 thread.start();
433 started=true;
434 }
435 finally
436 {
437 if (!started)
438 _threadsStarted.decrementAndGet();
439 }
440 return started;
441 }
442
443
444 protected Thread newThread(Runnable runnable)
445 {
446 return new Thread(runnable);
447 }
448
449
450
451 public String dump()
452 {
453 return AggregateLifeCycle.dump(this);
454 }
455
456
457 public void dump(Appendable out, String indent) throws IOException
458 {
459 List<Object> dump = new ArrayList<Object>(getMaxThreads());
460 for (final Thread thread: _threads)
461 {
462 final StackTraceElement[] trace=thread.getStackTrace();
463 boolean inIdleJobPoll=false;
464 for (StackTraceElement t : trace)
465 {
466 if ("idleJobPoll".equals(t.getMethodName()))
467 {
468 inIdleJobPoll=true;
469 break;
470 }
471 }
472 final boolean idle=inIdleJobPoll;
473
474 if (_detailedDump)
475 {
476 dump.add(new Dumpable()
477 {
478 public void dump(Appendable out, String indent) throws IOException
479 {
480 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
481 if (!idle)
482 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
483 }
484
485 public String dump()
486 {
487 return null;
488 }
489 });
490 }
491 else
492 {
493 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
494 }
495 }
496
497 out.append(String.valueOf(this)).append("\n");
498 AggregateLifeCycle.dump(out,indent,dump);
499
500 }
501
502
503
504 @Override
505 public String toString()
506 {
507 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
508 }
509
510 private Runnable idleJobPoll() throws InterruptedException
511 {
512 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
513 }
514
515
516 private Runnable _runnable = new Runnable()
517 {
518 public void run()
519 {
520 boolean shrink=false;
521 try
522 {
523 Runnable job=_jobs.poll();
524 while (isRunning())
525 {
526
527 while (job!=null && isRunning())
528 {
529 job.run();
530 job=_jobs.poll();
531 }
532
533
534 try
535 {
536 _threadsIdle.incrementAndGet();
537
538 while (isRunning() && job==null)
539 {
540 if (_maxIdleTimeMs<=0)
541 job=_jobs.take();
542 else
543 {
544
545 final int size=_threadsStarted.get();
546 if (size>_minThreads)
547 {
548 long last=_lastShrink.get();
549 long now=System.currentTimeMillis();
550 if (last==0 || (now-last)>_maxIdleTimeMs)
551 {
552 shrink=_lastShrink.compareAndSet(last,now) &&
553 _threadsStarted.compareAndSet(size,size-1);
554 if (shrink)
555 return;
556 }
557 }
558 job=idleJobPoll();
559 }
560 }
561 }
562 finally
563 {
564 _threadsIdle.decrementAndGet();
565 }
566 }
567 }
568 catch(InterruptedException e)
569 {
570 Log.ignore(e);
571 }
572 catch(Exception e)
573 {
574 Log.warn(e);
575 }
576 finally
577 {
578 if (!shrink)
579 _threadsStarted.decrementAndGet();
580 _threads.remove(Thread.currentThread());
581 }
582 }
583 };
584
585
586
587
588
589
590
591 @SuppressWarnings("deprecation")
592 public boolean stopThread(long id)
593 {
594 for (Thread thread: _threads)
595 {
596 if (thread.getId()==id)
597 {
598 thread.stop();
599 return true;
600 }
601 }
602 return false;
603 }
604
605
606
607
608
609
610 public boolean interruptThread(long id)
611 {
612 for (Thread thread: _threads)
613 {
614 if (thread.getId()==id)
615 {
616 thread.interrupt();
617 return true;
618 }
619 }
620 return false;
621 }
622
623
624
625
626
627
628 public String dumpThread(long id)
629 {
630 for (Thread thread: _threads)
631 {
632 if (thread.getId()==id)
633 {
634 StringBuilder buf = new StringBuilder();
635 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
636 for (StackTraceElement element : thread.getStackTrace())
637 buf.append(" at ").append(element.toString()).append('\n');
638 return buf.toString();
639 }
640 }
641 return null;
642 }
643 }