View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  
15  package org.eclipse.jetty.util.thread;
16  
17  import java.util.concurrent.ArrayBlockingQueue;
18  import java.util.concurrent.BlockingQueue;
19  import java.util.concurrent.ConcurrentLinkedQueue;
20  import java.util.concurrent.Executor;
21  import java.util.concurrent.RejectedExecutionException;
22  import java.util.concurrent.TimeUnit;
23  import java.util.concurrent.atomic.AtomicInteger;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  import org.eclipse.jetty.util.BlockingArrayQueue;
27  import org.eclipse.jetty.util.component.AbstractLifeCycle;
28  import org.eclipse.jetty.util.component.LifeCycle;
29  import org.eclipse.jetty.util.log.Log;
30  
31  
32  public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor
33  {
34      private final AtomicInteger _threadsStarted = new AtomicInteger();
35      private final AtomicInteger _threadsIdle = new AtomicInteger();
36      private final AtomicLong _lastShrink = new AtomicLong();
37      private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
38      private final Object _joinLock = new Object();
39      private BlockingQueue<Runnable> _jobs;
40      private String _name;
41      private int _maxIdleTimeMs=60000;
42      private int _maxThreads=254;
43      private int _minThreads=8;
44      private int _maxQueued=-1;
45      private int _priority=Thread.NORM_PRIORITY;
46      private boolean _daemon=false;
47      private int _maxStopTime=100;
48  
49      /* ------------------------------------------------------------------- */
50      /* Construct
51       */
52      public QueuedThreadPool()
53      {
54          _name="qtp"+super.hashCode();
55      }
56      
57      /* ------------------------------------------------------------------- */
58      /* Construct
59       */
60      public QueuedThreadPool(int maxThreads)
61      {
62          this();
63          setMaxThreads(maxThreads);
64      }
65      
66      /* ------------------------------------------------------------------- */
67      /* Construct
68       */
69      public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
70      {
71          this();
72          _jobs=jobQ;
73          _jobs.clear();
74      }
75      
76      
77      /* ------------------------------------------------------------ */
78      @Override
79      protected void doStart() throws Exception
80      {
81          super.doStart();
82          _threadsStarted.set(0);
83  
84          if (_jobs==null)
85          {
86              _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
87                  :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
88          }
89  
90          int threads=_threadsStarted.get();
91          while (isRunning() && threads<_minThreads)
92          {
93              startThread(threads); 
94              threads=_threadsStarted.get();  
95          }
96      }
97  
98      /* ------------------------------------------------------------ */
99      @Override
100     protected void doStop() throws Exception
101     {
102         super.doStop();
103         long start=System.currentTimeMillis();
104 
105         // let jobs complete naturally for a while
106         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
107             Thread.sleep(1);
108         
109         // kill queued jobs and flush out idle jobs
110         _jobs.clear();
111         Runnable noop = new Runnable(){public void run(){}};
112         for  (int i=_threadsIdle.get();i-->0;)
113             _jobs.offer(noop);
114         Thread.yield();
115 
116         // interrupt remaining threads
117         if (_threadsStarted.get()>0)
118             for (Thread thread : _threads)
119                 thread.interrupt();
120         
121         // wait for remaining threads to die
122         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
123         {
124             Thread.sleep(1);
125         }
126             
127         if (_threads.size()>0)
128             Log.warn(_threads.size()+" threads could not be stopped");
129         
130         synchronized (_joinLock)
131         {
132             _joinLock.notifyAll();
133         }
134     }
135 
136     /* ------------------------------------------------------------ */
137     /** 
138      * Delegated to the named or anonymous Pool.
139      */
140     public void setDaemon(boolean daemon)
141     {
142         _daemon=daemon;
143     }
144     
145     /* ------------------------------------------------------------ */
146     /** Set the maximum thread idle time.
147      * Threads that are idle for longer than this period may be
148      * stopped.
149      * Delegated to the named or anonymous Pool.
150      * @see #getMaxIdleTimeMs
151      * @param maxIdleTimeMs Max idle time in ms.
152      */
153     public void setMaxIdleTimeMs(int maxIdleTimeMs)
154     {
155         _maxIdleTimeMs=maxIdleTimeMs;
156     }
157 
158     /* ------------------------------------------------------------ */
159     /**
160      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
161      */
162     public void setMaxStopTimeMs(int stopTimeMs)
163     {
164         _maxStopTime = stopTimeMs;
165     }
166 
167     /* ------------------------------------------------------------ */
168     /** Set the maximum number of threads.
169      * Delegated to the named or anonymous Pool.
170      * @see #getMaxThreads
171      * @param maxThreads maximum number of threads.
172      */
173     public void setMaxThreads(int maxThreads)
174     {
175         if (isStarted() && maxThreads<_minThreads)
176             throw new IllegalArgumentException("!minThreads<maxThreads");
177         _maxThreads=maxThreads;
178     }
179 
180     /* ------------------------------------------------------------ */
181     /** Set the minimum number of threads.
182      * Delegated to the named or anonymous Pool.
183      * @see #getMinThreads
184      * @param minThreads minimum number of threads
185      */
186     public void setMinThreads(int minThreads)
187     {
188         if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
189             throw new IllegalArgumentException("!0<=minThreads<maxThreads");
190         _minThreads=minThreads;
191         
192         int threads=_threadsStarted.get();
193         while (isStarted() && threads<_minThreads)
194         {
195             startThread(threads); 
196             threads=_threadsStarted.get();  
197         }
198     }
199 
200     /* ------------------------------------------------------------ */
201     /** 
202      * @param name Name of the BoundedThreadPool to use when naming Threads.
203      */
204     public void setName(String name)
205     {
206         if (isRunning())
207             throw new IllegalStateException("started");
208         _name= name;
209     }
210 
211     /* ------------------------------------------------------------ */
212     /** Set the priority of the pool threads.
213      *  @param priority the new thread priority.
214      */
215     public void setThreadsPriority(int priority)
216     {
217         _priority=priority;
218     }
219     
220     /* ------------------------------------------------------------ */
221     /**
222      * @return maximum queue size
223      */
224     public int getMaxQueued()
225     {
226         return _maxQueued;
227     }
228     
229     /* ------------------------------------------------------------ */
230     /**
231      * @param max job queue size
232      */
233     public void setMaxQueued(int max)
234     {
235         if (isRunning())
236             throw new IllegalStateException("started");
237         _maxQueued=max;
238     }
239     
240     /* ------------------------------------------------------------ */
241     /** Get the maximum thread idle time.
242      * Delegated to the named or anonymous Pool.
243      * @see #setMaxIdleTimeMs
244      * @return Max idle time in ms.
245      */
246     public int getMaxIdleTimeMs()
247     {
248         return _maxIdleTimeMs;
249     } 
250     
251     /* ------------------------------------------------------------ */
252     /**
253      * @return maximum total time that stop() will wait for threads to die.
254      */
255     public int getMaxStopTimeMs()
256     {
257         return _maxStopTime;
258     }
259     
260     /* ------------------------------------------------------------ */
261     /** Set the maximum number of threads.
262      * Delegated to the named or anonymous Pool.
263      * @see #setMaxThreads
264      * @return maximum number of threads.
265      */
266     public int getMaxThreads()
267     {
268         return _maxThreads;
269     }
270 
271     /* ------------------------------------------------------------ */
272     /** Get the minimum number of threads.
273      * Delegated to the named or anonymous Pool.
274      * @see #setMinThreads
275      * @return minimum number of threads.
276      */
277     public int getMinThreads()
278     {
279         return _minThreads;
280     }
281 
282     /* ------------------------------------------------------------ */
283     /** 
284      * @return The name of the BoundedThreadPool.
285      */
286     public String getName()
287     {
288         return _name;
289     }
290 
291     /* ------------------------------------------------------------ */
292     /** Get the priority of the pool threads.
293      *  @return the priority of the pool threads.
294      */
295     public int getThreadsPriority()
296     {
297         return _priority;
298     }
299     
300     /* ------------------------------------------------------------ */
301     /** 
302      * Delegated to the named or anonymous Pool.
303      */
304     public boolean isDaemon()
305     {
306         return _daemon;
307     }
308 
309     
310     /* ------------------------------------------------------------ */
311     public boolean dispatch(Runnable job)
312     {
313         if (isRunning())
314         {
315             final int jobQ = _jobs.size();
316             final int idle = getIdleThreads();
317             if(_jobs.offer(job))
318             {
319                 // If we had no idle threads or the jobQ is greater than the idle threads
320                 if (idle==0 || jobQ>idle)
321                 {
322                     int threads=_threadsStarted.get();
323                     if (threads<_maxThreads)
324                         startThread(threads);
325                 }
326                 return true;
327             }
328         }
329         return false;
330     }
331     
332     /* ------------------------------------------------------------ */
333     public void execute(Runnable job)
334     {
335         if (!dispatch(job))
336             throw new RejectedExecutionException();
337     }
338 
339     /* ------------------------------------------------------------ */
340     /**
341      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
342      */
343     public void join() throws InterruptedException
344     {   
345         synchronized (_joinLock)
346         {
347             while (isRunning())
348                 _joinLock.wait();
349         }
350         
351         while (isStopping())
352             Thread.sleep(1);
353     }
354 
355     /* ------------------------------------------------------------ */
356     /**
357      * @return The total number of threads currently in the pool
358      */
359     public int getThreads()
360     {
361         return _threadsStarted.get();
362     }
363 
364     /* ------------------------------------------------------------ */
365     /**
366      * @return The number of idle threads in the pool
367      */
368     public int getIdleThreads()
369     {
370         return _threadsIdle.get();
371     }
372     
373     /* ------------------------------------------------------------ */
374     /**
375      * @return True if the pool is at maxThreads and there are more queued jobs than idle threads
376      */
377     public boolean isLowOnThreads()
378     {
379         return _threadsStarted.get()==_maxThreads && _jobs.size()>_threadsIdle.get();
380     }
381 
382     /* ------------------------------------------------------------ */
383     private boolean startThread(int threads)
384     {
385         final int next=threads+1;
386         if (!_threadsStarted.compareAndSet(threads,next))
387             return false;
388         
389         boolean started=false;
390         try
391         {
392             Thread thread=newThread(_runnable);
393             thread.setDaemon(_daemon);
394             thread.setPriority(_priority);
395             thread.setName(_name+"-"+thread.getId());
396             _threads.add(thread);
397             
398             thread.start();
399             started=true;
400         }
401         finally
402         {
403             if (!started)
404                 _threadsStarted.decrementAndGet();
405         }
406         return started;
407     }
408     
409     /* ------------------------------------------------------------ */
410     protected Thread newThread(Runnable runnable)
411     {
412         return new Thread(runnable);
413     }
414 
415     /* ------------------------------------------------------------ */
416     public String toString()
417     {
418         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
419     }
420     
421     /* ------------------------------------------------------------ */
422     private Runnable _runnable = new Runnable()
423     {
424         public void run()
425         {
426             boolean shrink=false;
427             try
428             {
429                 Runnable job=_jobs.poll();
430                 while (isRunning())
431                 {
432                     // Job loop
433                     while (job!=null && isRunning())
434                     {
435                         job.run();
436                         job=_jobs.poll();
437                     }
438                         
439                     // Idle loop
440                     try
441                     {
442                         _threadsIdle.incrementAndGet();
443 
444                         while (isRunning() && job==null)
445                         {
446                             if (_maxIdleTimeMs<=0)
447                                 job=_jobs.take();
448                             else
449                             {
450                                 job=_jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
451                                 
452                                 if (job==null)
453                                 {
454                                     // maybe we should shrink?
455                                     final int size=_threadsStarted.get();
456                                     if (size>_minThreads)
457                                     {
458                                         long last=_lastShrink.get();
459                                         long now=System.currentTimeMillis();
460                                         if (last==0 || (now-last)>_maxIdleTimeMs)
461                                         {
462                                             shrink=_lastShrink.compareAndSet(last,now) &&
463                                             _threadsStarted.compareAndSet(size,size-1);
464                                             if (shrink)
465                                                 return;
466                                         }
467                                     }
468                                 }
469                             }
470                         }
471                     }
472                     finally
473                     {
474                         _threadsIdle.decrementAndGet();
475                     }
476                 }
477             }
478             catch(InterruptedException e)
479             {
480                 Log.ignore(e);
481             }
482             catch(Exception e)
483             {
484                 Log.warn(e);
485             }
486             finally
487             {
488                 if (!shrink)
489                     _threadsStarted.decrementAndGet();
490                 _threads.remove(Thread.currentThread());
491             }
492         }
493     };
494     
495     public String dump()
496     {
497         StringBuilder buf = new StringBuilder();
498         
499         for (Thread thread: _threads)
500         {
501             buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
502             for (StackTraceElement element : thread.getStackTrace())
503                 buf.append("  at ").append(element.toString()).append('\n');
504         }
505         
506         return buf.toString();
507     }
508     
509     /* ------------------------------------------------------------ */
510     /**
511      * @param id The thread ID to stop.
512      * @return true if the thread was found and stopped.
513      * @Deprecated Use {@link #interruptThread(long)} in preference
514      */
515     @SuppressWarnings("deprecation")
516     public boolean stopThread(long id)
517     {
518         for (Thread thread: _threads)
519         {
520             if (thread.getId()==id)
521             {
522                 thread.stop();
523                 return true;
524             }
525         }
526         return false;
527     }
528     
529     /* ------------------------------------------------------------ */
530     /**
531      * @param id The thread ID to interrupt.
532      * @return true if the thread was found and interrupted.
533      */
534     public boolean interruptThread(long id)
535     {
536         for (Thread thread: _threads)
537         {
538             if (thread.getId()==id)
539             {
540                 thread.interrupt();
541                 return true;
542             }
543         }
544         return false;
545     }
546     
547     /* ------------------------------------------------------------ */
548     /**
549      * @param id The thread ID to interrupt.
550      * @return true if the thread was found and interrupted.
551      */
552     public String dumpThread(long id)
553     {
554         for (Thread thread: _threads)
555         {
556             if (thread.getId()==id)
557             {
558                 StringBuilder buf = new StringBuilder();
559                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
560                 for (StackTraceElement element : thread.getStackTrace())
561                     buf.append("  at ").append(element.toString()).append('\n');
562                 return buf.toString();
563             }
564         }
565         return null;
566     }
567 }