View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  
15  package org.eclipse.jetty.util.thread;
16  
17  import java.util.concurrent.ArrayBlockingQueue;
18  import java.util.concurrent.BlockingQueue;
19  import java.util.concurrent.ConcurrentLinkedQueue;
20  import java.util.concurrent.Executor;
21  import java.util.concurrent.RejectedExecutionException;
22  import java.util.concurrent.TimeUnit;
23  import java.util.concurrent.atomic.AtomicInteger;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  import org.eclipse.jetty.util.BlockingArrayQueue;
27  import org.eclipse.jetty.util.component.AbstractLifeCycle;
28  import org.eclipse.jetty.util.component.LifeCycle;
29  import org.eclipse.jetty.util.log.Log;
30  
31  public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor
32  {
33      private final AtomicInteger _threadsStarted = new AtomicInteger();
34      private final AtomicInteger _threadsIdle = new AtomicInteger();
35      private final AtomicLong _lastShrink = new AtomicLong();
36      private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
37      private final Object _joinLock = new Object();
38      private BlockingQueue<Runnable> _jobs;
39      private String _name;
40      private int _maxIdleTimeMs=60000;
41      private int _maxThreads=254;
42      private int _minThreads=8;
43      private int _maxQueued=-1;
44      private int _priority=Thread.NORM_PRIORITY;
45      private boolean _daemon=false;
46      private int _maxStopTime=100;
47  
48      /* ------------------------------------------------------------------- */
49      /** Construct
50       */
51      public QueuedThreadPool()
52      {
53          _name="qtp"+super.hashCode();
54      }
55      
56      /* ------------------------------------------------------------------- */
57      /** Construct
58       */
59      public QueuedThreadPool(int maxThreads)
60      {
61          this();
62          setMaxThreads(maxThreads);
63      }
64      
65      /* ------------------------------------------------------------------- */
66      /** Construct
67       */
68      public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
69      {
70          this();
71          _jobs=jobQ;
72          _jobs.clear();
73      }
74      
75      
76      /* ------------------------------------------------------------ */
77      @Override
78      protected void doStart() throws Exception
79      {
80          super.doStart();
81          _threadsStarted.set(0);
82  
83          if (_jobs==null)
84          {
85              _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
86                  :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
87          }
88  
89          int threads=_threadsStarted.get();
90          while (isRunning() && threads<_minThreads)
91          {
92              startThread(threads); 
93              threads=_threadsStarted.get();  
94          }
95      }
96  
97      /* ------------------------------------------------------------ */
98      @Override
99      protected void doStop() throws Exception
100     {
101         super.doStop();
102         long start=System.currentTimeMillis();
103 
104         // let jobs complete naturally for a while
105         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
106             Thread.sleep(1);
107         
108         // kill queued jobs and flush out idle jobs
109         _jobs.clear();
110         Runnable noop = new Runnable(){public void run(){}};
111         for  (int i=_threadsIdle.get();i-->0;)
112             _jobs.offer(noop);
113         Thread.yield();
114 
115         // interrupt remaining threads
116         if (_threadsStarted.get()>0)
117             for (Thread thread : _threads)
118                 thread.interrupt();
119         
120         // wait for remaining threads to die
121         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
122         {
123             Thread.sleep(1);
124         }
125         Thread.yield();
126         int size=_threads.size();
127         if (size>0)
128             Log.warn(size+" threads could not be stopped");
129         
130         synchronized (_joinLock)
131         {
132             _joinLock.notifyAll();
133         }
134     }
135 
136     /* ------------------------------------------------------------ */
137     /** 
138      * Delegated to the named or anonymous Pool.
139      */
140     public void setDaemon(boolean daemon)
141     {
142         _daemon=daemon;
143     }
144     
145     /* ------------------------------------------------------------ */
146     /** Set the maximum thread idle time.
147      * Threads that are idle for longer than this period may be
148      * stopped.
149      * Delegated to the named or anonymous Pool.
150      * @see #getMaxIdleTimeMs
151      * @param maxIdleTimeMs Max idle time in ms.
152      */
153     public void setMaxIdleTimeMs(int maxIdleTimeMs)
154     {
155         _maxIdleTimeMs=maxIdleTimeMs;
156     }
157 
158     /* ------------------------------------------------------------ */
159     /**
160      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
161      */
162     public void setMaxStopTimeMs(int stopTimeMs)
163     {
164         _maxStopTime = stopTimeMs;
165     }
166 
167     /* ------------------------------------------------------------ */
168     /** Set the maximum number of threads.
169      * Delegated to the named or anonymous Pool.
170      * @see #getMaxThreads
171      * @param maxThreads maximum number of threads.
172      */
173     public void setMaxThreads(int maxThreads)
174     {
175         if (isStarted() && maxThreads<_minThreads)
176             throw new IllegalArgumentException("!minThreads<maxThreads");
177         _maxThreads=maxThreads;
178     }
179 
180     /* ------------------------------------------------------------ */
181     /** Set the minimum number of threads.
182      * Delegated to the named or anonymous Pool.
183      * @see #getMinThreads
184      * @param minThreads minimum number of threads
185      */
186     public void setMinThreads(int minThreads)
187     {
188         if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
189             throw new IllegalArgumentException("!0<=minThreads<maxThreads");
190         _minThreads=minThreads;
191         
192         int threads=_threadsStarted.get();
193         while (isStarted() && threads<_minThreads)
194         {
195             startThread(threads); 
196             threads=_threadsStarted.get();  
197         }
198     }
199 
200     /* ------------------------------------------------------------ */
201     /** 
202      * @param name Name of the BoundedThreadPool to use when naming Threads.
203      */
204     public void setName(String name)
205     {
206         if (isRunning())
207             throw new IllegalStateException("started");
208         _name= name;
209     }
210 
211     /* ------------------------------------------------------------ */
212     /** Set the priority of the pool threads.
213      *  @param priority the new thread priority.
214      */
215     public void setThreadsPriority(int priority)
216     {
217         _priority=priority;
218     }
219     
220     /* ------------------------------------------------------------ */
221     /**
222      * @return maximum queue size
223      */
224     public int getMaxQueued()
225     {
226         return _maxQueued;
227     }
228     
229     /* ------------------------------------------------------------ */
230     /**
231      * @param max job queue size
232      */
233     public void setMaxQueued(int max)
234     {
235         if (isRunning())
236             throw new IllegalStateException("started");
237         _maxQueued=max;
238     }
239     
240     /* ------------------------------------------------------------ */
241     /** Get the maximum thread idle time.
242      * Delegated to the named or anonymous Pool.
243      * @see #setMaxIdleTimeMs
244      * @return Max idle time in ms.
245      */
246     public int getMaxIdleTimeMs()
247     {
248         return _maxIdleTimeMs;
249     } 
250     
251     /* ------------------------------------------------------------ */
252     /**
253      * @return maximum total time that stop() will wait for threads to die.
254      */
255     public int getMaxStopTimeMs()
256     {
257         return _maxStopTime;
258     }
259     
260     /* ------------------------------------------------------------ */
261     /** Set the maximum number of threads.
262      * Delegated to the named or anonymous Pool.
263      * @see #setMaxThreads
264      * @return maximum number of threads.
265      */
266     public int getMaxThreads()
267     {
268         return _maxThreads;
269     }
270 
271     /* ------------------------------------------------------------ */
272     /** Get the minimum number of threads.
273      * Delegated to the named or anonymous Pool.
274      * @see #setMinThreads
275      * @return minimum number of threads.
276      */
277     public int getMinThreads()
278     {
279         return _minThreads;
280     }
281 
282     /* ------------------------------------------------------------ */
283     /** 
284      * @return The name of the BoundedThreadPool.
285      */
286     public String getName()
287     {
288         return _name;
289     }
290 
291     /* ------------------------------------------------------------ */
292     /** Get the priority of the pool threads.
293      *  @return the priority of the pool threads.
294      */
295     public int getThreadsPriority()
296     {
297         return _priority;
298     }
299     
300     /* ------------------------------------------------------------ */
301     /** 
302      * Delegated to the named or anonymous Pool.
303      */
304     public boolean isDaemon()
305     {
306         return _daemon;
307     }
308 
309     
310     /* ------------------------------------------------------------ */
311     public boolean dispatch(Runnable job)
312     {
313         if (isRunning())
314         {
315             final int jobQ = _jobs.size();
316             final int idle = getIdleThreads();
317             if(_jobs.offer(job))
318             {
319                 // If we had no idle threads or the jobQ is greater than the idle threads
320                 if (idle==0 || jobQ>idle)
321                 {
322                     int threads=_threadsStarted.get();
323                     if (threads<_maxThreads)
324                         startThread(threads);
325                 }
326                 return true;
327             }
328         }
329         return false;
330     }
331     
332     /* ------------------------------------------------------------ */
333     public void execute(Runnable job)
334     {
335         if (!dispatch(job))
336             throw new RejectedExecutionException();
337     }
338 
339     /* ------------------------------------------------------------ */
340     /**
341      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
342      */
343     public void join() throws InterruptedException
344     {   
345         synchronized (_joinLock)
346         {
347             while (isRunning())
348                 _joinLock.wait();
349         }
350         
351         while (isStopping())
352             Thread.sleep(1);
353     }
354 
355     /* ------------------------------------------------------------ */
356     /**
357      * @return The total number of threads currently in the pool
358      */
359     public int getThreads()
360     {
361         return _threadsStarted.get();
362     }
363 
364     /* ------------------------------------------------------------ */
365     /**
366      * @return The number of idle threads in the pool
367      */
368     public int getIdleThreads()
369     {
370         return _threadsIdle.get();
371     }
372     
373     /* ------------------------------------------------------------ */
374     /**
375      * @return True if the pool is at maxThreads and there are more queued jobs than idle threads
376      */
377     public boolean isLowOnThreads()
378     {
379         return _threadsStarted.get()==_maxThreads && _jobs.size()>_threadsIdle.get();
380     }
381 
382     /* ------------------------------------------------------------ */
383     private boolean startThread(int threads)
384     {
385         final int next=threads+1;
386         if (!_threadsStarted.compareAndSet(threads,next))
387             return false;
388         
389         boolean started=false;
390         try
391         {
392             Thread thread=newThread(_runnable);
393             thread.setDaemon(_daemon);
394             thread.setPriority(_priority);
395             thread.setName(_name+"-"+thread.getId());
396             _threads.add(thread);
397             
398             thread.start();
399             started=true;
400         }
401         finally
402         {
403             if (!started)
404                 _threadsStarted.decrementAndGet();
405         }
406         return started;
407     }
408     
409     /* ------------------------------------------------------------ */
410     protected Thread newThread(Runnable runnable)
411     {
412         return new Thread(runnable);
413     }
414 
415     /* ------------------------------------------------------------ */
416     @Override
417     public String toString()
418     {
419         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
420     }
421     
422     /* ------------------------------------------------------------ */
423     private Runnable _runnable = new Runnable()
424     {
425         public void run()
426         {
427             boolean shrink=false;
428             try
429             {
430                 Runnable job=_jobs.poll();
431                 while (isRunning())
432                 {
433                     // Job loop
434                     while (job!=null && isRunning())
435                     {
436                         job.run();
437                         job=_jobs.poll();
438                     }
439                         
440                     // Idle loop
441                     try
442                     {
443                         _threadsIdle.incrementAndGet();
444 
445                         while (isRunning() && job==null)
446                         {
447                             if (_maxIdleTimeMs<=0)
448                                 job=_jobs.take();
449                             else
450                             {
451                                 // maybe we should shrink?
452                                 final int size=_threadsStarted.get();
453                                 if (size>_minThreads)
454                                 {
455                                     long last=_lastShrink.get();
456                                     long now=System.currentTimeMillis();
457                                     if (last==0 || (now-last)>_maxIdleTimeMs)
458                                     {
459                                         shrink=_lastShrink.compareAndSet(last,now) &&
460                                         _threadsStarted.compareAndSet(size,size-1);
461                                         if (shrink)
462                                             return;
463                                     }
464                                 }
465                                 job=_jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
466                             }
467                         }
468                     }
469                     finally
470                     {
471                         _threadsIdle.decrementAndGet();
472                     }
473                 }
474             }
475             catch(InterruptedException e)
476             {
477                 Log.ignore(e);
478             }
479             catch(Exception e)
480             {
481                 Log.warn(e);
482             }
483             finally
484             {
485                 if (!shrink)
486                     _threadsStarted.decrementAndGet();
487                 _threads.remove(Thread.currentThread());
488             }
489         }
490     };
491     
492     public String dump()
493     {
494         StringBuilder buf = new StringBuilder();
495         
496         for (Thread thread: _threads)
497         {
498             buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
499             for (StackTraceElement element : thread.getStackTrace())
500                 buf.append("  at ").append(element.toString()).append('\n');
501         }
502         
503         return buf.toString();
504     }
505     
506     /* ------------------------------------------------------------ */
507     /**
508      * @param id The thread ID to stop.
509      * @return true if the thread was found and stopped.
510      * @deprecated Use {@link #interruptThread(long)} in preference
511      */
512     @SuppressWarnings("deprecation")
513     public boolean stopThread(long id)
514     {
515         for (Thread thread: _threads)
516         {
517             if (thread.getId()==id)
518             {
519                 thread.stop();
520                 return true;
521             }
522         }
523         return false;
524     }
525     
526     /* ------------------------------------------------------------ */
527     /**
528      * @param id The thread ID to interrupt.
529      * @return true if the thread was found and interrupted.
530      */
531     public boolean interruptThread(long id)
532     {
533         for (Thread thread: _threads)
534         {
535             if (thread.getId()==id)
536             {
537                 thread.interrupt();
538                 return true;
539             }
540         }
541         return false;
542     }
543     
544     /* ------------------------------------------------------------ */
545     /**
546      * @param id The thread ID to interrupt.
547      * @return true if the thread was found and interrupted.
548      */
549     public String dumpThread(long id)
550     {
551         for (Thread thread: _threads)
552         {
553             if (thread.getId()==id)
554             {
555                 StringBuilder buf = new StringBuilder();
556                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
557                 for (StackTraceElement element : thread.getStackTrace())
558                     buf.append("  at ").append(element.toString()).append('\n');
559                 return buf.toString();
560             }
561         }
562         return null;
563     }
564 }