View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  
15  package org.eclipse.jetty.util.thread;
16  
17  import java.io.IOException;
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.eclipse.jetty.util.BlockingArrayQueue;
31  import org.eclipse.jetty.util.component.AbstractLifeCycle;
32  import org.eclipse.jetty.util.component.AggregateLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.component.LifeCycle;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
38  
39  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
40  {
41      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
42  
43      private final AtomicInteger _threadsStarted = new AtomicInteger();
44      private final AtomicInteger _threadsIdle = new AtomicInteger();
45      private final AtomicLong _lastShrink = new AtomicLong();
46      private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
47      private final Object _joinLock = new Object();
48      private BlockingQueue<Runnable> _jobs;
49      private String _name;
50      private int _maxIdleTimeMs=60000;
51      private int _maxThreads=254;
52      private int _minThreads=8;
53      private int _maxQueued=-1;
54      private int _priority=Thread.NORM_PRIORITY;
55      private boolean _daemon=false;
56      private int _maxStopTime=100;
57      private boolean _detailedDump=false;
58  
59      /* ------------------------------------------------------------------- */
60      /** Construct
61       */
62      public QueuedThreadPool()
63      {
64          _name="qtp"+super.hashCode();
65      }
66  
67      /* ------------------------------------------------------------------- */
68      /** Construct
69       */
70      public QueuedThreadPool(int maxThreads)
71      {
72          this();
73          setMaxThreads(maxThreads);
74      }
75  
76      /* ------------------------------------------------------------------- */
77      /** Construct
78       */
79      public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
80      {
81          this();
82          _jobs=jobQ;
83          _jobs.clear();
84      }
85  
86  
87      /* ------------------------------------------------------------ */
88      @Override
89      protected void doStart() throws Exception
90      {
91          super.doStart();
92          _threadsStarted.set(0);
93  
94          if (_jobs==null)
95          {
96              _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
97                  :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
98          }
99  
100         int threads=_threadsStarted.get();
101         while (isRunning() && threads<_minThreads)
102         {
103             startThread(threads);
104             threads=_threadsStarted.get();
105         }
106     }
107 
108     /* ------------------------------------------------------------ */
109     @Override
110     protected void doStop() throws Exception
111     {
112         super.doStop();
113         long start=System.currentTimeMillis();
114 
115         // let jobs complete naturally for a while
116         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
117             Thread.sleep(1);
118 
119         // kill queued jobs and flush out idle jobs
120         _jobs.clear();
121         Runnable noop = new Runnable(){public void run(){}};
122         for  (int i=_threadsIdle.get();i-->0;)
123             _jobs.offer(noop);
124         Thread.yield();
125 
126         // interrupt remaining threads
127         if (_threadsStarted.get()>0)
128             for (Thread thread : _threads)
129                 thread.interrupt();
130 
131         // wait for remaining threads to die
132         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
133         {
134             Thread.sleep(1);
135         }
136         Thread.yield();
137         int size=_threads.size();
138         if (size>0)
139         {
140             LOG.warn(size+" threads could not be stopped");
141 
142             if (size==1 || LOG.isDebugEnabled())
143             {
144                 for (Thread unstopped : _threads)
145                 {
146                     LOG.info("Couldn't stop "+unstopped);
147                     for (StackTraceElement element : unstopped.getStackTrace())
148                     {
149                         LOG.info(" at "+element);
150                     }
151                 }
152             }
153         }
154 
155         synchronized (_joinLock)
156         {
157             _joinLock.notifyAll();
158         }
159     }
160 
161     /* ------------------------------------------------------------ */
162     /**
163      * Delegated to the named or anonymous Pool.
164      */
165     public void setDaemon(boolean daemon)
166     {
167         _daemon=daemon;
168     }
169 
170     /* ------------------------------------------------------------ */
171     /** Set the maximum thread idle time.
172      * Threads that are idle for longer than this period may be
173      * stopped.
174      * Delegated to the named or anonymous Pool.
175      * @see #getMaxIdleTimeMs
176      * @param maxIdleTimeMs Max idle time in ms.
177      */
178     public void setMaxIdleTimeMs(int maxIdleTimeMs)
179     {
180         _maxIdleTimeMs=maxIdleTimeMs;
181     }
182 
183     /* ------------------------------------------------------------ */
184     /**
185      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
186      */
187     public void setMaxStopTimeMs(int stopTimeMs)
188     {
189         _maxStopTime = stopTimeMs;
190     }
191 
192     /* ------------------------------------------------------------ */
193     /** Set the maximum number of threads.
194      * Delegated to the named or anonymous Pool.
195      * @see #getMaxThreads
196      * @param maxThreads maximum number of threads.
197      */
198     public void setMaxThreads(int maxThreads)
199     {
200         _maxThreads=maxThreads;
201         if (_minThreads>_maxThreads)
202             _minThreads=_maxThreads;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /** Set the minimum number of threads.
207      * Delegated to the named or anonymous Pool.
208      * @see #getMinThreads
209      * @param minThreads minimum number of threads
210      */
211     public void setMinThreads(int minThreads)
212     {
213         _minThreads=minThreads;
214 
215         if (_minThreads>_maxThreads)
216             _maxThreads=_minThreads;
217 
218         int threads=_threadsStarted.get();
219         while (isStarted() && threads<_minThreads)
220         {
221             startThread(threads);
222             threads=_threadsStarted.get();
223         }
224     }
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * @param name Name of the BoundedThreadPool to use when naming Threads.
229      */
230     public void setName(String name)
231     {
232         if (isRunning())
233             throw new IllegalStateException("started");
234         _name= name;
235     }
236 
237     /* ------------------------------------------------------------ */
238     /** Set the priority of the pool threads.
239      *  @param priority the new thread priority.
240      */
241     public void setThreadsPriority(int priority)
242     {
243         _priority=priority;
244     }
245 
246     /* ------------------------------------------------------------ */
247     /**
248      * @return maximum queue size
249      */
250     public int getMaxQueued()
251     {
252         return _maxQueued;
253     }
254 
255     /* ------------------------------------------------------------ */
256     /**
257      * @param max job queue size
258      */
259     public void setMaxQueued(int max)
260     {
261         if (isRunning())
262             throw new IllegalStateException("started");
263         _maxQueued=max;
264     }
265 
266     /* ------------------------------------------------------------ */
267     /** Get the maximum thread idle time.
268      * Delegated to the named or anonymous Pool.
269      * @see #setMaxIdleTimeMs
270      * @return Max idle time in ms.
271      */
272     public int getMaxIdleTimeMs()
273     {
274         return _maxIdleTimeMs;
275     }
276 
277     /* ------------------------------------------------------------ */
278     /**
279      * @return maximum total time that stop() will wait for threads to die.
280      */
281     public int getMaxStopTimeMs()
282     {
283         return _maxStopTime;
284     }
285 
286     /* ------------------------------------------------------------ */
287     /** Set the maximum number of threads.
288      * Delegated to the named or anonymous Pool.
289      * @see #setMaxThreads
290      * @return maximum number of threads.
291      */
292     public int getMaxThreads()
293     {
294         return _maxThreads;
295     }
296 
297     /* ------------------------------------------------------------ */
298     /** Get the minimum number of threads.
299      * Delegated to the named or anonymous Pool.
300      * @see #setMinThreads
301      * @return minimum number of threads.
302      */
303     public int getMinThreads()
304     {
305         return _minThreads;
306     }
307 
308     /* ------------------------------------------------------------ */
309     /**
310      * @return The name of the BoundedThreadPool.
311      */
312     public String getName()
313     {
314         return _name;
315     }
316 
317     /* ------------------------------------------------------------ */
318     /** Get the priority of the pool threads.
319      *  @return the priority of the pool threads.
320      */
321     public int getThreadsPriority()
322     {
323         return _priority;
324     }
325 
326     /* ------------------------------------------------------------ */
327     /**
328      * Delegated to the named or anonymous Pool.
329      */
330     public boolean isDaemon()
331     {
332         return _daemon;
333     }
334 
335     /* ------------------------------------------------------------ */
336     public boolean isDetailedDump()
337     {
338         return _detailedDump;
339     }
340 
341     /* ------------------------------------------------------------ */
342     public void setDetailedDump(boolean detailedDump)
343     {
344         _detailedDump = detailedDump;
345     }
346 
347     /* ------------------------------------------------------------ */
348     public boolean dispatch(Runnable job)
349     {
350         if (isRunning())
351         {
352             final int jobQ = _jobs.size();
353             final int idle = getIdleThreads();
354             if(_jobs.offer(job))
355             {
356                 // If we had no idle threads or the jobQ is greater than the idle threads
357                 if (idle==0 || jobQ>idle)
358                 {
359                     int threads=_threadsStarted.get();
360                     if (threads<_maxThreads)
361                         startThread(threads);
362                 }
363                 return true;
364             }
365         }
366         LOG.debug("Dispatched {} to stopped {}",job,this);
367         return false;
368     }
369 
370     /* ------------------------------------------------------------ */
371     public void execute(Runnable job)
372     {
373         if (!dispatch(job))
374             throw new RejectedExecutionException();
375     }
376 
377     /* ------------------------------------------------------------ */
378     /**
379      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
380      */
381     public void join() throws InterruptedException
382     {
383         synchronized (_joinLock)
384         {
385             while (isRunning())
386                 _joinLock.wait();
387         }
388 
389         while (isStopping())
390             Thread.sleep(1);
391     }
392 
393     /* ------------------------------------------------------------ */
394     /**
395      * @return The total number of threads currently in the pool
396      */
397     public int getThreads()
398     {
399         return _threadsStarted.get();
400     }
401 
402     /* ------------------------------------------------------------ */
403     /**
404      * @return The number of idle threads in the pool
405      */
406     public int getIdleThreads()
407     {
408         return _threadsIdle.get();
409     }
410 
411     /* ------------------------------------------------------------ */
412     /**
413      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
414      */
415     public boolean isLowOnThreads()
416     {
417         return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
418     }
419 
420     /* ------------------------------------------------------------ */
421     private boolean startThread(int threads)
422     {
423         final int next=threads+1;
424         if (!_threadsStarted.compareAndSet(threads,next))
425             return false;
426 
427         boolean started=false;
428         try
429         {
430             Thread thread=newThread(_runnable);
431             thread.setDaemon(_daemon);
432             thread.setPriority(_priority);
433             thread.setName(_name+"-"+thread.getId());
434             _threads.add(thread);
435 
436             thread.start();
437             started=true;
438         }
439         finally
440         {
441             if (!started)
442                 _threadsStarted.decrementAndGet();
443         }
444         return started;
445     }
446 
447     /* ------------------------------------------------------------ */
448     protected Thread newThread(Runnable runnable)
449     {
450         return new Thread(runnable);
451     }
452 
453 
454     /* ------------------------------------------------------------ */
455     public String dump()
456     {
457         return AggregateLifeCycle.dump(this);
458     }
459 
460     /* ------------------------------------------------------------ */
461     public void dump(Appendable out, String indent) throws IOException
462     {
463         List<Object> dump = new ArrayList<Object>(getMaxThreads());
464         for (final Thread thread: _threads)
465         {
466             final StackTraceElement[] trace=thread.getStackTrace();
467             boolean inIdleJobPoll=false;
468             // trace can be null on early java 6 jvms
469             if (trace != null)
470             {
471                 for (StackTraceElement t : trace)
472                 {
473                     if ("idleJobPoll".equals(t.getMethodName()))
474                     {
475                         inIdleJobPoll = true;
476                         break;
477                     }
478                 }
479             }
480             final boolean idle=inIdleJobPoll;
481 
482             if (_detailedDump)
483             {
484                 dump.add(new Dumpable()
485                 {
486                     public void dump(Appendable out, String indent) throws IOException
487                     {
488                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
489                         if (!idle)
490                             AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
491                     }
492 
493                     public String dump()
494                     {
495                         return null;
496                     }
497                 });
498             }
499             else
500             {
501                 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
502             }
503         }
504 
505         AggregateLifeCycle.dumpObject(out,this);
506         AggregateLifeCycle.dump(out,indent,dump);
507 
508     }
509 
510 
511     /* ------------------------------------------------------------ */
512     @Override
513     public String toString()
514     {
515         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
516     }
517 
518     /* ------------------------------------------------------------ */
519     private Runnable idleJobPoll() throws InterruptedException
520     {
521         return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
522     }
523 
524     /* ------------------------------------------------------------ */
525     private Runnable _runnable = new Runnable()
526     {
527         public void run()
528         {
529             boolean shrink=false;
530             try
531             {
532                 Runnable job=_jobs.poll();
533                 while (isRunning())
534                 {
535                     // Job loop
536                     while (job!=null && isRunning())
537                     {
538                         runJob(job);
539                         job=_jobs.poll();
540                     }
541 
542                     // Idle loop
543                     try
544                     {
545                         _threadsIdle.incrementAndGet();
546 
547                         while (isRunning() && job==null)
548                         {
549                             if (_maxIdleTimeMs<=0)
550                                 job=_jobs.take();
551                             else
552                             {
553                                 // maybe we should shrink?
554                                 final int size=_threadsStarted.get();
555                                 if (size>_minThreads)
556                                 {
557                                     long last=_lastShrink.get();
558                                     long now=System.currentTimeMillis();
559                                     if (last==0 || (now-last)>_maxIdleTimeMs)
560                                     {
561                                         shrink=_lastShrink.compareAndSet(last,now) &&
562                                         _threadsStarted.compareAndSet(size,size-1);
563                                         if (shrink)
564                                             return;
565                                     }
566                                 }
567                                 job=idleJobPoll();
568                             }
569                         }
570                     }
571                     finally
572                     {
573                         _threadsIdle.decrementAndGet();
574                     }
575                 }
576             }
577             catch(InterruptedException e)
578             {
579                 LOG.ignore(e);
580             }
581             catch(Exception e)
582             {
583                 LOG.warn(e);
584             }
585             finally
586             {
587                 if (!shrink)
588                     _threadsStarted.decrementAndGet();
589                 _threads.remove(Thread.currentThread());
590             }
591         }
592     };
593 
594     /* ------------------------------------------------------------ */
595     /**
596      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
597      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
598      *
599      * @param job the job to run
600      */
601     protected void runJob(Runnable job)
602     {
603         job.run();
604     }
605 
606     /* ------------------------------------------------------------ */
607     /**
608      * @return the job queue
609      */
610     protected BlockingQueue<Runnable> getQueue()
611     {
612         return _jobs;
613     }
614 
615     /* ------------------------------------------------------------ */
616     /**
617      * @param id The thread ID to stop.
618      * @return true if the thread was found and stopped.
619      * @deprecated Use {@link #interruptThread(long)} in preference
620      */
621     @Deprecated
622     public boolean stopThread(long id)
623     {
624         for (Thread thread: _threads)
625         {
626             if (thread.getId()==id)
627             {
628                 thread.stop();
629                 return true;
630             }
631         }
632         return false;
633     }
634 
635     /* ------------------------------------------------------------ */
636     /**
637      * @param id The thread ID to interrupt.
638      * @return true if the thread was found and interrupted.
639      */
640     public boolean interruptThread(long id)
641     {
642         for (Thread thread: _threads)
643         {
644             if (thread.getId()==id)
645             {
646                 thread.interrupt();
647                 return true;
648             }
649         }
650         return false;
651     }
652 
653     /* ------------------------------------------------------------ */
654     /**
655      * @param id The thread ID to interrupt.
656      * @return true if the thread was found and interrupted.
657      */
658     public String dumpThread(long id)
659     {
660         for (Thread thread: _threads)
661         {
662             if (thread.getId()==id)
663             {
664                 StringBuilder buf = new StringBuilder();
665                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
666                 for (StackTraceElement element : thread.getStackTrace())
667                     buf.append("  at ").append(element.toString()).append('\n');
668                 return buf.toString();
669             }
670         }
671         return null;
672     }
673 }