View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  
15  package org.eclipse.jetty.util.thread;
16  
17  import java.io.IOException;
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.eclipse.jetty.util.BlockingArrayQueue;
31  import org.eclipse.jetty.util.component.AbstractLifeCycle;
32  import org.eclipse.jetty.util.component.AggregateLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.component.LifeCycle;
35  import org.eclipse.jetty.util.log.Log;
36  
37  public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor, Dumpable
38  {
39      private final AtomicInteger _threadsStarted = new AtomicInteger();
40      private final AtomicInteger _threadsIdle = new AtomicInteger();
41      private final AtomicLong _lastShrink = new AtomicLong();
42      private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
43      private final Object _joinLock = new Object();
44      private BlockingQueue<Runnable> _jobs;
45      private String _name;
46      private int _maxIdleTimeMs=60000;
47      private int _maxThreads=254;
48      private int _minThreads=8;
49      private int _maxQueued=-1;
50      private int _priority=Thread.NORM_PRIORITY;
51      private boolean _daemon=false;
52      private int _maxStopTime=100;
53      private boolean _detailedDump=false;
54  
55      /* ------------------------------------------------------------------- */
56      /** Construct
57       */
58      public QueuedThreadPool()
59      {
60          _name="qtp"+super.hashCode();
61      }
62      
63      /* ------------------------------------------------------------------- */
64      /** Construct
65       */
66      public QueuedThreadPool(int maxThreads)
67      {
68          this();
69          setMaxThreads(maxThreads);
70      }
71      
72      /* ------------------------------------------------------------------- */
73      /** Construct
74       */
75      public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
76      {
77          this();
78          _jobs=jobQ;
79          _jobs.clear();
80      }
81      
82      
83      /* ------------------------------------------------------------ */
84      @Override
85      protected void doStart() throws Exception
86      {
87          super.doStart();
88          _threadsStarted.set(0);
89  
90          if (_jobs==null)
91          {
92              _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
93                  :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
94          }
95  
96          int threads=_threadsStarted.get();
97          while (isRunning() && threads<_minThreads)
98          {
99              startThread(threads); 
100             threads=_threadsStarted.get();  
101         }
102     }
103 
104     /* ------------------------------------------------------------ */
105     @Override
106     protected void doStop() throws Exception
107     {
108         super.doStop();
109         long start=System.currentTimeMillis();
110 
111         // let jobs complete naturally for a while
112         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
113             Thread.sleep(1);
114         
115         // kill queued jobs and flush out idle jobs
116         _jobs.clear();
117         Runnable noop = new Runnable(){public void run(){}};
118         for  (int i=_threadsIdle.get();i-->0;)
119             _jobs.offer(noop);
120         Thread.yield();
121 
122         // interrupt remaining threads
123         if (_threadsStarted.get()>0)
124             for (Thread thread : _threads)
125                 thread.interrupt();
126         
127         // wait for remaining threads to die
128         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
129         {
130             Thread.sleep(1);
131         }
132         Thread.yield();
133         int size=_threads.size();
134         if (size>0)
135         {
136             Log.warn(size+" threads could not be stopped");
137             
138             if (Log.isDebugEnabled())
139             {
140                 for (Thread unstopped : _threads)
141                 {
142                     Log.debug("Couldn't stop "+unstopped);
143                     for (StackTraceElement element : unstopped.getStackTrace())
144                     {
145                         Log.debug(" at "+element);
146                     }
147                 }
148             }
149         }
150         
151         synchronized (_joinLock)
152         {
153             _joinLock.notifyAll();
154         }
155     }
156 
157     /* ------------------------------------------------------------ */
158     /** 
159      * Delegated to the named or anonymous Pool.
160      */
161     public void setDaemon(boolean daemon)
162     {
163         _daemon=daemon;
164     }
165     
166     /* ------------------------------------------------------------ */
167     /** Set the maximum thread idle time.
168      * Threads that are idle for longer than this period may be
169      * stopped.
170      * Delegated to the named or anonymous Pool.
171      * @see #getMaxIdleTimeMs
172      * @param maxIdleTimeMs Max idle time in ms.
173      */
174     public void setMaxIdleTimeMs(int maxIdleTimeMs)
175     {
176         _maxIdleTimeMs=maxIdleTimeMs;
177     }
178 
179     /* ------------------------------------------------------------ */
180     /**
181      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
182      */
183     public void setMaxStopTimeMs(int stopTimeMs)
184     {
185         _maxStopTime = stopTimeMs;
186     }
187 
188     /* ------------------------------------------------------------ */
189     /** Set the maximum number of threads.
190      * Delegated to the named or anonymous Pool.
191      * @see #getMaxThreads
192      * @param maxThreads maximum number of threads.
193      */
194     public void setMaxThreads(int maxThreads)
195     {
196         _maxThreads=maxThreads;
197         if (_minThreads>_maxThreads)
198             _minThreads=_maxThreads;
199     }
200 
201     /* ------------------------------------------------------------ */
202     /** Set the minimum number of threads.
203      * Delegated to the named or anonymous Pool.
204      * @see #getMinThreads
205      * @param minThreads minimum number of threads
206      */
207     public void setMinThreads(int minThreads)
208     {
209         _minThreads=minThreads;
210 
211         if (_minThreads>_maxThreads)
212             _maxThreads=_minThreads;
213         
214         int threads=_threadsStarted.get();
215         while (isStarted() && threads<_minThreads)
216         {
217             startThread(threads); 
218             threads=_threadsStarted.get();  
219         }
220     }
221 
222     /* ------------------------------------------------------------ */
223     /** 
224      * @param name Name of the BoundedThreadPool to use when naming Threads.
225      */
226     public void setName(String name)
227     {
228         if (isRunning())
229             throw new IllegalStateException("started");
230         _name= name;
231     }
232 
233     /* ------------------------------------------------------------ */
234     /** Set the priority of the pool threads.
235      *  @param priority the new thread priority.
236      */
237     public void setThreadsPriority(int priority)
238     {
239         _priority=priority;
240     }
241     
242     /* ------------------------------------------------------------ */
243     /**
244      * @return maximum queue size
245      */
246     public int getMaxQueued()
247     {
248         return _maxQueued;
249     }
250     
251     /* ------------------------------------------------------------ */
252     /**
253      * @param max job queue size
254      */
255     public void setMaxQueued(int max)
256     {
257         if (isRunning())
258             throw new IllegalStateException("started");
259         _maxQueued=max;
260     }
261     
262     /* ------------------------------------------------------------ */
263     /** Get the maximum thread idle time.
264      * Delegated to the named or anonymous Pool.
265      * @see #setMaxIdleTimeMs
266      * @return Max idle time in ms.
267      */
268     public int getMaxIdleTimeMs()
269     {
270         return _maxIdleTimeMs;
271     } 
272     
273     /* ------------------------------------------------------------ */
274     /**
275      * @return maximum total time that stop() will wait for threads to die.
276      */
277     public int getMaxStopTimeMs()
278     {
279         return _maxStopTime;
280     }
281     
282     /* ------------------------------------------------------------ */
283     /** Set the maximum number of threads.
284      * Delegated to the named or anonymous Pool.
285      * @see #setMaxThreads
286      * @return maximum number of threads.
287      */
288     public int getMaxThreads()
289     {
290         return _maxThreads;
291     }
292 
293     /* ------------------------------------------------------------ */
294     /** Get the minimum number of threads.
295      * Delegated to the named or anonymous Pool.
296      * @see #setMinThreads
297      * @return minimum number of threads.
298      */
299     public int getMinThreads()
300     {
301         return _minThreads;
302     }
303 
304     /* ------------------------------------------------------------ */
305     /** 
306      * @return The name of the BoundedThreadPool.
307      */
308     public String getName()
309     {
310         return _name;
311     }
312 
313     /* ------------------------------------------------------------ */
314     /** Get the priority of the pool threads.
315      *  @return the priority of the pool threads.
316      */
317     public int getThreadsPriority()
318     {
319         return _priority;
320     }
321     
322     /* ------------------------------------------------------------ */
323     /** 
324      * Delegated to the named or anonymous Pool.
325      */
326     public boolean isDaemon()
327     {
328         return _daemon;
329     }
330 
331     /* ------------------------------------------------------------ */
332     public boolean isDetailedDump()
333     {
334         return _detailedDump;
335     }
336 
337     /* ------------------------------------------------------------ */
338     public void setDetailedDump(boolean detailedDump)
339     {
340         _detailedDump = detailedDump;
341     }
342 
343     /* ------------------------------------------------------------ */
344     public boolean dispatch(Runnable job)
345     {
346         if (isRunning())
347         {
348             final int jobQ = _jobs.size();
349             final int idle = getIdleThreads();
350             if(_jobs.offer(job))
351             {
352                 // If we had no idle threads or the jobQ is greater than the idle threads
353                 if (idle==0 || jobQ>idle)
354                 {
355                     int threads=_threadsStarted.get();
356                     if (threads<_maxThreads)
357                         startThread(threads);
358                 }
359                 return true;
360             }
361         }
362         return false;
363     }
364     
365     /* ------------------------------------------------------------ */
366     public void execute(Runnable job)
367     {
368         if (!dispatch(job))
369             throw new RejectedExecutionException();
370     }
371 
372     /* ------------------------------------------------------------ */
373     /**
374      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
375      */
376     public void join() throws InterruptedException
377     {   
378         synchronized (_joinLock)
379         {
380             while (isRunning())
381                 _joinLock.wait();
382         }
383         
384         while (isStopping())
385             Thread.sleep(1);
386     }
387 
388     /* ------------------------------------------------------------ */
389     /**
390      * @return The total number of threads currently in the pool
391      */
392     public int getThreads()
393     {
394         return _threadsStarted.get();
395     }
396 
397     /* ------------------------------------------------------------ */
398     /**
399      * @return The number of idle threads in the pool
400      */
401     public int getIdleThreads()
402     {
403         return _threadsIdle.get();
404     }
405     
406     /* ------------------------------------------------------------ */
407     /**
408      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
409      */
410     public boolean isLowOnThreads()
411     {
412         return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
413     }
414 
415     /* ------------------------------------------------------------ */
416     private boolean startThread(int threads)
417     {
418         final int next=threads+1;
419         if (!_threadsStarted.compareAndSet(threads,next))
420             return false;
421         
422         boolean started=false;
423         try
424         {
425             Thread thread=newThread(_runnable);
426             thread.setDaemon(_daemon);
427             thread.setPriority(_priority);
428             thread.setName(_name+"-"+thread.getId());
429             _threads.add(thread);
430             
431             thread.start();
432             started=true;
433         }
434         finally
435         {
436             if (!started)
437                 _threadsStarted.decrementAndGet();
438         }
439         return started;
440     }
441     
442     /* ------------------------------------------------------------ */
443     protected Thread newThread(Runnable runnable)
444     {
445         return new Thread(runnable);
446     }
447 
448 
449     /* ------------------------------------------------------------ */
450     public String dump()
451     {
452         return AggregateLifeCycle.dump(this);
453     }
454 
455     /* ------------------------------------------------------------ */
456     public void dump(Appendable out, String indent) throws IOException
457     {  
458         List<Object> dump = new ArrayList<Object>(getMaxThreads());        
459         for (final Thread thread: _threads)
460         {
461             final StackTraceElement[] trace=thread.getStackTrace();
462             boolean inIdleJobPoll=false;
463             for (StackTraceElement t : trace)
464             {
465                 if ("idleJobPoll".equals(t.getMethodName()))
466                 {
467                     inIdleJobPoll=true;
468                     break;
469                 }
470             }
471             final boolean idle=inIdleJobPoll;
472                 
473             if (_detailedDump)
474             {
475                 dump.add(new Dumpable()
476                 {
477                     public void dump(Appendable out, String indent) throws IOException
478                     {
479                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
480                         if (!idle)
481                             AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
482                     }
483                     
484                     public String dump()
485                     {
486                         return null;
487                     }
488                 });
489             }
490             else
491             {
492                 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+trace[0]+(idle?" IDLE":""));
493             }
494         }
495 
496         out.append(String.valueOf(this)).append("\n");
497         AggregateLifeCycle.dump(out,indent,dump);
498         
499     }
500     
501     
502     /* ------------------------------------------------------------ */
503     @Override
504     public String toString()
505     {
506         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
507     }
508    
509     private Runnable idleJobPoll() throws InterruptedException
510     {
511         return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
512     }
513     
514     /* ------------------------------------------------------------ */
515     private Runnable _runnable = new Runnable()
516     {
517         public void run()
518         {
519             boolean shrink=false;
520             try
521             {
522                 Runnable job=_jobs.poll();
523                 while (isRunning())
524                 {
525                     // Job loop
526                     while (job!=null && isRunning())
527                     {
528                         job.run();
529                         job=_jobs.poll();
530                     }
531                         
532                     // Idle loop
533                     try
534                     {
535                         _threadsIdle.incrementAndGet();
536 
537                         while (isRunning() && job==null)
538                         {
539                             if (_maxIdleTimeMs<=0)
540                                 job=_jobs.take();
541                             else
542                             {
543                                 // maybe we should shrink?
544                                 final int size=_threadsStarted.get();
545                                 if (size>_minThreads)
546                                 {
547                                     long last=_lastShrink.get();
548                                     long now=System.currentTimeMillis();
549                                     if (last==0 || (now-last)>_maxIdleTimeMs)
550                                     {
551                                         shrink=_lastShrink.compareAndSet(last,now) &&
552                                         _threadsStarted.compareAndSet(size,size-1);
553                                         if (shrink)
554                                             return;
555                                     }
556                                 }
557                                 job=idleJobPoll();
558                             }
559                         }
560                     }
561                     finally
562                     {
563                         _threadsIdle.decrementAndGet();
564                     }
565                 }
566             }
567             catch(InterruptedException e)
568             {
569                 Log.ignore(e);
570             }
571             catch(Exception e)
572             {
573                 Log.warn(e);
574             }
575             finally
576             {
577                 if (!shrink)
578                     _threadsStarted.decrementAndGet();
579                 _threads.remove(Thread.currentThread());
580             }
581         }
582     };
583     
584     /* ------------------------------------------------------------ */
585     /**
586      * @param id The thread ID to stop.
587      * @return true if the thread was found and stopped.
588      * @deprecated Use {@link #interruptThread(long)} in preference
589      */
590     @SuppressWarnings("deprecation")
591     public boolean stopThread(long id)
592     {
593         for (Thread thread: _threads)
594         {
595             if (thread.getId()==id)
596             {
597                 thread.stop();
598                 return true;
599             }
600         }
601         return false;
602     }
603     
604     /* ------------------------------------------------------------ */
605     /**
606      * @param id The thread ID to interrupt.
607      * @return true if the thread was found and interrupted.
608      */
609     public boolean interruptThread(long id)
610     {
611         for (Thread thread: _threads)
612         {
613             if (thread.getId()==id)
614             {
615                 thread.interrupt();
616                 return true;
617             }
618         }
619         return false;
620     }
621     
622     /* ------------------------------------------------------------ */
623     /**
624      * @param id The thread ID to interrupt.
625      * @return true if the thread was found and interrupted.
626      */
627     public String dumpThread(long id)
628     {
629         for (Thread thread: _threads)
630         {
631             if (thread.getId()==id)
632             {
633                 StringBuilder buf = new StringBuilder();
634                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
635                 for (StackTraceElement element : thread.getStackTrace())
636                     buf.append("  at ").append(element.toString()).append('\n');
637                 return buf.toString();
638             }
639         }
640         return null;
641     }
642 }