View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.util.thread;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.RejectedExecutionException;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.eclipse.jetty.util.BlockingArrayQueue;
33  import org.eclipse.jetty.util.ConcurrentHashSet;
34  import org.eclipse.jetty.util.annotation.ManagedAttribute;
35  import org.eclipse.jetty.util.annotation.ManagedObject;
36  import org.eclipse.jetty.util.annotation.ManagedOperation;
37  import org.eclipse.jetty.util.annotation.Name;
38  import org.eclipse.jetty.util.component.AbstractLifeCycle;
39  import org.eclipse.jetty.util.component.ContainerLifeCycle;
40  import org.eclipse.jetty.util.component.Dumpable;
41  import org.eclipse.jetty.util.component.LifeCycle;
42  import org.eclipse.jetty.util.log.Log;
43  import org.eclipse.jetty.util.log.Logger;
44  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
45  
46  @ManagedObject("A thread pool with no max bound by default")
47  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
48  {
49      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
50  
51      private final AtomicInteger _threadsStarted = new AtomicInteger();
52      private final AtomicInteger _threadsIdle = new AtomicInteger();
53      private final AtomicLong _lastShrink = new AtomicLong();
54      private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<Thread>();
55      private final Object _joinLock = new Object();
56      private final BlockingQueue<Runnable> _jobs;
57      private final ThreadGroup _threadGroup;
58      private String _name = "qtp" + hashCode();
59      private int _idleTimeout;
60      private int _maxThreads;
61      private int _minThreads;
62      private int _priority = Thread.NORM_PRIORITY;
63      private boolean _daemon = false;
64      private boolean _detailedDump = false;
65  
66      public QueuedThreadPool()
67      {
68          this(200);
69      }
70  
71      public QueuedThreadPool(@Name("maxThreads") int maxThreads)
72      {
73          this(maxThreads, 8);
74      }
75  
76      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads)
77      {
78          this(maxThreads, minThreads, 60000);
79      }
80  
81      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
82      {
83          this(maxThreads, minThreads, idleTimeout, null);
84      }
85  
86      public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
87      {
88          this(maxThreads, minThreads, idleTimeout, queue, null);
89      }
90  
91      public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
92      {
93          setMinThreads(minThreads);
94          setMaxThreads(maxThreads);
95          setIdleTimeout(idleTimeout);
96          setStopTimeout(5000);
97  
98          if (queue==null)
99          {
100             int capacity=Math.max(_minThreads, 8);
101             queue=new BlockingArrayQueue<>(capacity, capacity);
102         }
103         _jobs=queue;
104         _threadGroup=threadGroup;
105     }
106 
107     @Override
108     protected void doStart() throws Exception
109     {
110         super.doStart();
111         _threadsStarted.set(0);
112 
113         startThreads(_minThreads);
114     }
115 
116     @Override
117     protected void doStop() throws Exception
118     {
119         super.doStop();
120 
121         long timeout = getStopTimeout();
122         BlockingQueue<Runnable> jobs = getQueue();
123 
124         // If no stop timeout, clear job queue
125         if (timeout <= 0)
126             jobs.clear();
127 
128         // Fill job Q with noop jobs to wakeup idle
129         Runnable noop = new Runnable()
130         {
131             @Override
132             public void run()
133             {
134             }
135         };
136         for (int i = _threadsStarted.get(); i-- > 0; )
137             jobs.offer(noop);
138 
139         // try to jobs complete naturally for half our stop time
140         long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
141         for (Thread thread : _threads)
142         {
143             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
144             if (canwait > 0)
145                 thread.join(canwait);
146         }
147 
148         // If we still have threads running, get a bit more aggressive
149 
150         // interrupt remaining threads
151         if (_threadsStarted.get() > 0)
152             for (Thread thread : _threads)
153                 thread.interrupt();
154 
155         // wait again for the other half of our stop time
156         stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
157         for (Thread thread : _threads)
158         {
159             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
160             if (canwait > 0)
161                 thread.join(canwait);
162         }
163 
164         Thread.yield();
165         int size = _threads.size();
166         if (size > 0)
167         {
168             Thread.yield();
169             
170             if (LOG.isDebugEnabled())
171             {
172                 for (Thread unstopped : _threads)
173                 {
174                     StringBuilder dmp = new StringBuilder();
175                     for (StackTraceElement element : unstopped.getStackTrace())
176                     {
177                         dmp.append(System.lineSeparator()).append("\tat ").append(element);
178                     }
179                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
180                 }
181             }
182             else
183             {
184                 for (Thread unstopped : _threads)
185                     LOG.warn("{} Couldn't stop {}",this,unstopped);
186             }
187         }
188 
189         synchronized (_joinLock)
190         {
191             _joinLock.notifyAll();
192         }
193     }
194 
195     /**
196      * Thread Pool should use Daemon Threading. 
197      *
198      * @param daemon true to enable delegation
199      * @see Thread#setDaemon(boolean)
200      */
201     public void setDaemon(boolean daemon)
202     {
203         _daemon = daemon;
204     }
205 
206     /**
207      * Set the maximum thread idle time.
208      * Threads that are idle for longer than this period may be
209      * stopped.
210      * Delegated to the named or anonymous Pool.
211      *
212      * @param idleTimeout Max idle time in ms.
213      * @see #getIdleTimeout
214      */
215     public void setIdleTimeout(int idleTimeout)
216     {
217         _idleTimeout = idleTimeout;
218     }
219 
220     /**
221      * Set the maximum number of threads.
222      * Delegated to the named or anonymous Pool.
223      *
224      * @param maxThreads maximum number of threads.
225      * @see #getMaxThreads
226      */
227     @Override
228     public void setMaxThreads(int maxThreads)
229     {
230         _maxThreads = maxThreads;
231         if (_minThreads > _maxThreads)
232             _minThreads = _maxThreads;
233     }
234 
235     /**
236      * Set the minimum number of threads.
237      * Delegated to the named or anonymous Pool.
238      *
239      * @param minThreads minimum number of threads
240      * @see #getMinThreads
241      */
242     @Override
243     public void setMinThreads(int minThreads)
244     {
245         _minThreads = minThreads;
246 
247         if (_minThreads > _maxThreads)
248             _maxThreads = _minThreads;
249 
250         int threads = _threadsStarted.get();
251         if (isStarted() && threads < _minThreads)
252             startThreads(_minThreads - threads);
253     }
254 
255     /**
256      * @param name Name of this thread pool to use when naming threads.
257      */
258     public void setName(String name)
259     {
260         if (isRunning())
261             throw new IllegalStateException("started");
262         _name = name;
263     }
264 
265     /**
266      * Set the priority of the pool threads.
267      *
268      * @param priority the new thread priority.
269      */
270     public void setThreadsPriority(int priority)
271     {
272         _priority = priority;
273     }
274 
275     /**
276      * Get the maximum thread idle time.
277      * Delegated to the named or anonymous Pool.
278      *
279      * @return Max idle time in ms.
280      * @see #setIdleTimeout
281      */
282     @ManagedAttribute("maximum time a thread may be idle in ms")
283     public int getIdleTimeout()
284     {
285         return _idleTimeout;
286     }
287 
288     /**
289      * Set the maximum number of threads.
290      * Delegated to the named or anonymous Pool.
291      *
292      * @return maximum number of threads.
293      * @see #setMaxThreads
294      */
295     @Override
296     @ManagedAttribute("maximum number of threads in the pool")
297     public int getMaxThreads()
298     {
299         return _maxThreads;
300     }
301 
302     /**
303      * Get the minimum number of threads.
304      * Delegated to the named or anonymous Pool.
305      *
306      * @return minimum number of threads.
307      * @see #setMinThreads
308      */
309     @Override
310     @ManagedAttribute("minimum number of threads in the pool")
311     public int getMinThreads()
312     {
313         return _minThreads;
314     }
315 
316     /**
317      * @return The name of the this thread pool
318      */
319     @ManagedAttribute("name of the thread pool")
320     public String getName()
321     {
322         return _name;
323     }
324 
325     /**
326      * Get the priority of the pool threads.
327      *
328      * @return the priority of the pool threads.
329      */
330     @ManagedAttribute("priority of threads in the pool")
331     public int getThreadsPriority()
332     {
333         return _priority;
334     }
335     
336     /**
337      * Get the size of the job queue.
338      * 
339      * @return Number of jobs queued waiting for a thread
340      */
341     @ManagedAttribute("Size of the job queue")
342     public int getQueueSize()
343     {
344         return _jobs.size();
345     }
346 
347     /**
348      * Is thread pool using daemon threading
349      * 
350      * @return true if delegating to named or anonymous pool
351      * @see Thread#setDaemon(boolean)
352      */
353     @ManagedAttribute("thead pool using a daemon thread")
354     public boolean isDaemon()
355     {
356         return _daemon;
357     }
358 
359     public boolean isDetailedDump()
360     {
361         return _detailedDump;
362     }
363 
364     public void setDetailedDump(boolean detailedDump)
365     {
366         _detailedDump = detailedDump;
367     }
368     
369     @Override
370     public void execute(Runnable job)
371     {
372         if (LOG.isDebugEnabled())
373             LOG.debug("queue {}",job);
374         if (!isRunning() || !_jobs.offer(job))
375         {
376             LOG.warn("{} rejected {}", this, job);
377             throw new RejectedExecutionException(job.toString());
378         }
379         else
380         {
381             // Make sure there is at least one thread executing the job.
382             if (getThreads() == 0)
383                 startThreads(1);
384         }
385     }
386 
387     /**
388      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
389      */
390     @Override
391     public void join() throws InterruptedException
392     {
393         synchronized (_joinLock)
394         {
395             while (isRunning())
396                 _joinLock.wait();
397         }
398 
399         while (isStopping())
400             Thread.sleep(1);
401     }
402 
403     /**
404      * @return The total number of threads currently in the pool
405      */
406     @Override
407     @ManagedAttribute("total number of threads currently in the pool")
408     public int getThreads()
409     {
410         return _threadsStarted.get();
411     }
412 
413     /**
414      * @return The number of idle threads in the pool
415      */
416     @Override
417     @ManagedAttribute("total number of idle threads in the pool")
418     public int getIdleThreads()
419     {
420         return _threadsIdle.get();
421     }
422 
423     /**
424      * @return The number of busy threads in the pool
425      */
426     @ManagedAttribute("total number of busy threads in the pool")
427     public int getBusyThreads()
428     {
429         return getThreads() - getIdleThreads();
430     }
431     
432     /**
433      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
434      */
435     @Override
436     @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
437     public boolean isLowOnThreads()
438     {
439         return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
440     }
441 
442     private boolean startThreads(int threadsToStart)
443     {
444         while (threadsToStart > 0 && isRunning())
445         {
446             int threads = _threadsStarted.get();
447             if (threads >= _maxThreads)
448                 return false;
449 
450             if (!_threadsStarted.compareAndSet(threads, threads + 1))
451                 continue;
452 
453             boolean started = false;
454             try
455             {
456                 Thread thread = newThread(_runnable);
457                 thread.setDaemon(isDaemon());
458                 thread.setPriority(getThreadsPriority());
459                 thread.setName(_name + "-" + thread.getId());
460                 _threads.add(thread);
461 
462                 thread.start();
463                 started = true;
464                 --threadsToStart;
465             }
466             finally
467             {
468                 if (!started)
469                     _threadsStarted.decrementAndGet();
470             }
471         }
472         return true;
473     }
474 
475     protected Thread newThread(Runnable runnable)
476     {
477         return new Thread(_threadGroup, runnable);
478     }
479 
480     @Override
481     @ManagedOperation("dump thread state")
482     public String dump()
483     {
484         return ContainerLifeCycle.dump(this);
485     }
486 
487     @Override
488     public void dump(Appendable out, String indent) throws IOException
489     {
490         List<Object> dump = new ArrayList<>(getMaxThreads());
491         for (final Thread thread : _threads)
492         {
493             final StackTraceElement[] trace = thread.getStackTrace();
494             boolean inIdleJobPoll = false;
495             for (StackTraceElement t : trace)
496             {
497                 if ("idleJobPoll".equals(t.getMethodName()))
498                 {
499                     inIdleJobPoll = true;
500                     break;
501                 }
502             }
503             final boolean idle = inIdleJobPoll;
504 
505             if (isDetailedDump())
506             {
507                 dump.add(new Dumpable()
508                 {
509                     @Override
510                     public void dump(Appendable out, String indent) throws IOException
511                     {
512                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
513                         if (thread.getPriority()!=Thread.NORM_PRIORITY)
514                             out.append(" prio=").append(String.valueOf(thread.getPriority()));
515                         out.append(System.lineSeparator());
516                         if (!idle)
517                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
518                     }
519 
520                     @Override
521                     public String dump()
522                     {
523                         return null;
524                     }
525                 });
526             }
527             else
528             {
529                 int p=thread.getPriority();
530                 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
531             }
532         }
533 
534         ContainerLifeCycle.dumpObject(out, this);
535         ContainerLifeCycle.dump(out, indent, dump);
536     }
537 
538     @Override
539     public String toString()
540     {
541         return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
542     }
543 
544     private Runnable idleJobPoll() throws InterruptedException
545     {
546         return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
547     }
548 
549     private Runnable _runnable = new Runnable()
550     {
551         @Override
552         public void run()
553         {
554             boolean shrink = false;
555             boolean ignore = false;
556             try
557             {
558                 Runnable job = _jobs.poll();
559 
560                 if (job != null && _threadsIdle.get() == 0)
561                 {
562                     startThreads(1);
563                 }
564 
565                 loop: while (isRunning())
566                 {
567                     // Job loop
568                     while (job != null && isRunning())
569                     {
570                         if (LOG.isDebugEnabled())
571                             LOG.debug("run {}",job);
572                         runJob(job);
573                         if (LOG.isDebugEnabled())
574                             LOG.debug("ran {}",job);
575                         if (Thread.interrupted())
576                         {
577                             ignore=true;
578                             break loop;
579                         }
580                         job = _jobs.poll();
581                     }
582 
583                     // Idle loop
584                     try
585                     {
586                         _threadsIdle.incrementAndGet();
587 
588                         while (isRunning() && job == null)
589                         {
590                             if (_idleTimeout <= 0)
591                                 job = _jobs.take();
592                             else
593                             {
594                                 // maybe we should shrink?
595                                 final int size = _threadsStarted.get();
596                                 if (size > _minThreads)
597                                 {
598                                     long last = _lastShrink.get();
599                                     long now = System.nanoTime();
600                                     if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
601                                     {
602                                         if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
603                                         {
604                                             shrink=true;
605                                             break loop;
606                                         }
607                                     }
608                                 }
609                                 job = idleJobPoll();
610                             }
611                         }
612                     }
613                     finally
614                     {
615                         if (_threadsIdle.decrementAndGet() == 0)
616                         {
617                             startThreads(1);
618                         }
619                     }
620                 }
621             }
622             catch (InterruptedException e)
623             {
624                 ignore=true;
625                 LOG.ignore(e);
626             }
627             catch (Throwable e)
628             {
629                 LOG.warn(e);
630             }
631             finally
632             {
633                 if (!shrink && isRunning())
634                 {
635                     if (!ignore)
636                         LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
637                     // This is an unexpected thread death!
638                     if (_threadsStarted.decrementAndGet()<getMaxThreads())
639                         startThreads(1);
640                 }
641                 _threads.remove(Thread.currentThread());
642             }
643         }
644     };
645 
646     /**
647      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
648      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
649      *
650      * @param job the job to run
651      */
652     protected void runJob(Runnable job)
653     {
654         job.run();
655     }
656 
657     /**
658      * @return the job queue
659      */
660     protected BlockingQueue<Runnable> getQueue()
661     {
662         return _jobs;
663     }
664 
665     /**
666      * @param queue the job queue
667      */
668     public void setQueue(BlockingQueue<Runnable> queue)
669     {
670         throw new UnsupportedOperationException("Use constructor injection");
671     }
672 
673     /**
674      * @param id The thread ID to interrupt.
675      * @return true if the thread was found and interrupted.
676      */
677     @ManagedOperation("interrupt a pool thread")
678     public boolean interruptThread(@Name("id") long id)
679     {
680         for (Thread thread : _threads)
681         {
682             if (thread.getId() == id)
683             {
684                 thread.interrupt();
685                 return true;
686             }
687         }
688         return false;
689     }
690 
691     /**
692      * @param id The thread ID to interrupt.
693      * @return true if the thread was found and interrupted.
694      */
695     @ManagedOperation("dump a pool thread stack")
696     public String dumpThread(@Name("id") long id)
697     {
698         for (Thread thread : _threads)
699         {
700             if (thread.getId() == id)
701             {
702                 StringBuilder buf = new StringBuilder();
703                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
704                 buf.append(thread.getState()).append(":").append(System.lineSeparator());
705                 for (StackTraceElement element : thread.getStackTrace())
706                     buf.append("  at ").append(element.toString()).append(System.lineSeparator());
707                 return buf.toString();
708             }
709         }
710         return null;
711     }
712 }