View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  
15  package org.eclipse.jetty.util.thread;
16  
17  import java.io.IOException;
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.eclipse.jetty.util.BlockingArrayQueue;
31  import org.eclipse.jetty.util.component.AbstractLifeCycle;
32  import org.eclipse.jetty.util.component.AggregateLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.component.LifeCycle;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
37  
38  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
39  {
40      private final AtomicInteger _threadsStarted = new AtomicInteger();
41      private final AtomicInteger _threadsIdle = new AtomicInteger();
42      private final AtomicLong _lastShrink = new AtomicLong();
43      private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
44      private final Object _joinLock = new Object();
45      private BlockingQueue<Runnable> _jobs;
46      private String _name;
47      private int _maxIdleTimeMs=60000;
48      private int _maxThreads=254;
49      private int _minThreads=8;
50      private int _maxQueued=-1;
51      private int _priority=Thread.NORM_PRIORITY;
52      private boolean _daemon=false;
53      private int _maxStopTime=100;
54      private boolean _detailedDump=false;
55  
56      /* ------------------------------------------------------------------- */
57      /** Construct
58       */
59      public QueuedThreadPool()
60      {
61          _name="qtp"+super.hashCode();
62      }
63      
64      /* ------------------------------------------------------------------- */
65      /** Construct
66       */
67      public QueuedThreadPool(int maxThreads)
68      {
69          this();
70          setMaxThreads(maxThreads);
71      }
72      
73      /* ------------------------------------------------------------------- */
74      /** Construct
75       */
76      public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
77      {
78          this();
79          _jobs=jobQ;
80          _jobs.clear();
81      }
82      
83      
84      /* ------------------------------------------------------------ */
85      @Override
86      protected void doStart() throws Exception
87      {
88          super.doStart();
89          _threadsStarted.set(0);
90  
91          if (_jobs==null)
92          {
93              _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
94                  :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
95          }
96  
97          int threads=_threadsStarted.get();
98          while (isRunning() && threads<_minThreads)
99          {
100             startThread(threads); 
101             threads=_threadsStarted.get();  
102         }
103     }
104 
105     /* ------------------------------------------------------------ */
106     @Override
107     protected void doStop() throws Exception
108     {
109         super.doStop();
110         long start=System.currentTimeMillis();
111 
112         // let jobs complete naturally for a while
113         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
114             Thread.sleep(1);
115         
116         // kill queued jobs and flush out idle jobs
117         _jobs.clear();
118         Runnable noop = new Runnable(){public void run(){}};
119         for  (int i=_threadsIdle.get();i-->0;)
120             _jobs.offer(noop);
121         Thread.yield();
122 
123         // interrupt remaining threads
124         if (_threadsStarted.get()>0)
125             for (Thread thread : _threads)
126                 thread.interrupt();
127         
128         // wait for remaining threads to die
129         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
130         {
131             Thread.sleep(1);
132         }
133         Thread.yield();
134         int size=_threads.size();
135         if (size>0)
136         {
137             Log.warn(size+" threads could not be stopped");
138             
139             if (Log.isDebugEnabled())
140             {
141                 for (Thread unstopped : _threads)
142                 {
143                     Log.debug("Couldn't stop "+unstopped);
144                     for (StackTraceElement element : unstopped.getStackTrace())
145                     {
146                         Log.debug(" at "+element);
147                     }
148                 }
149             }
150         }
151         
152         synchronized (_joinLock)
153         {
154             _joinLock.notifyAll();
155         }
156     }
157 
158     /* ------------------------------------------------------------ */
159     /** 
160      * Delegated to the named or anonymous Pool.
161      */
162     public void setDaemon(boolean daemon)
163     {
164         _daemon=daemon;
165     }
166     
167     /* ------------------------------------------------------------ */
168     /** Set the maximum thread idle time.
169      * Threads that are idle for longer than this period may be
170      * stopped.
171      * Delegated to the named or anonymous Pool.
172      * @see #getMaxIdleTimeMs
173      * @param maxIdleTimeMs Max idle time in ms.
174      */
175     public void setMaxIdleTimeMs(int maxIdleTimeMs)
176     {
177         _maxIdleTimeMs=maxIdleTimeMs;
178     }
179 
180     /* ------------------------------------------------------------ */
181     /**
182      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
183      */
184     public void setMaxStopTimeMs(int stopTimeMs)
185     {
186         _maxStopTime = stopTimeMs;
187     }
188 
189     /* ------------------------------------------------------------ */
190     /** Set the maximum number of threads.
191      * Delegated to the named or anonymous Pool.
192      * @see #getMaxThreads
193      * @param maxThreads maximum number of threads.
194      */
195     public void setMaxThreads(int maxThreads)
196     {
197         _maxThreads=maxThreads;
198         if (_minThreads>_maxThreads)
199             _minThreads=_maxThreads;
200     }
201 
202     /* ------------------------------------------------------------ */
203     /** Set the minimum number of threads.
204      * Delegated to the named or anonymous Pool.
205      * @see #getMinThreads
206      * @param minThreads minimum number of threads
207      */
208     public void setMinThreads(int minThreads)
209     {
210         _minThreads=minThreads;
211 
212         if (_minThreads>_maxThreads)
213             _maxThreads=_minThreads;
214         
215         int threads=_threadsStarted.get();
216         while (isStarted() && threads<_minThreads)
217         {
218             startThread(threads); 
219             threads=_threadsStarted.get();  
220         }
221     }
222 
223     /* ------------------------------------------------------------ */
224     /** 
225      * @param name Name of the BoundedThreadPool to use when naming Threads.
226      */
227     public void setName(String name)
228     {
229         if (isRunning())
230             throw new IllegalStateException("started");
231         _name= name;
232     }
233 
234     /* ------------------------------------------------------------ */
235     /** Set the priority of the pool threads.
236      *  @param priority the new thread priority.
237      */
238     public void setThreadsPriority(int priority)
239     {
240         _priority=priority;
241     }
242     
243     /* ------------------------------------------------------------ */
244     /**
245      * @return maximum queue size
246      */
247     public int getMaxQueued()
248     {
249         return _maxQueued;
250     }
251     
252     /* ------------------------------------------------------------ */
253     /**
254      * @param max job queue size
255      */
256     public void setMaxQueued(int max)
257     {
258         if (isRunning())
259             throw new IllegalStateException("started");
260         _maxQueued=max;
261     }
262     
263     /* ------------------------------------------------------------ */
264     /** Get the maximum thread idle time.
265      * Delegated to the named or anonymous Pool.
266      * @see #setMaxIdleTimeMs
267      * @return Max idle time in ms.
268      */
269     public int getMaxIdleTimeMs()
270     {
271         return _maxIdleTimeMs;
272     } 
273     
274     /* ------------------------------------------------------------ */
275     /**
276      * @return maximum total time that stop() will wait for threads to die.
277      */
278     public int getMaxStopTimeMs()
279     {
280         return _maxStopTime;
281     }
282     
283     /* ------------------------------------------------------------ */
284     /** Set the maximum number of threads.
285      * Delegated to the named or anonymous Pool.
286      * @see #setMaxThreads
287      * @return maximum number of threads.
288      */
289     public int getMaxThreads()
290     {
291         return _maxThreads;
292     }
293 
294     /* ------------------------------------------------------------ */
295     /** Get the minimum number of threads.
296      * Delegated to the named or anonymous Pool.
297      * @see #setMinThreads
298      * @return minimum number of threads.
299      */
300     public int getMinThreads()
301     {
302         return _minThreads;
303     }
304 
305     /* ------------------------------------------------------------ */
306     /** 
307      * @return The name of the BoundedThreadPool.
308      */
309     public String getName()
310     {
311         return _name;
312     }
313 
314     /* ------------------------------------------------------------ */
315     /** Get the priority of the pool threads.
316      *  @return the priority of the pool threads.
317      */
318     public int getThreadsPriority()
319     {
320         return _priority;
321     }
322     
323     /* ------------------------------------------------------------ */
324     /** 
325      * Delegated to the named or anonymous Pool.
326      */
327     public boolean isDaemon()
328     {
329         return _daemon;
330     }
331 
332     /* ------------------------------------------------------------ */
333     public boolean isDetailedDump()
334     {
335         return _detailedDump;
336     }
337 
338     /* ------------------------------------------------------------ */
339     public void setDetailedDump(boolean detailedDump)
340     {
341         _detailedDump = detailedDump;
342     }
343 
344     /* ------------------------------------------------------------ */
345     public boolean dispatch(Runnable job)
346     {
347         if (isRunning())
348         {
349             final int jobQ = _jobs.size();
350             final int idle = getIdleThreads();
351             if(_jobs.offer(job))
352             {
353                 // If we had no idle threads or the jobQ is greater than the idle threads
354                 if (idle==0 || jobQ>idle)
355                 {
356                     int threads=_threadsStarted.get();
357                     if (threads<_maxThreads)
358                         startThread(threads);
359                 }
360                 return true;
361             }
362         }
363         return false;
364     }
365     
366     /* ------------------------------------------------------------ */
367     public void execute(Runnable job)
368     {
369         if (!dispatch(job))
370             throw new RejectedExecutionException();
371     }
372 
373     /* ------------------------------------------------------------ */
374     /**
375      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
376      */
377     public void join() throws InterruptedException
378     {   
379         synchronized (_joinLock)
380         {
381             while (isRunning())
382                 _joinLock.wait();
383         }
384         
385         while (isStopping())
386             Thread.sleep(1);
387     }
388 
389     /* ------------------------------------------------------------ */
390     /**
391      * @return The total number of threads currently in the pool
392      */
393     public int getThreads()
394     {
395         return _threadsStarted.get();
396     }
397 
398     /* ------------------------------------------------------------ */
399     /**
400      * @return The number of idle threads in the pool
401      */
402     public int getIdleThreads()
403     {
404         return _threadsIdle.get();
405     }
406     
407     /* ------------------------------------------------------------ */
408     /**
409      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
410      */
411     public boolean isLowOnThreads()
412     {
413         return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
414     }
415 
416     /* ------------------------------------------------------------ */
417     private boolean startThread(int threads)
418     {
419         final int next=threads+1;
420         if (!_threadsStarted.compareAndSet(threads,next))
421             return false;
422         
423         boolean started=false;
424         try
425         {
426             Thread thread=newThread(_runnable);
427             thread.setDaemon(_daemon);
428             thread.setPriority(_priority);
429             thread.setName(_name+"-"+thread.getId());
430             _threads.add(thread);
431             
432             thread.start();
433             started=true;
434         }
435         finally
436         {
437             if (!started)
438                 _threadsStarted.decrementAndGet();
439         }
440         return started;
441     }
442     
443     /* ------------------------------------------------------------ */
444     protected Thread newThread(Runnable runnable)
445     {
446         return new Thread(runnable);
447     }
448 
449 
450     /* ------------------------------------------------------------ */
451     public String dump()
452     {
453         return AggregateLifeCycle.dump(this);
454     }
455 
456     /* ------------------------------------------------------------ */
457     public void dump(Appendable out, String indent) throws IOException
458     {  
459         List<Object> dump = new ArrayList<Object>(getMaxThreads());        
460         for (final Thread thread: _threads)
461         {
462             final StackTraceElement[] trace=thread.getStackTrace();
463             boolean inIdleJobPoll=false;
464             for (StackTraceElement t : trace)
465             {
466                 if ("idleJobPoll".equals(t.getMethodName()))
467                 {
468                     inIdleJobPoll=true;
469                     break;
470                 }
471             }
472             final boolean idle=inIdleJobPoll;
473                 
474             if (_detailedDump)
475             {
476                 dump.add(new Dumpable()
477                 {
478                     public void dump(Appendable out, String indent) throws IOException
479                     {
480                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
481                         if (!idle)
482                             AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
483                     }
484                     
485                     public String dump()
486                     {
487                         return null;
488                     }
489                 });
490             }
491             else
492             {
493                 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
494             }
495         }
496 
497         out.append(String.valueOf(this)).append("\n");
498         AggregateLifeCycle.dump(out,indent,dump);
499         
500     }
501     
502     
503     /* ------------------------------------------------------------ */
504     @Override
505     public String toString()
506     {
507         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
508     }
509    
510     private Runnable idleJobPoll() throws InterruptedException
511     {
512         return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
513     }
514     
515     /* ------------------------------------------------------------ */
516     private Runnable _runnable = new Runnable()
517     {
518         public void run()
519         {
520             boolean shrink=false;
521             try
522             {
523                 Runnable job=_jobs.poll();
524                 while (isRunning())
525                 {
526                     // Job loop
527                     while (job!=null && isRunning())
528                     {
529                         job.run();
530                         job=_jobs.poll();
531                     }
532                         
533                     // Idle loop
534                     try
535                     {
536                         _threadsIdle.incrementAndGet();
537 
538                         while (isRunning() && job==null)
539                         {
540                             if (_maxIdleTimeMs<=0)
541                                 job=_jobs.take();
542                             else
543                             {
544                                 // maybe we should shrink?
545                                 final int size=_threadsStarted.get();
546                                 if (size>_minThreads)
547                                 {
548                                     long last=_lastShrink.get();
549                                     long now=System.currentTimeMillis();
550                                     if (last==0 || (now-last)>_maxIdleTimeMs)
551                                     {
552                                         shrink=_lastShrink.compareAndSet(last,now) &&
553                                         _threadsStarted.compareAndSet(size,size-1);
554                                         if (shrink)
555                                             return;
556                                     }
557                                 }
558                                 job=idleJobPoll();
559                             }
560                         }
561                     }
562                     finally
563                     {
564                         _threadsIdle.decrementAndGet();
565                     }
566                 }
567             }
568             catch(InterruptedException e)
569             {
570                 Log.ignore(e);
571             }
572             catch(Exception e)
573             {
574                 Log.warn(e);
575             }
576             finally
577             {
578                 if (!shrink)
579                     _threadsStarted.decrementAndGet();
580                 _threads.remove(Thread.currentThread());
581             }
582         }
583     };
584     
585     /* ------------------------------------------------------------ */
586     /**
587      * @param id The thread ID to stop.
588      * @return true if the thread was found and stopped.
589      * @deprecated Use {@link #interruptThread(long)} in preference
590      */
591     @SuppressWarnings("deprecation")
592     public boolean stopThread(long id)
593     {
594         for (Thread thread: _threads)
595         {
596             if (thread.getId()==id)
597             {
598                 thread.stop();
599                 return true;
600             }
601         }
602         return false;
603     }
604     
605     /* ------------------------------------------------------------ */
606     /**
607      * @param id The thread ID to interrupt.
608      * @return true if the thread was found and interrupted.
609      */
610     public boolean interruptThread(long id)
611     {
612         for (Thread thread: _threads)
613         {
614             if (thread.getId()==id)
615             {
616                 thread.interrupt();
617                 return true;
618             }
619         }
620         return false;
621     }
622     
623     /* ------------------------------------------------------------ */
624     /**
625      * @param id The thread ID to interrupt.
626      * @return true if the thread was found and interrupted.
627      */
628     public String dumpThread(long id)
629     {
630         for (Thread thread: _threads)
631         {
632             if (thread.getId()==id)
633             {
634                 StringBuilder buf = new StringBuilder();
635                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
636                 for (StackTraceElement element : thread.getStackTrace())
637                     buf.append("  at ").append(element.toString()).append('\n');
638                 return buf.toString();
639             }
640         }
641         return null;
642     }
643 }