1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.util.thread;
15
16 import java.io.Serializable;
17 import java.util.ArrayList;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Set;
22
23 import org.eclipse.jetty.util.component.AbstractLifeCycle;
24 import org.eclipse.jetty.util.log.Log;
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 public class OldQueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
40 {
41 private static int __id;
42
43 private String _name;
44 private Set _threads;
45 private List _idle;
46 private Runnable[] _jobs;
47 private int _nextJob;
48 private int _nextJobSlot;
49 private int _queued;
50 private int _maxQueued;
51
52 private boolean _daemon;
53 private int _id;
54
55 private final Object _lock = new Lock();
56 private final Object _threadsLock = new Lock();
57 private final Object _joinLock = new Lock();
58
59 private long _lastShrink;
60 private int _maxIdleTimeMs=60000;
61 private int _maxThreads=250;
62 private int _minThreads=2;
63 private boolean _warned=false;
64 private int _lowThreads=0;
65 private int _priority= Thread.NORM_PRIORITY;
66 private int _spawnOrShrinkAt=0;
67 private int _maxStopTimeMs;
68
69
70
71
72
73 public OldQueuedThreadPool()
74 {
75 _name="qtp"+__id++;
76 }
77
78
79
80
/**
 * Creates a pool with a generated name and the given thread limit.
 *
 * @param maxThreads the maximum number of threads, passed to {@link #setMaxThreads(int)}
 */
public OldQueuedThreadPool(int maxThreads)
{
    this();
    this.setMaxThreads(maxThreads);
}
86
87
88
89
90
91 public boolean dispatch(Runnable job)
92 {
93 if (!isRunning() || job==null)
94 return false;
95
96 PoolThread thread=null;
97 boolean spawn=false;
98
99 synchronized(_lock)
100 {
101
102 int idle=_idle.size();
103 if (idle>0)
104 thread=(PoolThread)_idle.remove(idle-1);
105 else
106 {
107
108 _queued++;
109 if (_queued>_maxQueued)
110 _maxQueued=_queued;
111 _jobs[_nextJobSlot++]=job;
112 if (_nextJobSlot==_jobs.length)
113 _nextJobSlot=0;
114 if (_nextJobSlot==_nextJob)
115 {
116
117 Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
118 int split=_jobs.length-_nextJob;
119 if (split>0)
120 System.arraycopy(_jobs,_nextJob,jobs,0,split);
121 if (_nextJob!=0)
122 System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
123
124 _jobs=jobs;
125 _nextJob=0;
126 _nextJobSlot=_queued;
127 }
128
129 spawn=_queued>_spawnOrShrinkAt;
130 }
131 }
132
133 if (thread!=null)
134 {
135 thread.dispatch(job);
136 }
137 else if (spawn)
138 {
139 newThread();
140 }
141 return true;
142 }
143
144
145
146
147
148
149 public int getIdleThreads()
150 {
151 return _idle==null?0:_idle.size();
152 }
153
154
155
156
157
158 public int getLowThreads()
159 {
160 return _lowThreads;
161 }
162
163
164
165
166
167 public int getMaxQueued()
168 {
169 return _maxQueued;
170 }
171
172
173
174
175
176
177
178 public int getMaxIdleTimeMs()
179 {
180 return _maxIdleTimeMs;
181 }
182
183
184
185
186
187
188
189 public int getMaxThreads()
190 {
191 return _maxThreads;
192 }
193
194
195
196
197
198
199
200 public int getMinThreads()
201 {
202 return _minThreads;
203 }
204
205
206
207
208
209 public String getName()
210 {
211 return _name;
212 }
213
214
215
216
217
218
219 public int getThreads()
220 {
221 return _threads.size();
222 }
223
224
225
226
227
228 public int getThreadsPriority()
229 {
230 return _priority;
231 }
232
233
234 public int getQueueSize()
235 {
236 return _queued;
237 }
238
239
240
241
242
243
244 public int getSpawnOrShrinkAt()
245 {
246 return _spawnOrShrinkAt;
247 }
248
249
250
251
252
253
254 public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
255 {
256 _spawnOrShrinkAt=spawnOrShrinkAt;
257 }
258
259
260
261
262
263 public int getMaxStopTimeMs()
264 {
265 return _maxStopTimeMs;
266 }
267
268
269
270
271
272 public void setMaxStopTimeMs(int stopTimeMs)
273 {
274 _maxStopTimeMs = stopTimeMs;
275 }
276
277
278
279
280
281 public boolean isDaemon()
282 {
283 return _daemon;
284 }
285
286
287 public boolean isLowOnThreads()
288 {
289 return _queued>_lowThreads;
290 }
291
292
293 public void join() throws InterruptedException
294 {
295 synchronized (_joinLock)
296 {
297 while (isRunning())
298 _joinLock.wait();
299 }
300
301
302 while (isStopping())
303 Thread.sleep(100);
304 }
305
306
307
308
309
310 public void setDaemon(boolean daemon)
311 {
312 _daemon=daemon;
313 }
314
315
316
317
318
319 public void setLowThreads(int lowThreads)
320 {
321 _lowThreads = lowThreads;
322 }
323
324
325
326
327
328
329
330
331
332 public void setMaxIdleTimeMs(int maxIdleTimeMs)
333 {
334 _maxIdleTimeMs=maxIdleTimeMs;
335 }
336
337
338
339
340
341
342
343 public void setMaxThreads(int maxThreads)
344 {
345 if (isStarted() && maxThreads<_minThreads)
346 throw new IllegalArgumentException("!minThreads<maxThreads");
347 _maxThreads=maxThreads;
348 }
349
350
351
352
353
354
355
356 public void setMinThreads(int minThreads)
357 {
358 if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
359 throw new IllegalArgumentException("!0<=minThreads<maxThreads");
360 _minThreads=minThreads;
361 synchronized (_threadsLock)
362 {
363 while (isStarted() && _threads.size()<_minThreads)
364 {
365 newThread();
366 }
367 }
368 }
369
370
371
372
373
374 public void setName(String name)
375 {
376 _name= name;
377 }
378
379
380
381
382
383 public void setThreadsPriority(int priority)
384 {
385 _priority=priority;
386 }
387
388
389
390
391
392 @Override
393 protected void doStart() throws Exception
394 {
395 if (_maxThreads<_minThreads || _minThreads<=0)
396 throw new IllegalArgumentException("!0<minThreads<maxThreads");
397
398 _threads=new HashSet();
399 _idle=new ArrayList();
400 _jobs=new Runnable[_maxThreads];
401
402 for (int i=0;i<_minThreads;i++)
403 {
404 newThread();
405 }
406 }
407
408
409
410
411
412
413
414
415
416 @Override
417 protected void doStop() throws Exception
418 {
419 super.doStop();
420
421 long start=System.currentTimeMillis();
422 for (int i=0;i<100;i++)
423 {
424 synchronized (_threadsLock)
425 {
426 Iterator iter = _threads.iterator();
427 while (iter.hasNext())
428 ((Thread)iter.next()).interrupt();
429 }
430
431 Thread.yield();
432 if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
433 break;
434
435 try
436 {
437 Thread.sleep(i*100);
438 }
439 catch(InterruptedException e){}
440
441
442 }
443
444
445 int size=_threads.size();
446 if (size>0)
447 Log.warn(size+" threads could not be stopped");
448
449 synchronized (_joinLock)
450 {
451 _joinLock.notifyAll();
452 }
453 }
454
455
456 protected void newThread()
457 {
458 synchronized (_threadsLock)
459 {
460 if (_threads.size()<_maxThreads)
461 {
462 PoolThread thread =new PoolThread();
463 _threads.add(thread);
464 thread.setName(thread.getId()+"@"+_name+"-"+_id++);
465 thread.start();
466 }
467 else if (!_warned)
468 {
469 _warned=true;
470 Log.debug("Max threads for {}",this);
471 }
472 }
473 }
474
475
476
477
478
479
480
481
482
483 protected void stopJob(Thread thread, Object job)
484 {
485 thread.interrupt();
486 }
487
488
489
490
491
492
493
494 public class PoolThread extends Thread
495 {
496 Runnable _job=null;
497
498
499 PoolThread()
500 {
501 setDaemon(_daemon);
502 setPriority(_priority);
503 }
504
505
506
507
508
509 @Override
510 public void run()
511 {
512 boolean idle=false;
513 Runnable job=null;
514 try
515 {
516 while (isRunning())
517 {
518
519 if (job!=null)
520 {
521 final Runnable todo=job;
522 job=null;
523 idle=false;
524 todo.run();
525 }
526
527 synchronized(_lock)
528 {
529
530 if (_queued>0)
531 {
532 _queued--;
533 job=_jobs[_nextJob++];
534 if (_nextJob==_jobs.length)
535 _nextJob=0;
536 continue;
537 }
538
539
540 final int threads=_threads.size();
541 if (threads>_minThreads &&
542 (threads>_maxThreads ||
543 _idle.size()>_spawnOrShrinkAt))
544 {
545 long now = System.currentTimeMillis();
546 if ((now-_lastShrink)>getMaxIdleTimeMs())
547 {
548 _lastShrink=now;
549 _idle.remove(this);
550 return;
551 }
552 }
553
554 if (!idle)
555 {
556
557 _idle.add(this);
558 idle=true;
559 }
560 }
561
562
563
564 synchronized (this)
565 {
566 if (_job==null)
567 this.wait(getMaxIdleTimeMs());
568 job=_job;
569 _job=null;
570 }
571 }
572 }
573 catch (InterruptedException e)
574 {
575 Log.ignore(e);
576 }
577 finally
578 {
579 synchronized (_lock)
580 {
581 _idle.remove(this);
582 }
583 synchronized (_threadsLock)
584 {
585 _threads.remove(this);
586 }
587 synchronized (this)
588 {
589 job=_job;
590 }
591
592
593 if (job!=null)
594 {
595 OldQueuedThreadPool.this.dispatch(job);
596 }
597 }
598 }
599
600
601 void dispatch(Runnable job)
602 {
603 synchronized (this)
604 {
605 _job=job;
606 this.notify();
607 }
608 }
609 }
610
611 private class Lock{}
612 }