1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.util.thread;
16
17 import java.util.concurrent.ArrayBlockingQueue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.Executor;
21 import java.util.concurrent.RejectedExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.eclipse.jetty.util.BlockingArrayQueue;
27 import org.eclipse.jetty.util.component.AbstractLifeCycle;
28 import org.eclipse.jetty.util.component.LifeCycle;
29 import org.eclipse.jetty.util.log.Log;
30
31 public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor
32 {
33 private final AtomicInteger _threadsStarted = new AtomicInteger();
34 private final AtomicInteger _threadsIdle = new AtomicInteger();
35 private final AtomicLong _lastShrink = new AtomicLong();
36 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
37 private final Object _joinLock = new Object();
38 private BlockingQueue<Runnable> _jobs;
39 private String _name;
40 private int _maxIdleTimeMs=60000;
41 private int _maxThreads=254;
42 private int _minThreads=8;
43 private int _maxQueued=-1;
44 private int _priority=Thread.NORM_PRIORITY;
45 private boolean _daemon=false;
46 private int _maxStopTime=100;
47
48
49
50
51 public QueuedThreadPool()
52 {
53 _name="qtp"+super.hashCode();
54 }
55
56
57
58
59 public QueuedThreadPool(int maxThreads)
60 {
61 this();
62 setMaxThreads(maxThreads);
63 }
64
65
66
67
68 public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
69 {
70 this();
71 _jobs=jobQ;
72 _jobs.clear();
73 }
74
75
76
77 @Override
78 protected void doStart() throws Exception
79 {
80 super.doStart();
81 _threadsStarted.set(0);
82
83 if (_jobs==null)
84 {
85 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
86 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
87 }
88
89 int threads=_threadsStarted.get();
90 while (isRunning() && threads<_minThreads)
91 {
92 startThread(threads);
93 threads=_threadsStarted.get();
94 }
95 }
96
97
98 @Override
99 protected void doStop() throws Exception
100 {
101 super.doStop();
102 long start=System.currentTimeMillis();
103
104
105 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
106 Thread.sleep(1);
107
108
109 _jobs.clear();
110 Runnable noop = new Runnable(){public void run(){}};
111 for (int i=_threadsIdle.get();i-->0;)
112 _jobs.offer(noop);
113 Thread.yield();
114
115
116 if (_threadsStarted.get()>0)
117 for (Thread thread : _threads)
118 thread.interrupt();
119
120
121 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
122 {
123 Thread.sleep(1);
124 }
125 Thread.yield();
126 int size=_threads.size();
127 if (size>0)
128 Log.warn(size+" threads could not be stopped");
129
130 synchronized (_joinLock)
131 {
132 _joinLock.notifyAll();
133 }
134 }
135
136
137
138
139
140 public void setDaemon(boolean daemon)
141 {
142 _daemon=daemon;
143 }
144
145
146
147
148
149
150
151
152
153 public void setMaxIdleTimeMs(int maxIdleTimeMs)
154 {
155 _maxIdleTimeMs=maxIdleTimeMs;
156 }
157
158
159
160
161
162 public void setMaxStopTimeMs(int stopTimeMs)
163 {
164 _maxStopTime = stopTimeMs;
165 }
166
167
168
169
170
171
172
173 public void setMaxThreads(int maxThreads)
174 {
175 if (isStarted() && maxThreads<_minThreads)
176 throw new IllegalArgumentException("!minThreads<maxThreads");
177 _maxThreads=maxThreads;
178 }
179
180
181
182
183
184
185
186 public void setMinThreads(int minThreads)
187 {
188 if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
189 throw new IllegalArgumentException("!0<=minThreads<maxThreads");
190 _minThreads=minThreads;
191
192 int threads=_threadsStarted.get();
193 while (isStarted() && threads<_minThreads)
194 {
195 startThread(threads);
196 threads=_threadsStarted.get();
197 }
198 }
199
200
201
202
203
204 public void setName(String name)
205 {
206 if (isRunning())
207 throw new IllegalStateException("started");
208 _name= name;
209 }
210
211
212
213
214
215 public void setThreadsPriority(int priority)
216 {
217 _priority=priority;
218 }
219
220
221
222
223
224 public int getMaxQueued()
225 {
226 return _maxQueued;
227 }
228
229
230
231
232
233 public void setMaxQueued(int max)
234 {
235 if (isRunning())
236 throw new IllegalStateException("started");
237 _maxQueued=max;
238 }
239
240
241
242
243
244
245
246 public int getMaxIdleTimeMs()
247 {
248 return _maxIdleTimeMs;
249 }
250
251
252
253
254
255 public int getMaxStopTimeMs()
256 {
257 return _maxStopTime;
258 }
259
260
261
262
263
264
265
266 public int getMaxThreads()
267 {
268 return _maxThreads;
269 }
270
271
272
273
274
275
276
277 public int getMinThreads()
278 {
279 return _minThreads;
280 }
281
282
283
284
285
286 public String getName()
287 {
288 return _name;
289 }
290
291
292
293
294
295 public int getThreadsPriority()
296 {
297 return _priority;
298 }
299
300
301
302
303
304 public boolean isDaemon()
305 {
306 return _daemon;
307 }
308
309
310
311 public boolean dispatch(Runnable job)
312 {
313 if (isRunning())
314 {
315 final int jobQ = _jobs.size();
316 final int idle = getIdleThreads();
317 if(_jobs.offer(job))
318 {
319
320 if (idle==0 || jobQ>idle)
321 {
322 int threads=_threadsStarted.get();
323 if (threads<_maxThreads)
324 startThread(threads);
325 }
326 return true;
327 }
328 }
329 return false;
330 }
331
332
333 public void execute(Runnable job)
334 {
335 if (!dispatch(job))
336 throw new RejectedExecutionException();
337 }
338
339
340
341
342
343 public void join() throws InterruptedException
344 {
345 synchronized (_joinLock)
346 {
347 while (isRunning())
348 _joinLock.wait();
349 }
350
351 while (isStopping())
352 Thread.sleep(1);
353 }
354
355
356
357
358
359 public int getThreads()
360 {
361 return _threadsStarted.get();
362 }
363
364
365
366
367
368 public int getIdleThreads()
369 {
370 return _threadsIdle.get();
371 }
372
373
374
375
376
377 public boolean isLowOnThreads()
378 {
379 return _threadsStarted.get()==_maxThreads && _jobs.size()>_threadsIdle.get();
380 }
381
382
383 private boolean startThread(int threads)
384 {
385 final int next=threads+1;
386 if (!_threadsStarted.compareAndSet(threads,next))
387 return false;
388
389 boolean started=false;
390 try
391 {
392 Thread thread=newThread(_runnable);
393 thread.setDaemon(_daemon);
394 thread.setPriority(_priority);
395 thread.setName(_name+"-"+thread.getId());
396 _threads.add(thread);
397
398 thread.start();
399 started=true;
400 }
401 finally
402 {
403 if (!started)
404 _threadsStarted.decrementAndGet();
405 }
406 return started;
407 }
408
409
410 protected Thread newThread(Runnable runnable)
411 {
412 return new Thread(runnable);
413 }
414
415
416 @Override
417 public String toString()
418 {
419 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
420 }
421
422
423 private Runnable _runnable = new Runnable()
424 {
425 public void run()
426 {
427 boolean shrink=false;
428 try
429 {
430 Runnable job=_jobs.poll();
431 while (isRunning())
432 {
433
434 while (job!=null && isRunning())
435 {
436 job.run();
437 job=_jobs.poll();
438 }
439
440
441 try
442 {
443 _threadsIdle.incrementAndGet();
444
445 while (isRunning() && job==null)
446 {
447 if (_maxIdleTimeMs<=0)
448 job=_jobs.take();
449 else
450 {
451
452 final int size=_threadsStarted.get();
453 if (size>_minThreads)
454 {
455 long last=_lastShrink.get();
456 long now=System.currentTimeMillis();
457 if (last==0 || (now-last)>_maxIdleTimeMs)
458 {
459 shrink=_lastShrink.compareAndSet(last,now) &&
460 _threadsStarted.compareAndSet(size,size-1);
461 if (shrink)
462 return;
463 }
464 }
465 job=_jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
466 }
467 }
468 }
469 finally
470 {
471 _threadsIdle.decrementAndGet();
472 }
473 }
474 }
475 catch(InterruptedException e)
476 {
477 Log.ignore(e);
478 }
479 catch(Exception e)
480 {
481 Log.warn(e);
482 }
483 finally
484 {
485 if (!shrink)
486 _threadsStarted.decrementAndGet();
487 _threads.remove(Thread.currentThread());
488 }
489 }
490 };
491
492 public String dump()
493 {
494 StringBuilder buf = new StringBuilder();
495
496 for (Thread thread: _threads)
497 {
498 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
499 for (StackTraceElement element : thread.getStackTrace())
500 buf.append(" at ").append(element.toString()).append('\n');
501 }
502
503 return buf.toString();
504 }
505
506
507
508
509
510
511
512 @SuppressWarnings("deprecation")
513 public boolean stopThread(long id)
514 {
515 for (Thread thread: _threads)
516 {
517 if (thread.getId()==id)
518 {
519 thread.stop();
520 return true;
521 }
522 }
523 return false;
524 }
525
526
527
528
529
530
531 public boolean interruptThread(long id)
532 {
533 for (Thread thread: _threads)
534 {
535 if (thread.getId()==id)
536 {
537 thread.interrupt();
538 return true;
539 }
540 }
541 return false;
542 }
543
544
545
546
547
548
549 public String dumpThread(long id)
550 {
551 for (Thread thread: _threads)
552 {
553 if (thread.getId()==id)
554 {
555 StringBuilder buf = new StringBuilder();
556 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
557 for (StackTraceElement element : thread.getStackTrace())
558 buf.append(" at ").append(element.toString()).append('\n');
559 return buf.toString();
560 }
561 }
562 return null;
563 }
564 }