View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.util.thread;
15  
16  import java.io.Serializable;
17  import java.util.ArrayList;
18  import java.util.HashSet;
19  import java.util.Iterator;
20  import java.util.List;
21  import java.util.Set;
22  
23  import org.eclipse.jetty.util.component.AbstractLifeCycle;
24  import org.eclipse.jetty.util.log.Log;
25  
26  /* ------------------------------------------------------------ */
27  /** A pool of threads.
28   * <p>
29   * Avoids the expense of thread creation by pooling threads after
30   * their run methods exit for reuse.
31   * <p>
32   * If an idle thread is available a job is directly dispatched,
33   * otherwise the job is queued.  After queuing a job, if the total
34   * number of threads is less than the maximum pool size, a new thread 
35   * is spawned.
36   * <p>
37   * 
38   */
39  public class OldQueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
40  {
41      private static int __id;
42      
43      private String _name;
44      private Set _threads;
45      private List _idle;
46      private Runnable[] _jobs;
47      private int _nextJob;
48      private int _nextJobSlot;
49      private int _queued;
50      private int _maxQueued;
51      
52      private boolean _daemon;
53      private int _id;
54  
55      private final Object _lock = new Lock();
56      private final Object _threadsLock = new Lock();
57      private final Object _joinLock = new Lock();
58  
59      private long _lastShrink;
60      private int _maxIdleTimeMs=60000;
61      private int _maxThreads=250;
62      private int _minThreads=2;
63      private boolean _warned=false;
64      private int _lowThreads=0;
65      private int _priority= Thread.NORM_PRIORITY;
66      private int _spawnOrShrinkAt=0;
67      private int _maxStopTimeMs;
68  
69      
70      /* ------------------------------------------------------------------- */
71      /* Construct
72       */
73      public OldQueuedThreadPool()
74      {
75          _name="qtp"+__id++;
76      }
77      
78      /* ------------------------------------------------------------------- */
79      /* Construct
80       */
81      public OldQueuedThreadPool(int maxThreads)
82      {
83          this();
84          setMaxThreads(maxThreads);
85      }
86  
87      /* ------------------------------------------------------------ */
88      /** Run job.
89       * @return true 
90       */
91      public boolean dispatch(Runnable job) 
92      {  
93          if (!isRunning() || job==null)
94              return false;
95  
96          PoolThread thread=null;
97          boolean spawn=false;
98              
99          synchronized(_lock)
100         {
101             // Look for an idle thread
102             int idle=_idle.size();
103             if (idle>0)
104                 thread=(PoolThread)_idle.remove(idle-1);
105             else
106             {
107                 // queue the job
108                 _queued++;
109                 if (_queued>_maxQueued)
110                     _maxQueued=_queued;
111                 _jobs[_nextJobSlot++]=job;
112                 if (_nextJobSlot==_jobs.length)
113                     _nextJobSlot=0;
114                 if (_nextJobSlot==_nextJob)
115                 {
116                     // Grow the job queue
117                     Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
118                     int split=_jobs.length-_nextJob;
119                     if (split>0)
120                         System.arraycopy(_jobs,_nextJob,jobs,0,split);
121                     if (_nextJob!=0)
122                         System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
123                     
124                     _jobs=jobs;
125                     _nextJob=0;
126                     _nextJobSlot=_queued;
127                 }
128                   
129                 spawn=_queued>_spawnOrShrinkAt;
130             }
131         }
132         
133         if (thread!=null)
134         {
135             thread.dispatch(job);
136         }
137         else if (spawn)
138         {
139             newThread();
140         }
141         return true;
142     }
143 
144     /* ------------------------------------------------------------ */
145     /** Get the number of idle threads in the pool.
146      * @see #getThreads
147      * @return Number of threads
148      */
149     public int getIdleThreads()
150     {
151         return _idle==null?0:_idle.size();
152     }
153     
154     /* ------------------------------------------------------------ */
155     /**
156      * @return low resource threads threshhold
157      */
158     public int getLowThreads()
159     {
160         return _lowThreads;
161     }
162     
163     /* ------------------------------------------------------------ */
164     /**
165      * @return maximum queue size
166      */
167     public int getMaxQueued()
168     {
169         return _maxQueued;
170     }
171     
172     /* ------------------------------------------------------------ */
173     /** Get the maximum thread idle time.
174      * Delegated to the named or anonymous Pool.
175      * @see #setMaxIdleTimeMs
176      * @return Max idle time in ms.
177      */
178     public int getMaxIdleTimeMs()
179     {
180         return _maxIdleTimeMs;
181     }
182     
183     /* ------------------------------------------------------------ */
184     /** Set the maximum number of threads.
185      * Delegated to the named or anonymous Pool.
186      * @see #setMaxThreads
187      * @return maximum number of threads.
188      */
189     public int getMaxThreads()
190     {
191         return _maxThreads;
192     }
193 
194     /* ------------------------------------------------------------ */
195     /** Get the minimum number of threads.
196      * Delegated to the named or anonymous Pool.
197      * @see #setMinThreads
198      * @return minimum number of threads.
199      */
200     public int getMinThreads()
201     {
202         return _minThreads;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /** 
207      * @return The name of the BoundedThreadPool.
208      */
209     public String getName()
210     {
211         return _name;
212     }
213 
214     /* ------------------------------------------------------------ */
215     /** Get the number of threads in the pool.
216      * @see #getIdleThreads
217      * @return Number of threads
218      */
219     public int getThreads()
220     {
221         return _threads.size();
222     }
223 
224     /* ------------------------------------------------------------ */
225     /** Get the priority of the pool threads.
226      *  @return the priority of the pool threads.
227      */
228     public int getThreadsPriority()
229     {
230         return _priority;
231     }
232 
233     /* ------------------------------------------------------------ */
234     public int getQueueSize()
235     {
236         return _queued;
237     }
238     
239     /* ------------------------------------------------------------ */
240     /**
241      * @return the spawnOrShrinkAt  The number of queued jobs (or idle threads) needed 
242      * before the thread pool is grown (or shrunk)
243      */
244     public int getSpawnOrShrinkAt()
245     {
246         return _spawnOrShrinkAt;
247     }
248 
249     /* ------------------------------------------------------------ */
250     /**
251      * @param spawnOrShrinkAt The number of queued jobs (or idle threads) needed 
252      * before the thread pool is grown (or shrunk)
253      */
254     public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
255     {
256         _spawnOrShrinkAt=spawnOrShrinkAt;
257     }
258 
259     /* ------------------------------------------------------------ */
260     /**
261      * @return maximum total time that stop() will wait for threads to die.
262      */
263     public int getMaxStopTimeMs()
264     {
265         return _maxStopTimeMs;
266     }
267 
268     /* ------------------------------------------------------------ */
269     /**
270      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
271      */
272     public void setMaxStopTimeMs(int stopTimeMs)
273     {
274         _maxStopTimeMs = stopTimeMs;
275     }
276 
277     /* ------------------------------------------------------------ */
278     /** 
279      * Delegated to the named or anonymous Pool.
280      */
281     public boolean isDaemon()
282     {
283         return _daemon;
284     }
285 
286     /* ------------------------------------------------------------ */
287     public boolean isLowOnThreads()
288     {
289         return _queued>_lowThreads;
290     }
291 
292     /* ------------------------------------------------------------ */
293     public void join() throws InterruptedException
294     {
295         synchronized (_joinLock)
296         {
297             while (isRunning())
298                 _joinLock.wait();
299         }
300         
301         // TODO remove this semi busy loop!
302         while (isStopping())
303             Thread.sleep(100);
304     }
305 
306     /* ------------------------------------------------------------ */
307     /** 
308      * Delegated to the named or anonymous Pool.
309      */
310     public void setDaemon(boolean daemon)
311     {
312         _daemon=daemon;
313     }
314 
315     /* ------------------------------------------------------------ */
316     /**
317      * @param lowThreads low resource threads threshhold
318      */
319     public void setLowThreads(int lowThreads)
320     {
321         _lowThreads = lowThreads;
322     }
323     
324     /* ------------------------------------------------------------ */
325     /** Set the maximum thread idle time.
326      * Threads that are idle for longer than this period may be
327      * stopped.
328      * Delegated to the named or anonymous Pool.
329      * @see #getMaxIdleTimeMs
330      * @param maxIdleTimeMs Max idle time in ms.
331      */
332     public void setMaxIdleTimeMs(int maxIdleTimeMs)
333     {
334         _maxIdleTimeMs=maxIdleTimeMs;
335     }
336 
337     /* ------------------------------------------------------------ */
338     /** Set the maximum number of threads.
339      * Delegated to the named or anonymous Pool.
340      * @see #getMaxThreads
341      * @param maxThreads maximum number of threads.
342      */
343     public void setMaxThreads(int maxThreads)
344     {
345         if (isStarted() && maxThreads<_minThreads)
346             throw new IllegalArgumentException("!minThreads<maxThreads");
347         _maxThreads=maxThreads;
348     }
349 
350     /* ------------------------------------------------------------ */
351     /** Set the minimum number of threads.
352      * Delegated to the named or anonymous Pool.
353      * @see #getMinThreads
354      * @param minThreads minimum number of threads
355      */
356     public void setMinThreads(int minThreads)
357     {
358         if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
359             throw new IllegalArgumentException("!0<=minThreads<maxThreads");
360         _minThreads=minThreads;
361         synchronized (_threadsLock)
362         {
363             while (isStarted() && _threads.size()<_minThreads)
364             {
365                 newThread();   
366             }
367         }
368     }
369 
370     /* ------------------------------------------------------------ */
371     /** 
372      * @param name Name of the BoundedThreadPool to use when naming Threads.
373      */
374     public void setName(String name)
375     {
376         _name= name;
377     }
378 
379     /* ------------------------------------------------------------ */
380     /** Set the priority of the pool threads.
381      *  @param priority the new thread priority.
382      */
383     public void setThreadsPriority(int priority)
384     {
385         _priority=priority;
386     }
387 
388     /* ------------------------------------------------------------ */
389     /* Start the BoundedThreadPool.
390      * Construct the minimum number of threads.
391      */
392     protected void doStart() throws Exception
393     {
394         if (_maxThreads<_minThreads || _minThreads<=0)
395             throw new IllegalArgumentException("!0<minThreads<maxThreads");
396         
397         _threads=new HashSet();
398         _idle=new ArrayList();
399         _jobs=new Runnable[_maxThreads];
400         
401         for (int i=0;i<_minThreads;i++)
402         {
403             newThread();
404         }   
405     }
406 
407     /* ------------------------------------------------------------ */
408     /** Stop the BoundedThreadPool.
409      * New jobs are no longer accepted,idle threads are interrupted
410      * and stopJob is called on active threads.
411      * The method then waits 
412      * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
413      * stop, at which time killJob is called.
414      */
415     protected void doStop() throws Exception
416     {   
417         super.doStop();
418         
419         long start=System.currentTimeMillis();
420         for (int i=0;i<100;i++)
421         {
422             synchronized (_threadsLock)
423             {
424                 Iterator iter = _threads.iterator();
425                 while (iter.hasNext())
426                     ((Thread)iter.next()).interrupt();
427             }
428             
429             Thread.yield();
430             if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
431                break;
432             
433             try
434             {
435                 Thread.sleep(i*100);
436             }
437             catch(InterruptedException e){}
438             
439             
440         }
441 
442         // TODO perhaps force stops
443         if (_threads.size()>0)
444             Log.warn(_threads.size()+" threads could not be stopped");
445         
446         synchronized (_joinLock)
447         {
448             _joinLock.notifyAll();
449         }
450     }
451 
452     /* ------------------------------------------------------------ */
453     protected void newThread()
454     {
455         synchronized (_threadsLock)
456         {
457             if (_threads.size()<_maxThreads)
458             {
459                 PoolThread thread =new PoolThread();
460                 _threads.add(thread);
461                 thread.setName(thread.getId()+"@"+_name+"-"+_id++);
462                 thread.start(); 
463             }
464             else if (!_warned)    
465             {
466                 _warned=true;
467                 Log.debug("Max threads for {}",this);
468             }
469         }
470     }
471 
472     /* ------------------------------------------------------------ */
473     /** Stop a Job.
474      * This method is called by the Pool if a job needs to be stopped.
475      * The default implementation does nothing and should be extended by a
476      * derived thread pool class if special action is required.
477      * @param thread The thread allocated to the job, or null if no thread allocated.
478      * @param job The job object passed to run.
479      */
480     protected void stopJob(Thread thread, Object job)
481     {
482         thread.interrupt();
483     }
484     
485 
486     /* ------------------------------------------------------------ */
487     /** Pool Thread class.
488      * The PoolThread allows the threads job to be
489      * retrieved and active status to be indicated.
490      */
491     public class PoolThread extends Thread 
492     {
493         Runnable _job=null;
494 
495         /* ------------------------------------------------------------ */
496         PoolThread()
497         {
498             setDaemon(_daemon);
499             setPriority(_priority);
500         }
501         
502         /* ------------------------------------------------------------ */
503         /** BoundedThreadPool run.
504          * Loop getting jobs and handling them until idle or stopped.
505          */
506         public void run()
507         {
508             boolean idle=false;
509             Runnable job=null;
510             try
511             {
512                 while (isRunning())
513                 {   
514                     // Run any job that we have.
515                     if (job!=null)
516                     {
517                         final Runnable todo=job;
518                         job=null;
519                         idle=false;
520                         todo.run();
521                     }
522                     
523                     synchronized(_lock)
524                     {
525                         // is there a queued job?
526                         if (_queued>0)
527                         {
528                             _queued--;
529                             job=_jobs[_nextJob++];
530                             if (_nextJob==_jobs.length)
531                                 _nextJob=0;
532                             continue;
533                         }
534 
535                         // Should we shrink?
536                         final int threads=_threads.size();
537                         if (threads>_minThreads && 
538                             (threads>_maxThreads || 
539                              _idle.size()>_spawnOrShrinkAt))   
540                         {
541                             long now = System.currentTimeMillis();
542                             if ((now-_lastShrink)>getMaxIdleTimeMs())
543                             {
544                                 _lastShrink=now;
545                                 _idle.remove(this);
546                                 return;
547                             }
548                         }
549 
550                         if (!idle)
551                         {   
552                             // Add ourselves to the idle set.
553                             _idle.add(this);
554                             idle=true;
555                         }
556                     }
557 
558                     // We are idle
559                     // wait for a dispatched job
560                     synchronized (this)
561                     {
562                         if (_job==null)
563                             this.wait(getMaxIdleTimeMs());
564                         job=_job;
565                         _job=null;
566                     }
567                 }
568             }
569             catch (InterruptedException e)
570             {
571                 Log.ignore(e);
572             }
573             finally
574             {
575                 synchronized (_lock)
576                 {
577                     _idle.remove(this);
578                 }
579                 synchronized (_threadsLock)
580                 {
581                     _threads.remove(this);
582                 }
583                 synchronized (this)
584                 {
585                     job=_job;
586                 }
587                 
588                 // we died with a job! reschedule it
589                 if (job!=null)
590                 {
591                     OldQueuedThreadPool.this.dispatch(job);
592                 }
593             }
594         }
595         
596         /* ------------------------------------------------------------ */
597         void dispatch(Runnable job)
598         {
599             synchronized (this)
600             {
601                 _job=job;
602                 this.notify();
603             }
604         }
605     }
606 
607     private class Lock{}
608 }