View Javadoc

// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
13  
14  
15  package org.eclipse.jetty.util.thread;
16  
17  import java.io.IOException;
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.eclipse.jetty.util.BlockingArrayQueue;
31  import org.eclipse.jetty.util.component.AbstractLifeCycle;
32  import org.eclipse.jetty.util.component.AggregateLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.component.LifeCycle;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
38  
39  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
40  {
41      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
42  
43      private final AtomicInteger _threadsStarted = new AtomicInteger();
44      private final AtomicInteger _threadsIdle = new AtomicInteger();
45      private final AtomicLong _lastShrink = new AtomicLong();
46      private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
47      private final Object _joinLock = new Object();
48      private BlockingQueue<Runnable> _jobs;
49      private String _name;
50      private int _maxIdleTimeMs=60000;
51      private int _maxThreads=254;
52      private int _minThreads=8;
53      private int _maxQueued=-1;
54      private int _priority=Thread.NORM_PRIORITY;
55      private boolean _daemon=false;
56      private int _maxStopTime=100;
57      private boolean _detailedDump=false;
58  
59      /* ------------------------------------------------------------------- */
60      /** Construct
61       */
62      public QueuedThreadPool()
63      {
64          _name="qtp"+super.hashCode();
65      }
66  
67      /* ------------------------------------------------------------------- */
68      /** Construct
69       */
70      public QueuedThreadPool(int maxThreads)
71      {
72          this();
73          setMaxThreads(maxThreads);
74      }
75  
76      /* ------------------------------------------------------------------- */
77      /** Construct
78       */
79      public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
80      {
81          this();
82          _jobs=jobQ;
83          _jobs.clear();
84      }
85  
86  
87      /* ------------------------------------------------------------ */
88      @Override
89      protected void doStart() throws Exception
90      {
91          super.doStart();
92          _threadsStarted.set(0);
93  
94          if (_jobs==null)
95          {
96              _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
97                  :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
98          }
99  
100         int threads=_threadsStarted.get();
101         while (isRunning() && threads<_minThreads)
102         {
103             startThread(threads);
104             threads=_threadsStarted.get();
105         }
106     }
107 
108     /* ------------------------------------------------------------ */
109     @Override
110     protected void doStop() throws Exception
111     {
112         super.doStop();
113         long start=System.currentTimeMillis();
114 
115         // let jobs complete naturally for a while
116         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
117             Thread.sleep(1);
118 
119         // kill queued jobs and flush out idle jobs
120         _jobs.clear();
121         Runnable noop = new Runnable(){public void run(){}};
122         for  (int i=_threadsIdle.get();i-->0;)
123             _jobs.offer(noop);
124         Thread.yield();
125 
126         // interrupt remaining threads
127         if (_threadsStarted.get()>0)
128             for (Thread thread : _threads)
129                 thread.interrupt();
130 
131         // wait for remaining threads to die
132         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
133         {
134             Thread.sleep(1);
135         }
136         Thread.yield();
137         int size=_threads.size();
138         if (size>0)
139         {
140             LOG.warn(size+" threads could not be stopped");
141 
142             if (size==1 || LOG.isDebugEnabled())
143             {
144                 for (Thread unstopped : _threads)
145                 {
146                     LOG.info("Couldn't stop "+unstopped);
147                     for (StackTraceElement element : unstopped.getStackTrace())
148                     {
149                         LOG.info(" at "+element);
150                     }
151                 }
152             }
153         }
154 
155         synchronized (_joinLock)
156         {
157             _joinLock.notifyAll();
158         }
159     }
160 
161     /* ------------------------------------------------------------ */
162     /**
163      * Delegated to the named or anonymous Pool.
164      */
165     public void setDaemon(boolean daemon)
166     {
167         _daemon=daemon;
168     }
169 
170     /* ------------------------------------------------------------ */
171     /** Set the maximum thread idle time.
172      * Threads that are idle for longer than this period may be
173      * stopped.
174      * Delegated to the named or anonymous Pool.
175      * @see #getMaxIdleTimeMs
176      * @param maxIdleTimeMs Max idle time in ms.
177      */
178     public void setMaxIdleTimeMs(int maxIdleTimeMs)
179     {
180         _maxIdleTimeMs=maxIdleTimeMs;
181     }
182 
183     /* ------------------------------------------------------------ */
184     /**
185      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
186      */
187     public void setMaxStopTimeMs(int stopTimeMs)
188     {
189         _maxStopTime = stopTimeMs;
190     }
191 
192     /* ------------------------------------------------------------ */
193     /** Set the maximum number of threads.
194      * Delegated to the named or anonymous Pool.
195      * @see #getMaxThreads
196      * @param maxThreads maximum number of threads.
197      */
198     public void setMaxThreads(int maxThreads)
199     {
200         _maxThreads=maxThreads;
201         if (_minThreads>_maxThreads)
202             _minThreads=_maxThreads;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /** Set the minimum number of threads.
207      * Delegated to the named or anonymous Pool.
208      * @see #getMinThreads
209      * @param minThreads minimum number of threads
210      */
211     public void setMinThreads(int minThreads)
212     {
213         _minThreads=minThreads;
214 
215         if (_minThreads>_maxThreads)
216             _maxThreads=_minThreads;
217 
218         int threads=_threadsStarted.get();
219         while (isStarted() && threads<_minThreads)
220         {
221             startThread(threads);
222             threads=_threadsStarted.get();
223         }
224     }
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * @param name Name of the BoundedThreadPool to use when naming Threads.
229      */
230     public void setName(String name)
231     {
232         if (isRunning())
233             throw new IllegalStateException("started");
234         _name= name;
235     }
236 
237     /* ------------------------------------------------------------ */
238     /** Set the priority of the pool threads.
239      *  @param priority the new thread priority.
240      */
241     public void setThreadsPriority(int priority)
242     {
243         _priority=priority;
244     }
245 
246     /* ------------------------------------------------------------ */
247     /**
248      * @return maximum queue size
249      */
250     public int getMaxQueued()
251     {
252         return _maxQueued;
253     }
254 
255     /* ------------------------------------------------------------ */
256     /**
257      * @param max job queue size
258      */
259     public void setMaxQueued(int max)
260     {
261         if (isRunning())
262             throw new IllegalStateException("started");
263         _maxQueued=max;
264     }
265 
266     /* ------------------------------------------------------------ */
267     /** Get the maximum thread idle time.
268      * Delegated to the named or anonymous Pool.
269      * @see #setMaxIdleTimeMs
270      * @return Max idle time in ms.
271      */
272     public int getMaxIdleTimeMs()
273     {
274         return _maxIdleTimeMs;
275     }
276 
277     /* ------------------------------------------------------------ */
278     /**
279      * @return maximum total time that stop() will wait for threads to die.
280      */
281     public int getMaxStopTimeMs()
282     {
283         return _maxStopTime;
284     }
285 
286     /* ------------------------------------------------------------ */
287     /** Set the maximum number of threads.
288      * Delegated to the named or anonymous Pool.
289      * @see #setMaxThreads
290      * @return maximum number of threads.
291      */
292     public int getMaxThreads()
293     {
294         return _maxThreads;
295     }
296 
297     /* ------------------------------------------------------------ */
298     /** Get the minimum number of threads.
299      * Delegated to the named or anonymous Pool.
300      * @see #setMinThreads
301      * @return minimum number of threads.
302      */
303     public int getMinThreads()
304     {
305         return _minThreads;
306     }
307 
308     /* ------------------------------------------------------------ */
309     /**
310      * @return The name of the BoundedThreadPool.
311      */
312     public String getName()
313     {
314         return _name;
315     }
316 
317     /* ------------------------------------------------------------ */
318     /** Get the priority of the pool threads.
319      *  @return the priority of the pool threads.
320      */
321     public int getThreadsPriority()
322     {
323         return _priority;
324     }
325 
326     /* ------------------------------------------------------------ */
327     /**
328      * Delegated to the named or anonymous Pool.
329      */
330     public boolean isDaemon()
331     {
332         return _daemon;
333     }
334 
335     /* ------------------------------------------------------------ */
336     public boolean isDetailedDump()
337     {
338         return _detailedDump;
339     }
340 
341     /* ------------------------------------------------------------ */
342     public void setDetailedDump(boolean detailedDump)
343     {
344         _detailedDump = detailedDump;
345     }
346 
347     /* ------------------------------------------------------------ */
348     public boolean dispatch(Runnable job)
349     {
350         if (isRunning())
351         {
352             final int jobQ = _jobs.size();
353             final int idle = getIdleThreads();
354             if(_jobs.offer(job))
355             {
356                 // If we had no idle threads or the jobQ is greater than the idle threads
357                 if (idle==0 || jobQ>idle)
358                 {
359                     int threads=_threadsStarted.get();
360                     if (threads<_maxThreads)
361                         startThread(threads);
362                 }
363                 return true;
364             }
365         }
366         LOG.debug("Dispatched {} to stopped {}",job,this);
367         return false;
368     }
369 
370     /* ------------------------------------------------------------ */
371     public void execute(Runnable job)
372     {
373         if (!dispatch(job))
374             throw new RejectedExecutionException();
375     }
376 
377     /* ------------------------------------------------------------ */
378     /**
379      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
380      */
381     public void join() throws InterruptedException
382     {
383         synchronized (_joinLock)
384         {
385             while (isRunning())
386                 _joinLock.wait();
387         }
388 
389         while (isStopping())
390             Thread.sleep(1);
391     }
392 
393     /* ------------------------------------------------------------ */
394     /**
395      * @return The total number of threads currently in the pool
396      */
397     public int getThreads()
398     {
399         return _threadsStarted.get();
400     }
401 
402     /* ------------------------------------------------------------ */
403     /**
404      * @return The number of idle threads in the pool
405      */
406     public int getIdleThreads()
407     {
408         return _threadsIdle.get();
409     }
410 
411     /* ------------------------------------------------------------ */
412     /**
413      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
414      */
415     public boolean isLowOnThreads()
416     {
417         return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
418     }
419 
420     /* ------------------------------------------------------------ */
421     private boolean startThread(int threads)
422     {
423         final int next=threads+1;
424         if (!_threadsStarted.compareAndSet(threads,next))
425             return false;
426 
427         boolean started=false;
428         try
429         {
430             Thread thread=newThread(_runnable);
431             thread.setDaemon(_daemon);
432             thread.setPriority(_priority);
433             thread.setName(_name+"-"+thread.getId());
434             _threads.add(thread);
435 
436             thread.start();
437             started=true;
438         }
439         finally
440         {
441             if (!started)
442                 _threadsStarted.decrementAndGet();
443         }
444         return started;
445     }
446 
447     /* ------------------------------------------------------------ */
448     protected Thread newThread(Runnable runnable)
449     {
450         return new Thread(runnable);
451     }
452 
453 
454     /* ------------------------------------------------------------ */
455     public String dump()
456     {
457         return AggregateLifeCycle.dump(this);
458     }
459 
460     /* ------------------------------------------------------------ */
461     public void dump(Appendable out, String indent) throws IOException
462     {
463         List<Object> dump = new ArrayList<Object>(getMaxThreads());
464         for (final Thread thread: _threads)
465         {
466             final StackTraceElement[] trace=thread.getStackTrace();
467             boolean inIdleJobPoll=false;
468             for (StackTraceElement t : trace)
469             {
470                 if ("idleJobPoll".equals(t.getMethodName()))
471                 {
472                     inIdleJobPoll=true;
473                     break;
474                 }
475             }
476             final boolean idle=inIdleJobPoll;
477 
478             if (_detailedDump)
479             {
480                 dump.add(new Dumpable()
481                 {
482                     public void dump(Appendable out, String indent) throws IOException
483                     {
484                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
485                         if (!idle)
486                             AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
487                     }
488 
489                     public String dump()
490                     {
491                         return null;
492                     }
493                 });
494             }
495             else
496             {
497                 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
498             }
499         }
500 
501         AggregateLifeCycle.dumpObject(out,this);
502         AggregateLifeCycle.dump(out,indent,dump);
503 
504     }
505 
506 
507     /* ------------------------------------------------------------ */
508     @Override
509     public String toString()
510     {
511         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
512     }
513 
514     /* ------------------------------------------------------------ */
515     private Runnable idleJobPoll() throws InterruptedException
516     {
517         return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
518     }
519 
520     /* ------------------------------------------------------------ */
521     private Runnable _runnable = new Runnable()
522     {
523         public void run()
524         {
525             boolean shrink=false;
526             try
527             {
528                 Runnable job=_jobs.poll();
529                 while (isRunning())
530                 {
531                     // Job loop
532                     while (job!=null && isRunning())
533                     {
534                         runJob(job);
535                         job=_jobs.poll();
536                     }
537 
538                     // Idle loop
539                     try
540                     {
541                         _threadsIdle.incrementAndGet();
542 
543                         while (isRunning() && job==null)
544                         {
545                             if (_maxIdleTimeMs<=0)
546                                 job=_jobs.take();
547                             else
548                             {
549                                 // maybe we should shrink?
550                                 final int size=_threadsStarted.get();
551                                 if (size>_minThreads)
552                                 {
553                                     long last=_lastShrink.get();
554                                     long now=System.currentTimeMillis();
555                                     if (last==0 || (now-last)>_maxIdleTimeMs)
556                                     {
557                                         shrink=_lastShrink.compareAndSet(last,now) &&
558                                         _threadsStarted.compareAndSet(size,size-1);
559                                         if (shrink)
560                                             return;
561                                     }
562                                 }
563                                 job=idleJobPoll();
564                             }
565                         }
566                     }
567                     finally
568                     {
569                         _threadsIdle.decrementAndGet();
570                     }
571                 }
572             }
573             catch(InterruptedException e)
574             {
575                 LOG.ignore(e);
576             }
577             catch(Exception e)
578             {
579                 LOG.warn(e);
580             }
581             finally
582             {
583                 if (!shrink)
584                     _threadsStarted.decrementAndGet();
585                 _threads.remove(Thread.currentThread());
586             }
587         }
588     };
589 
590     /* ------------------------------------------------------------ */
591     /**
592      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
593      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
594      *
595      * @param job the job to run
596      */
597     protected void runJob(Runnable job)
598     {
599         job.run();
600     }
601 
602     /* ------------------------------------------------------------ */
603     /**
604      * @return the job queue
605      */
606     protected BlockingQueue<Runnable> getQueue()
607     {
608         return _jobs;
609     }
610 
611     /* ------------------------------------------------------------ */
612     /**
613      * @param id The thread ID to stop.
614      * @return true if the thread was found and stopped.
615      * @deprecated Use {@link #interruptThread(long)} in preference
616      */
617     @Deprecated
618     public boolean stopThread(long id)
619     {
620         for (Thread thread: _threads)
621         {
622             if (thread.getId()==id)
623             {
624                 thread.stop();
625                 return true;
626             }
627         }
628         return false;
629     }
630 
631     /* ------------------------------------------------------------ */
632     /**
633      * @param id The thread ID to interrupt.
634      * @return true if the thread was found and interrupted.
635      */
636     public boolean interruptThread(long id)
637     {
638         for (Thread thread: _threads)
639         {
640             if (thread.getId()==id)
641             {
642                 thread.interrupt();
643                 return true;
644             }
645         }
646         return false;
647     }
648 
649     /* ------------------------------------------------------------ */
650     /**
651      * @param id The thread ID to interrupt.
652      * @return true if the thread was found and interrupted.
653      */
654     public String dumpThread(long id)
655     {
656         for (Thread thread: _threads)
657         {
658             if (thread.getId()==id)
659             {
660                 StringBuilder buf = new StringBuilder();
661                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
662                 for (StackTraceElement element : thread.getStackTrace())
663                     buf.append("  at ").append(element.toString()).append('\n');
664                 return buf.toString();
665             }
666         }
667         return null;
668     }
669 }