1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.util.thread;
16
17 import java.util.concurrent.ArrayBlockingQueue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.Executor;
21 import java.util.concurrent.RejectedExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.eclipse.jetty.util.BlockingArrayQueue;
27 import org.eclipse.jetty.util.component.AbstractLifeCycle;
28 import org.eclipse.jetty.util.component.LifeCycle;
29 import org.eclipse.jetty.util.log.Log;
30
31
32 public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor
33 {
34 private final AtomicInteger _threadsStarted = new AtomicInteger();
35 private final AtomicInteger _threadsIdle = new AtomicInteger();
36 private final AtomicLong _lastShrink = new AtomicLong();
37 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
38 private final Object _joinLock = new Object();
39 private BlockingQueue<Runnable> _jobs;
40 private String _name;
41 private int _maxIdleTimeMs=60000;
42 private int _maxThreads=254;
43 private int _minThreads=8;
44 private int _maxQueued=-1;
45 private int _priority=Thread.NORM_PRIORITY;
46 private boolean _daemon=false;
47 private int _maxStopTime=100;
48
49
50
51
52 public QueuedThreadPool()
53 {
54 _name="qtp"+super.hashCode();
55 }
56
57
58
59
/**
 * Creates a pool with a generated name and the given thread limit.
 *
 * @param maxThreads the value passed on to {@link #setMaxThreads(int)}
 */
public QueuedThreadPool(int maxThreads)
{
    this();
    this.setMaxThreads(maxThreads);
}
65
66
67
68
/**
 * Creates a pool that uses the supplied queue for its pending jobs
 * instead of creating one when it starts.
 *
 * @param jobQ the job queue to adopt; anything it currently holds is
 *             discarded via {@code clear()}
 */
public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
{
    this();
    // Empty the caller's queue first, then adopt it.
    jobQ.clear();
    _jobs = jobQ;
}
75
76
77
78 @Override
79 protected void doStart() throws Exception
80 {
81 super.doStart();
82 _threadsStarted.set(0);
83
84 if (_jobs==null)
85 {
86 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
87 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
88 }
89
90 int threads=_threadsStarted.get();
91 while (isRunning() && threads<_minThreads)
92 {
93 startThread(threads);
94 threads=_threadsStarted.get();
95 }
96 }
97
98
99 @Override
100 protected void doStop() throws Exception
101 {
102 super.doStop();
103 long start=System.currentTimeMillis();
104
105
106 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
107 Thread.sleep(1);
108
109
110 _jobs.clear();
111 Runnable noop = new Runnable(){public void run(){}};
112 for (int i=_threadsIdle.get();i-->0;)
113 _jobs.offer(noop);
114 Thread.yield();
115
116
117 if (_threadsStarted.get()>0)
118 for (Thread thread : _threads)
119 thread.interrupt();
120
121
122 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
123 {
124 Thread.sleep(1);
125 }
126
127 if (_threads.size()>0)
128 Log.warn(_threads.size()+" threads could not be stopped");
129
130 synchronized (_joinLock)
131 {
132 _joinLock.notifyAll();
133 }
134 }
135
136
137
138
139
140 public void setDaemon(boolean daemon)
141 {
142 _daemon=daemon;
143 }
144
145
146
147
148
149
150
151
152
153 public void setMaxIdleTimeMs(int maxIdleTimeMs)
154 {
155 _maxIdleTimeMs=maxIdleTimeMs;
156 }
157
158
159
160
161
162 public void setMaxStopTimeMs(int stopTimeMs)
163 {
164 _maxStopTime = stopTimeMs;
165 }
166
167
168
169
170
171
172
173 public void setMaxThreads(int maxThreads)
174 {
175 if (isStarted() && maxThreads<_minThreads)
176 throw new IllegalArgumentException("!minThreads<maxThreads");
177 _maxThreads=maxThreads;
178 }
179
180
181
182
183
184
185
186 public void setMinThreads(int minThreads)
187 {
188 if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
189 throw new IllegalArgumentException("!0<=minThreads<maxThreads");
190 _minThreads=minThreads;
191
192 int threads=_threadsStarted.get();
193 while (isStarted() && threads<_minThreads)
194 {
195 startThread(threads);
196 threads=_threadsStarted.get();
197 }
198 }
199
200
201
202
203
204 public void setName(String name)
205 {
206 if (isRunning())
207 throw new IllegalStateException("started");
208 _name= name;
209 }
210
211
212
213
214
215 public void setThreadsPriority(int priority)
216 {
217 _priority=priority;
218 }
219
220
221
222
223
224 public int getMaxQueued()
225 {
226 return _maxQueued;
227 }
228
229
230
231
232
233 public void setMaxQueued(int max)
234 {
235 if (isRunning())
236 throw new IllegalStateException("started");
237 _maxQueued=max;
238 }
239
240
241
242
243
244
245
246 public int getMaxIdleTimeMs()
247 {
248 return _maxIdleTimeMs;
249 }
250
251
252
253
254
255 public int getMaxStopTimeMs()
256 {
257 return _maxStopTime;
258 }
259
260
261
262
263
264
265
266 public int getMaxThreads()
267 {
268 return _maxThreads;
269 }
270
271
272
273
274
275
276
277 public int getMinThreads()
278 {
279 return _minThreads;
280 }
281
282
283
284
285
286 public String getName()
287 {
288 return _name;
289 }
290
291
292
293
294
295 public int getThreadsPriority()
296 {
297 return _priority;
298 }
299
300
301
302
303
304 public boolean isDaemon()
305 {
306 return _daemon;
307 }
308
309
310
311 public boolean dispatch(Runnable job)
312 {
313 if (isRunning())
314 {
315 final int jobQ = _jobs.size();
316 final int idle = getIdleThreads();
317 if(_jobs.offer(job))
318 {
319
320 if (idle==0 || jobQ>idle)
321 {
322 int threads=_threadsStarted.get();
323 if (threads<_maxThreads)
324 startThread(threads);
325 }
326 return true;
327 }
328 }
329 return false;
330 }
331
332
333 public void execute(Runnable job)
334 {
335 if (!dispatch(job))
336 throw new RejectedExecutionException();
337 }
338
339
340
341
342
343 public void join() throws InterruptedException
344 {
345 synchronized (_joinLock)
346 {
347 while (isRunning())
348 _joinLock.wait();
349 }
350
351 while (isStopping())
352 Thread.sleep(1);
353 }
354
355
356
357
358
359 public int getThreads()
360 {
361 return _threadsStarted.get();
362 }
363
364
365
366
367
368 public int getIdleThreads()
369 {
370 return _threadsIdle.get();
371 }
372
373
374
375
376
377 public boolean isLowOnThreads()
378 {
379 return _threadsStarted.get()==_maxThreads && _jobs.size()>_threadsIdle.get();
380 }
381
382
383 private boolean startThread(int threads)
384 {
385 final int next=threads+1;
386 if (!_threadsStarted.compareAndSet(threads,next))
387 return false;
388
389 boolean started=false;
390 try
391 {
392 Thread thread=newThread(_runnable);
393 thread.setDaemon(_daemon);
394 thread.setPriority(_priority);
395 thread.setName(_name+"-"+thread.getId());
396 _threads.add(thread);
397
398 thread.start();
399 started=true;
400 }
401 finally
402 {
403 if (!started)
404 _threadsStarted.decrementAndGet();
405 }
406 return started;
407 }
408
409
410 protected Thread newThread(Runnable runnable)
411 {
412 return new Thread(runnable);
413 }
414
415
416 public String toString()
417 {
418 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
419 }
420
421
422 private Runnable _runnable = new Runnable()
423 {
424 public void run()
425 {
426 boolean shrink=false;
427 try
428 {
429 Runnable job=_jobs.poll();
430 while (isRunning())
431 {
432
433 while (job!=null && isRunning())
434 {
435 job.run();
436 job=_jobs.poll();
437 }
438
439
440 try
441 {
442 _threadsIdle.incrementAndGet();
443
444 while (isRunning() && job==null)
445 {
446 if (_maxIdleTimeMs<=0)
447 job=_jobs.take();
448 else
449 {
450 job=_jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
451
452 if (job==null)
453 {
454
455 final int size=_threadsStarted.get();
456 if (size>_minThreads)
457 {
458 long last=_lastShrink.get();
459 long now=System.currentTimeMillis();
460 if (last==0 || (now-last)>_maxIdleTimeMs)
461 {
462 shrink=_lastShrink.compareAndSet(last,now) &&
463 _threadsStarted.compareAndSet(size,size-1);
464 if (shrink)
465 return;
466 }
467 }
468 }
469 }
470 }
471 }
472 finally
473 {
474 _threadsIdle.decrementAndGet();
475 }
476 }
477 }
478 catch(InterruptedException e)
479 {
480 Log.ignore(e);
481 }
482 catch(Exception e)
483 {
484 Log.warn(e);
485 }
486 finally
487 {
488 if (!shrink)
489 _threadsStarted.decrementAndGet();
490 _threads.remove(Thread.currentThread());
491 }
492 }
493 };
494
495 public String dump()
496 {
497 StringBuilder buf = new StringBuilder();
498
499 for (Thread thread: _threads)
500 {
501 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
502 for (StackTraceElement element : thread.getStackTrace())
503 buf.append(" at ").append(element.toString()).append('\n');
504 }
505
506 return buf.toString();
507 }
508
509
510
511
512
513
514
515 @SuppressWarnings("deprecation")
516 public boolean stopThread(long id)
517 {
518 for (Thread thread: _threads)
519 {
520 if (thread.getId()==id)
521 {
522 thread.stop();
523 return true;
524 }
525 }
526 return false;
527 }
528
529
530
531
532
533
534 public boolean interruptThread(long id)
535 {
536 for (Thread thread: _threads)
537 {
538 if (thread.getId()==id)
539 {
540 thread.interrupt();
541 return true;
542 }
543 }
544 return false;
545 }
546
547
548
549
550
551
552 public String dumpThread(long id)
553 {
554 for (Thread thread: _threads)
555 {
556 if (thread.getId()==id)
557 {
558 StringBuilder buf = new StringBuilder();
559 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
560 for (StackTraceElement element : thread.getStackTrace())
561 buf.append(" at ").append(element.toString()).append('\n');
562 return buf.toString();
563 }
564 }
565 return null;
566 }
567 }