1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.util.thread;
16
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.eclipse.jetty.util.BlockingArrayQueue;
31 import org.eclipse.jetty.util.component.AbstractLifeCycle;
32 import org.eclipse.jetty.util.component.AggregateLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.component.LifeCycle;
35 import org.eclipse.jetty.util.log.Log;
36
37 public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor, Dumpable
38 {
39 private final AtomicInteger _threadsStarted = new AtomicInteger();
40 private final AtomicInteger _threadsIdle = new AtomicInteger();
41 private final AtomicLong _lastShrink = new AtomicLong();
42 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
43 private final Object _joinLock = new Object();
44 private BlockingQueue<Runnable> _jobs;
45 private String _name;
46 private int _maxIdleTimeMs=60000;
47 private int _maxThreads=254;
48 private int _minThreads=8;
49 private int _maxQueued=-1;
50 private int _priority=Thread.NORM_PRIORITY;
51 private boolean _daemon=false;
52 private int _maxStopTime=100;
53 private boolean _detailedDump=false;
54
55
56
57
58 public QueuedThreadPool()
59 {
60 _name="qtp"+super.hashCode();
61 }
62
63
64
65
/**
 * Creates a pool with default settings and then applies the given maximum
 * thread count via {@link #setMaxThreads(int)}.
 *
 * @param maxThreads the maximum number of threads
 */
public QueuedThreadPool(int maxThreads)
{
    this();
    this.setMaxThreads(maxThreads);
}
71
72
73
74
/**
 * Creates a pool with default settings that uses the supplied queue for
 * its jobs. The queue is emptied as part of construction.
 *
 * @param jobQ the queue to hold pending jobs; any existing content is cleared
 */
public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
{
    this();
    this._jobs = jobQ;
    // Discard anything already sitting in the caller's queue.
    this._jobs.clear();
}
81
82
83
84 @Override
85 protected void doStart() throws Exception
86 {
87 super.doStart();
88 _threadsStarted.set(0);
89
90 if (_jobs==null)
91 {
92 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
93 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
94 }
95
96 int threads=_threadsStarted.get();
97 while (isRunning() && threads<_minThreads)
98 {
99 startThread(threads);
100 threads=_threadsStarted.get();
101 }
102 }
103
104
105 @Override
106 protected void doStop() throws Exception
107 {
108 super.doStop();
109 long start=System.currentTimeMillis();
110
111
112 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
113 Thread.sleep(1);
114
115
116 _jobs.clear();
117 Runnable noop = new Runnable(){public void run(){}};
118 for (int i=_threadsIdle.get();i-->0;)
119 _jobs.offer(noop);
120 Thread.yield();
121
122
123 if (_threadsStarted.get()>0)
124 for (Thread thread : _threads)
125 thread.interrupt();
126
127
128 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
129 {
130 Thread.sleep(1);
131 }
132 Thread.yield();
133 int size=_threads.size();
134 if (size>0)
135 {
136 Log.warn(size+" threads could not be stopped");
137
138 if (Log.isDebugEnabled())
139 {
140 for (Thread unstopped : _threads)
141 {
142 Log.debug("Couldn't stop "+unstopped);
143 for (StackTraceElement element : unstopped.getStackTrace())
144 {
145 Log.debug(" at "+element);
146 }
147 }
148 }
149 }
150
151 synchronized (_joinLock)
152 {
153 _joinLock.notifyAll();
154 }
155 }
156
157
158
159
160
161 public void setDaemon(boolean daemon)
162 {
163 _daemon=daemon;
164 }
165
166
167
168
169
170
171
172
173
174 public void setMaxIdleTimeMs(int maxIdleTimeMs)
175 {
176 _maxIdleTimeMs=maxIdleTimeMs;
177 }
178
179
180
181
182
183 public void setMaxStopTimeMs(int stopTimeMs)
184 {
185 _maxStopTime = stopTimeMs;
186 }
187
188
189
190
191
192
193
194 public void setMaxThreads(int maxThreads)
195 {
196 _maxThreads=maxThreads;
197 if (_minThreads>_maxThreads)
198 _minThreads=_maxThreads;
199 }
200
201
202
203
204
205
206
207 public void setMinThreads(int minThreads)
208 {
209 _minThreads=minThreads;
210
211 if (_minThreads>_maxThreads)
212 _maxThreads=_minThreads;
213
214 int threads=_threadsStarted.get();
215 while (isStarted() && threads<_minThreads)
216 {
217 startThread(threads);
218 threads=_threadsStarted.get();
219 }
220 }
221
222
223
224
225
226 public void setName(String name)
227 {
228 if (isRunning())
229 throw new IllegalStateException("started");
230 _name= name;
231 }
232
233
234
235
236
237 public void setThreadsPriority(int priority)
238 {
239 _priority=priority;
240 }
241
242
243
244
245
246 public int getMaxQueued()
247 {
248 return _maxQueued;
249 }
250
251
252
253
254
255 public void setMaxQueued(int max)
256 {
257 if (isRunning())
258 throw new IllegalStateException("started");
259 _maxQueued=max;
260 }
261
262
263
264
265
266
267
268 public int getMaxIdleTimeMs()
269 {
270 return _maxIdleTimeMs;
271 }
272
273
274
275
276
277 public int getMaxStopTimeMs()
278 {
279 return _maxStopTime;
280 }
281
282
283
284
285
286
287
288 public int getMaxThreads()
289 {
290 return _maxThreads;
291 }
292
293
294
295
296
297
298
299 public int getMinThreads()
300 {
301 return _minThreads;
302 }
303
304
305
306
307
308 public String getName()
309 {
310 return _name;
311 }
312
313
314
315
316
317 public int getThreadsPriority()
318 {
319 return _priority;
320 }
321
322
323
324
325
326 public boolean isDaemon()
327 {
328 return _daemon;
329 }
330
331
332 public boolean isDetailedDump()
333 {
334 return _detailedDump;
335 }
336
337
338 public void setDetailedDump(boolean detailedDump)
339 {
340 _detailedDump = detailedDump;
341 }
342
343
344 public boolean dispatch(Runnable job)
345 {
346 if (isRunning())
347 {
348 final int jobQ = _jobs.size();
349 final int idle = getIdleThreads();
350 if(_jobs.offer(job))
351 {
352
353 if (idle==0 || jobQ>idle)
354 {
355 int threads=_threadsStarted.get();
356 if (threads<_maxThreads)
357 startThread(threads);
358 }
359 return true;
360 }
361 }
362 return false;
363 }
364
365
366 public void execute(Runnable job)
367 {
368 if (!dispatch(job))
369 throw new RejectedExecutionException();
370 }
371
372
373
374
375
376 public void join() throws InterruptedException
377 {
378 synchronized (_joinLock)
379 {
380 while (isRunning())
381 _joinLock.wait();
382 }
383
384 while (isStopping())
385 Thread.sleep(1);
386 }
387
388
389
390
391
392 public int getThreads()
393 {
394 return _threadsStarted.get();
395 }
396
397
398
399
400
401 public int getIdleThreads()
402 {
403 return _threadsIdle.get();
404 }
405
406
407
408
409
410 public boolean isLowOnThreads()
411 {
412 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
413 }
414
415
416 private boolean startThread(int threads)
417 {
418 final int next=threads+1;
419 if (!_threadsStarted.compareAndSet(threads,next))
420 return false;
421
422 boolean started=false;
423 try
424 {
425 Thread thread=newThread(_runnable);
426 thread.setDaemon(_daemon);
427 thread.setPriority(_priority);
428 thread.setName(_name+"-"+thread.getId());
429 _threads.add(thread);
430
431 thread.start();
432 started=true;
433 }
434 finally
435 {
436 if (!started)
437 _threadsStarted.decrementAndGet();
438 }
439 return started;
440 }
441
442
443 protected Thread newThread(Runnable runnable)
444 {
445 return new Thread(runnable);
446 }
447
448
449
450 public String dump()
451 {
452 return AggregateLifeCycle.dump(this);
453 }
454
455
456 public void dump(Appendable out, String indent) throws IOException
457 {
458 List<Object> dump = new ArrayList<Object>(getMaxThreads());
459 for (final Thread thread: _threads)
460 {
461 final StackTraceElement[] trace=thread.getStackTrace();
462 boolean inIdleJobPoll=false;
463 for (StackTraceElement t : trace)
464 {
465 if ("idleJobPoll".equals(t.getMethodName()))
466 {
467 inIdleJobPoll=true;
468 break;
469 }
470 }
471 final boolean idle=inIdleJobPoll;
472
473 if (_detailedDump)
474 {
475 dump.add(new Dumpable()
476 {
477 public void dump(Appendable out, String indent) throws IOException
478 {
479 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
480 if (!idle)
481 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
482 }
483
484 public String dump()
485 {
486 return null;
487 }
488 });
489 }
490 else
491 {
492 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+trace[0]+(idle?" IDLE":""));
493 }
494 }
495
496 out.append(String.valueOf(this)).append("\n");
497 AggregateLifeCycle.dump(out,indent,dump);
498
499 }
500
501
502
503 @Override
504 public String toString()
505 {
506 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
507 }
508
509 private Runnable idleJobPoll() throws InterruptedException
510 {
511 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
512 }
513
514
515 private Runnable _runnable = new Runnable()
516 {
517 public void run()
518 {
519 boolean shrink=false;
520 try
521 {
522 Runnable job=_jobs.poll();
523 while (isRunning())
524 {
525
526 while (job!=null && isRunning())
527 {
528 job.run();
529 job=_jobs.poll();
530 }
531
532
533 try
534 {
535 _threadsIdle.incrementAndGet();
536
537 while (isRunning() && job==null)
538 {
539 if (_maxIdleTimeMs<=0)
540 job=_jobs.take();
541 else
542 {
543
544 final int size=_threadsStarted.get();
545 if (size>_minThreads)
546 {
547 long last=_lastShrink.get();
548 long now=System.currentTimeMillis();
549 if (last==0 || (now-last)>_maxIdleTimeMs)
550 {
551 shrink=_lastShrink.compareAndSet(last,now) &&
552 _threadsStarted.compareAndSet(size,size-1);
553 if (shrink)
554 return;
555 }
556 }
557 job=idleJobPoll();
558 }
559 }
560 }
561 finally
562 {
563 _threadsIdle.decrementAndGet();
564 }
565 }
566 }
567 catch(InterruptedException e)
568 {
569 Log.ignore(e);
570 }
571 catch(Exception e)
572 {
573 Log.warn(e);
574 }
575 finally
576 {
577 if (!shrink)
578 _threadsStarted.decrementAndGet();
579 _threads.remove(Thread.currentThread());
580 }
581 }
582 };
583
584
585
586
587
588
589
590 @SuppressWarnings("deprecation")
591 public boolean stopThread(long id)
592 {
593 for (Thread thread: _threads)
594 {
595 if (thread.getId()==id)
596 {
597 thread.stop();
598 return true;
599 }
600 }
601 return false;
602 }
603
604
605
606
607
608
609 public boolean interruptThread(long id)
610 {
611 for (Thread thread: _threads)
612 {
613 if (thread.getId()==id)
614 {
615 thread.interrupt();
616 return true;
617 }
618 }
619 return false;
620 }
621
622
623
624
625
626
627 public String dumpThread(long id)
628 {
629 for (Thread thread: _threads)
630 {
631 if (thread.getId()==id)
632 {
633 StringBuilder buf = new StringBuilder();
634 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
635 for (StackTraceElement element : thread.getStackTrace())
636 buf.append(" at ").append(element.toString()).append('\n');
637 return buf.toString();
638 }
639 }
640 return null;
641 }
642 }