View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.util.thread;
15  
16  import java.io.Serializable;
17  import java.util.ArrayList;
18  import java.util.HashSet;
19  import java.util.Iterator;
20  import java.util.List;
21  import java.util.Set;
22  
23  import org.eclipse.jetty.util.component.AbstractLifeCycle;
24  import org.eclipse.jetty.util.log.Log;
25  
26  /* ------------------------------------------------------------ */
27  /** A pool of threads.
28   * <p>
29   * Avoids the expense of thread creation by pooling threads after
30   * their run methods exit for reuse.
31   * <p>
32   * If an idle thread is available a job is directly dispatched,
33   * otherwise the job is queued.  After queuing a job, if the total
34   * number of threads is less than the maximum pool size, a new thread 
35   * is spawned.
36   * <p>
37   * 
38   */
39  public class OldQueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
40  {
41      private static int __id;
42      
43      private String _name;
44      private Set _threads;
45      private List _idle;
46      private Runnable[] _jobs;
47      private int _nextJob;
48      private int _nextJobSlot;
49      private int _queued;
50      private int _maxQueued;
51      
52      private boolean _daemon;
53      private int _id;
54  
55      private final Object _lock = new Lock();
56      private final Object _threadsLock = new Lock();
57      private final Object _joinLock = new Lock();
58  
59      private long _lastShrink;
60      private int _maxIdleTimeMs=60000;
61      private int _maxThreads=250;
62      private int _minThreads=2;
63      private boolean _warned=false;
64      private int _lowThreads=0;
65      private int _priority= Thread.NORM_PRIORITY;
66      private int _spawnOrShrinkAt=0;
67      private int _maxStopTimeMs;
68  
69      
70      /* ------------------------------------------------------------------- */
71      /* Construct
72       */
73      public OldQueuedThreadPool()
74      {
75          _name="qtp"+__id++;
76      }
77      
78      /* ------------------------------------------------------------------- */
79      /* Construct
80       */
81      public OldQueuedThreadPool(int maxThreads)
82      {
83          this();
84          setMaxThreads(maxThreads);
85      }
86  
87      /* ------------------------------------------------------------ */
88      /** Run job.
89       * @return true 
90       */
91      public boolean dispatch(Runnable job) 
92      {  
93          if (!isRunning() || job==null)
94              return false;
95  
96          PoolThread thread=null;
97          boolean spawn=false;
98              
99          synchronized(_lock)
100         {
101             // Look for an idle thread
102             int idle=_idle.size();
103             if (idle>0)
104                 thread=(PoolThread)_idle.remove(idle-1);
105             else
106             {
107                 // queue the job
108                 _queued++;
109                 if (_queued>_maxQueued)
110                     _maxQueued=_queued;
111                 _jobs[_nextJobSlot++]=job;
112                 if (_nextJobSlot==_jobs.length)
113                     _nextJobSlot=0;
114                 if (_nextJobSlot==_nextJob)
115                 {
116                     // Grow the job queue
117                     Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
118                     int split=_jobs.length-_nextJob;
119                     if (split>0)
120                         System.arraycopy(_jobs,_nextJob,jobs,0,split);
121                     if (_nextJob!=0)
122                         System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
123                     
124                     _jobs=jobs;
125                     _nextJob=0;
126                     _nextJobSlot=_queued;
127                 }
128                   
129                 spawn=_queued>_spawnOrShrinkAt;
130             }
131         }
132         
133         if (thread!=null)
134         {
135             thread.dispatch(job);
136         }
137         else if (spawn)
138         {
139             newThread();
140         }
141         return true;
142     }
143 
144     /* ------------------------------------------------------------ */
145     /** Get the number of idle threads in the pool.
146      * @see #getThreads
147      * @return Number of threads
148      */
149     public int getIdleThreads()
150     {
151         return _idle==null?0:_idle.size();
152     }
153     
154     /* ------------------------------------------------------------ */
155     /**
156      * @return low resource threads threshhold
157      */
158     public int getLowThreads()
159     {
160         return _lowThreads;
161     }
162     
163     /* ------------------------------------------------------------ */
164     /**
165      * @return maximum queue size
166      */
167     public int getMaxQueued()
168     {
169         return _maxQueued;
170     }
171     
172     /* ------------------------------------------------------------ */
173     /** Get the maximum thread idle time.
174      * Delegated to the named or anonymous Pool.
175      * @see #setMaxIdleTimeMs
176      * @return Max idle time in ms.
177      */
178     public int getMaxIdleTimeMs()
179     {
180         return _maxIdleTimeMs;
181     }
182     
183     /* ------------------------------------------------------------ */
184     /** Set the maximum number of threads.
185      * Delegated to the named or anonymous Pool.
186      * @see #setMaxThreads
187      * @return maximum number of threads.
188      */
189     public int getMaxThreads()
190     {
191         return _maxThreads;
192     }
193 
194     /* ------------------------------------------------------------ */
195     /** Get the minimum number of threads.
196      * Delegated to the named or anonymous Pool.
197      * @see #setMinThreads
198      * @return minimum number of threads.
199      */
200     public int getMinThreads()
201     {
202         return _minThreads;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /** 
207      * @return The name of the BoundedThreadPool.
208      */
209     public String getName()
210     {
211         return _name;
212     }
213 
214     /* ------------------------------------------------------------ */
215     /** Get the number of threads in the pool.
216      * @see #getIdleThreads
217      * @return Number of threads
218      */
219     public int getThreads()
220     {
221         return _threads.size();
222     }
223 
224     /* ------------------------------------------------------------ */
225     /** Get the priority of the pool threads.
226      *  @return the priority of the pool threads.
227      */
228     public int getThreadsPriority()
229     {
230         return _priority;
231     }
232 
233     /* ------------------------------------------------------------ */
234     public int getQueueSize()
235     {
236         return _queued;
237     }
238     
239     /* ------------------------------------------------------------ */
240     /**
241      * @return the spawnOrShrinkAt  The number of queued jobs (or idle threads) needed 
242      * before the thread pool is grown (or shrunk)
243      */
244     public int getSpawnOrShrinkAt()
245     {
246         return _spawnOrShrinkAt;
247     }
248 
249     /* ------------------------------------------------------------ */
250     /**
251      * @param spawnOrShrinkAt The number of queued jobs (or idle threads) needed 
252      * before the thread pool is grown (or shrunk)
253      */
254     public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
255     {
256         _spawnOrShrinkAt=spawnOrShrinkAt;
257     }
258 
259     /* ------------------------------------------------------------ */
260     /**
261      * @return maximum total time that stop() will wait for threads to die.
262      */
263     public int getMaxStopTimeMs()
264     {
265         return _maxStopTimeMs;
266     }
267 
268     /* ------------------------------------------------------------ */
269     /**
270      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
271      */
272     public void setMaxStopTimeMs(int stopTimeMs)
273     {
274         _maxStopTimeMs = stopTimeMs;
275     }
276 
277     /* ------------------------------------------------------------ */
278     /** 
279      * Delegated to the named or anonymous Pool.
280      */
281     public boolean isDaemon()
282     {
283         return _daemon;
284     }
285 
286     /* ------------------------------------------------------------ */
287     public boolean isLowOnThreads()
288     {
289         return _queued>_lowThreads;
290     }
291 
292     /* ------------------------------------------------------------ */
293     public void join() throws InterruptedException
294     {
295         synchronized (_joinLock)
296         {
297             while (isRunning())
298                 _joinLock.wait();
299         }
300         
301         // TODO remove this semi busy loop!
302         while (isStopping())
303             Thread.sleep(100);
304     }
305 
306     /* ------------------------------------------------------------ */
307     /** 
308      * Delegated to the named or anonymous Pool.
309      */
310     public void setDaemon(boolean daemon)
311     {
312         _daemon=daemon;
313     }
314 
315     /* ------------------------------------------------------------ */
316     /**
317      * @param lowThreads low resource threads threshhold
318      */
319     public void setLowThreads(int lowThreads)
320     {
321         _lowThreads = lowThreads;
322     }
323     
324     /* ------------------------------------------------------------ */
325     /** Set the maximum thread idle time.
326      * Threads that are idle for longer than this period may be
327      * stopped.
328      * Delegated to the named or anonymous Pool.
329      * @see #getMaxIdleTimeMs
330      * @param maxIdleTimeMs Max idle time in ms.
331      */
332     public void setMaxIdleTimeMs(int maxIdleTimeMs)
333     {
334         _maxIdleTimeMs=maxIdleTimeMs;
335     }
336 
337     /* ------------------------------------------------------------ */
338     /** Set the maximum number of threads.
339      * Delegated to the named or anonymous Pool.
340      * @see #getMaxThreads
341      * @param maxThreads maximum number of threads.
342      */
343     public void setMaxThreads(int maxThreads)
344     {
345         if (isStarted() && maxThreads<_minThreads)
346             throw new IllegalArgumentException("!minThreads<maxThreads");
347         _maxThreads=maxThreads;
348     }
349 
350     /* ------------------------------------------------------------ */
351     /** Set the minimum number of threads.
352      * Delegated to the named or anonymous Pool.
353      * @see #getMinThreads
354      * @param minThreads minimum number of threads
355      */
356     public void setMinThreads(int minThreads)
357     {
358         if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
359             throw new IllegalArgumentException("!0<=minThreads<maxThreads");
360         _minThreads=minThreads;
361         synchronized (_threadsLock)
362         {
363             while (isStarted() && _threads.size()<_minThreads)
364             {
365                 newThread();   
366             }
367         }
368     }
369 
370     /* ------------------------------------------------------------ */
371     /** 
372      * @param name Name of the BoundedThreadPool to use when naming Threads.
373      */
374     public void setName(String name)
375     {
376         _name= name;
377     }
378 
379     /* ------------------------------------------------------------ */
380     /** Set the priority of the pool threads.
381      *  @param priority the new thread priority.
382      */
383     public void setThreadsPriority(int priority)
384     {
385         _priority=priority;
386     }
387 
388     /* ------------------------------------------------------------ */
389     /* Start the BoundedThreadPool.
390      * Construct the minimum number of threads.
391      */
392     @Override
393     protected void doStart() throws Exception
394     {
395         if (_maxThreads<_minThreads || _minThreads<=0)
396             throw new IllegalArgumentException("!0<minThreads<maxThreads");
397         
398         _threads=new HashSet();
399         _idle=new ArrayList();
400         _jobs=new Runnable[_maxThreads];
401         
402         for (int i=0;i<_minThreads;i++)
403         {
404             newThread();
405         }   
406     }
407 
408     /* ------------------------------------------------------------ */
409     /** Stop the BoundedThreadPool.
410      * New jobs are no longer accepted,idle threads are interrupted
411      * and stopJob is called on active threads.
412      * The method then waits 
413      * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
414      * stop, at which time killJob is called.
415      */
416     @Override
417     protected void doStop() throws Exception
418     {   
419         super.doStop();
420         
421         long start=System.currentTimeMillis();
422         for (int i=0;i<100;i++)
423         {
424             synchronized (_threadsLock)
425             {
426                 Iterator iter = _threads.iterator();
427                 while (iter.hasNext())
428                     ((Thread)iter.next()).interrupt();
429             }
430             
431             Thread.yield();
432             if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
433                break;
434             
435             try
436             {
437                 Thread.sleep(i*100);
438             }
439             catch(InterruptedException e){}
440             
441             
442         }
443 
444         // TODO perhaps force stops
445         int size=_threads.size();
446         if (size>0)
447             Log.warn(size+" threads could not be stopped");
448         
449         synchronized (_joinLock)
450         {
451             _joinLock.notifyAll();
452         }
453     }
454 
455     /* ------------------------------------------------------------ */
456     protected void newThread()
457     {
458         synchronized (_threadsLock)
459         {
460             if (_threads.size()<_maxThreads)
461             {
462                 PoolThread thread =new PoolThread();
463                 _threads.add(thread);
464                 thread.setName(thread.getId()+"@"+_name+"-"+_id++);
465                 thread.start(); 
466             }
467             else if (!_warned)    
468             {
469                 _warned=true;
470                 Log.debug("Max threads for {}",this);
471             }
472         }
473     }
474 
475     /* ------------------------------------------------------------ */
476     /** Stop a Job.
477      * This method is called by the Pool if a job needs to be stopped.
478      * The default implementation does nothing and should be extended by a
479      * derived thread pool class if special action is required.
480      * @param thread The thread allocated to the job, or null if no thread allocated.
481      * @param job The job object passed to run.
482      */
483     protected void stopJob(Thread thread, Object job)
484     {
485         thread.interrupt();
486     }
487     
488 
489     /* ------------------------------------------------------------ */
490     /** Pool Thread class.
491      * The PoolThread allows the threads job to be
492      * retrieved and active status to be indicated.
493      */
494     public class PoolThread extends Thread 
495     {
496         Runnable _job=null;
497 
498         /* ------------------------------------------------------------ */
499         PoolThread()
500         {
501             setDaemon(_daemon);
502             setPriority(_priority);
503         }
504         
505         /* ------------------------------------------------------------ */
506         /** BoundedThreadPool run.
507          * Loop getting jobs and handling them until idle or stopped.
508          */
509         @Override
510         public void run()
511         {
512             boolean idle=false;
513             Runnable job=null;
514             try
515             {
516                 while (isRunning())
517                 {   
518                     // Run any job that we have.
519                     if (job!=null)
520                     {
521                         final Runnable todo=job;
522                         job=null;
523                         idle=false;
524                         todo.run();
525                     }
526                     
527                     synchronized(_lock)
528                     {
529                         // is there a queued job?
530                         if (_queued>0)
531                         {
532                             _queued--;
533                             job=_jobs[_nextJob++];
534                             if (_nextJob==_jobs.length)
535                                 _nextJob=0;
536                             continue;
537                         }
538 
539                         // Should we shrink?
540                         final int threads=_threads.size();
541                         if (threads>_minThreads && 
542                             (threads>_maxThreads || 
543                              _idle.size()>_spawnOrShrinkAt))   
544                         {
545                             long now = System.currentTimeMillis();
546                             if ((now-_lastShrink)>getMaxIdleTimeMs())
547                             {
548                                 _lastShrink=now;
549                                 _idle.remove(this);
550                                 return;
551                             }
552                         }
553 
554                         if (!idle)
555                         {   
556                             // Add ourselves to the idle set.
557                             _idle.add(this);
558                             idle=true;
559                         }
560                     }
561 
562                     // We are idle
563                     // wait for a dispatched job
564                     synchronized (this)
565                     {
566                         if (_job==null)
567                             this.wait(getMaxIdleTimeMs());
568                         job=_job;
569                         _job=null;
570                     }
571                 }
572             }
573             catch (InterruptedException e)
574             {
575                 Log.ignore(e);
576             }
577             finally
578             {
579                 synchronized (_lock)
580                 {
581                     _idle.remove(this);
582                 }
583                 synchronized (_threadsLock)
584                 {
585                     _threads.remove(this);
586                 }
587                 synchronized (this)
588                 {
589                     job=_job;
590                 }
591                 
592                 // we died with a job! reschedule it
593                 if (job!=null)
594                 {
595                     OldQueuedThreadPool.this.dispatch(job);
596                 }
597             }
598         }
599         
600         /* ------------------------------------------------------------ */
601         void dispatch(Runnable job)
602         {
603             synchronized (this)
604             {
605                 _job=job;
606                 this.notify();
607             }
608         }
609     }
610 
611     private class Lock{}
612 }