View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.util.thread;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.util.BlockingArrayQueue;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.annotation.ManagedAttribute;
36  import org.eclipse.jetty.util.annotation.ManagedObject;
37  import org.eclipse.jetty.util.annotation.ManagedOperation;
38  import org.eclipse.jetty.util.annotation.Name;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.component.ContainerLifeCycle;
41  import org.eclipse.jetty.util.component.Dumpable;
42  import org.eclipse.jetty.util.log.Log;
43  import org.eclipse.jetty.util.log.Logger;
44  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
45  
46  @ManagedObject("A thread pool with no max bound by default")
47  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
48  {
49      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
50  
51      private final AtomicInteger _threadsStarted = new AtomicInteger();
52      private final AtomicInteger _threadsIdle = new AtomicInteger();
53      private final AtomicLong _lastShrink = new AtomicLong();
54      private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
55      private final Object _joinLock = new Object();
56      private final BlockingQueue<Runnable> _jobs;
57      private String _name = "qtp" + hashCode();
58      private int _idleTimeout;
59      private int _maxThreads;
60      private int _minThreads;
61      private int _priority = Thread.NORM_PRIORITY;
62      private boolean _daemon = false;
63      private boolean _detailedDump = false;
64  
65      public QueuedThreadPool()
66      {
67          this(200);
68      }
69  
70      public QueuedThreadPool(@Name("maxThreads") int maxThreads)
71      {
72          this(maxThreads, 8);
73      }
74  
75      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads)
76      {
77          this(maxThreads, minThreads, 60000);
78      }
79  
80      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
81      {
82          this(maxThreads, minThreads, idleTimeout, null);
83      }
84  
85      public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
86      {
87          setMinThreads(minThreads);
88          setMaxThreads(maxThreads);
89          setIdleTimeout(idleTimeout);
90          setStopTimeout(5000);
91  
92          if (queue==null)
93              queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
94          _jobs=queue;
95  
96      }
97  
98      @Override
99      protected void doStart() throws Exception
100     {
101         super.doStart();
102         _threadsStarted.set(0);
103 
104         startThreads(_minThreads);
105     }
106 
107     @Override
108     protected void doStop() throws Exception
109     {
110         super.doStop();
111 
112         long timeout = getStopTimeout();
113         BlockingQueue<Runnable> jobs = getQueue();
114 
115         // If no stop timeout, clear job queue
116         if (timeout <= 0)
117             jobs.clear();
118 
119         // Fill job Q with noop jobs to wakeup idle
120         Runnable noop = new Runnable()
121         {
122             @Override
123             public void run()
124             {
125             }
126         };
127         for (int i = _threadsStarted.get(); i-- > 0; )
128             jobs.offer(noop);
129 
130         // try to jobs complete naturally for half our stop time
131         long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
132         for (Thread thread : _threads)
133         {
134             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
135             if (canwait > 0)
136                 thread.join(canwait);
137         }
138 
139         // If we still have threads running, get a bit more aggressive
140 
141         // interrupt remaining threads
142         if (_threadsStarted.get() > 0)
143             for (Thread thread : _threads)
144                 thread.interrupt();
145 
146         // wait again for the other half of our stop time
147         stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
148         for (Thread thread : _threads)
149         {
150             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
151             if (canwait > 0)
152                 thread.join(canwait);
153         }
154 
155         Thread.yield();
156         int size = _threads.size();
157         if (size > 0)
158         {
159             Thread.yield();
160             
161             if (LOG.isDebugEnabled())
162             {
163                 for (Thread unstopped : _threads)
164                 {
165                     StringBuilder dmp = new StringBuilder();
166                     for (StackTraceElement element : unstopped.getStackTrace())
167                     {
168                         dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
169                     }
170                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
171                 }
172             }
173             else
174             {
175                 for (Thread unstopped : _threads)
176                     LOG.warn("{} Couldn't stop {}",this,unstopped);
177             }
178         }
179 
180         synchronized (_joinLock)
181         {
182             _joinLock.notifyAll();
183         }
184     }
185 
186     /**
187      * Delegated to the named or anonymous Pool.
188      */
189     public void setDaemon(boolean daemon)
190     {
191         _daemon = daemon;
192     }
193 
194     /**
195      * Set the maximum thread idle time.
196      * Threads that are idle for longer than this period may be
197      * stopped.
198      * Delegated to the named or anonymous Pool.
199      *
200      * @param idleTimeout Max idle time in ms.
201      * @see #getIdleTimeout
202      */
203     public void setIdleTimeout(int idleTimeout)
204     {
205         _idleTimeout = idleTimeout;
206     }
207 
208     /**
209      * Set the maximum number of threads.
210      * Delegated to the named or anonymous Pool.
211      *
212      * @param maxThreads maximum number of threads.
213      * @see #getMaxThreads
214      */
215     @Override
216     public void setMaxThreads(int maxThreads)
217     {
218         _maxThreads = maxThreads;
219         if (_minThreads > _maxThreads)
220             _minThreads = _maxThreads;
221     }
222 
223     /**
224      * Set the minimum number of threads.
225      * Delegated to the named or anonymous Pool.
226      *
227      * @param minThreads minimum number of threads
228      * @see #getMinThreads
229      */
230     @Override
231     public void setMinThreads(int minThreads)
232     {
233         _minThreads = minThreads;
234 
235         if (_minThreads > _maxThreads)
236             _maxThreads = _minThreads;
237 
238         int threads = _threadsStarted.get();
239         if (isStarted() && threads < _minThreads)
240             startThreads(_minThreads - threads);
241     }
242 
243     /**
244      * @param name Name of this thread pool to use when naming threads.
245      */
246     public void setName(String name)
247     {
248         if (isRunning())
249             throw new IllegalStateException("started");
250         _name = name;
251     }
252 
253     /**
254      * Set the priority of the pool threads.
255      *
256      * @param priority the new thread priority.
257      */
258     public void setThreadsPriority(int priority)
259     {
260         _priority = priority;
261     }
262 
263     /**
264      * Get the maximum thread idle time.
265      * Delegated to the named or anonymous Pool.
266      *
267      * @return Max idle time in ms.
268      * @see #setIdleTimeout
269      */
270     @ManagedAttribute("maximum time a thread may be idle in ms")
271     public int getIdleTimeout()
272     {
273         return _idleTimeout;
274     }
275 
276     /**
277      * Set the maximum number of threads.
278      * Delegated to the named or anonymous Pool.
279      *
280      * @return maximum number of threads.
281      * @see #setMaxThreads
282      */
283     @Override
284     @ManagedAttribute("maximum number of threads in the pool")
285     public int getMaxThreads()
286     {
287         return _maxThreads;
288     }
289 
290     /**
291      * Get the minimum number of threads.
292      * Delegated to the named or anonymous Pool.
293      *
294      * @return minimum number of threads.
295      * @see #setMinThreads
296      */
297     @Override
298     @ManagedAttribute("minimum number of threads in the pool")
299     public int getMinThreads()
300     {
301         return _minThreads;
302     }
303 
304     /**
305      * @return The name of the this thread pool
306      */
307     @ManagedAttribute("name of the thread pool")
308     public String getName()
309     {
310         return _name;
311     }
312 
313     /**
314      * Get the priority of the pool threads.
315      *
316      * @return the priority of the pool threads.
317      */
318     @ManagedAttribute("priority of threads in the pool")
319     public int getThreadsPriority()
320     {
321         return _priority;
322     }
323     
324     /**
325      * Get the size of the job queue.
326      * 
327      * @return Number of jobs queued waiting for a thread
328      */
329     @ManagedAttribute("Size of the job queue")
330     public int getQueueSize()
331     {
332         return _jobs.size();
333     }
334 
335     /**
336      * Delegated to the named or anonymous Pool.
337      */
338     @ManagedAttribute("thead pool using a daemon thread")
339     public boolean isDaemon()
340     {
341         return _daemon;
342     }
343 
344     public boolean isDetailedDump()
345     {
346         return _detailedDump;
347     }
348 
349     public void setDetailedDump(boolean detailedDump)
350     {
351         _detailedDump = detailedDump;
352     }
353     
354     @Override
355     public void execute(Runnable job)
356     {
357         if (!isRunning() || !_jobs.offer(job))
358         {
359             LOG.warn("{} rejected {}", this, job);
360             throw new RejectedExecutionException(job.toString());
361         }
362     }
363 
364     /**
365      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
366      */
367     @Override
368     public void join() throws InterruptedException
369     {
370         synchronized (_joinLock)
371         {
372             while (isRunning())
373                 _joinLock.wait();
374         }
375 
376         while (isStopping())
377             Thread.sleep(1);
378     }
379 
380     /**
381      * @return The total number of threads currently in the pool
382      */
383     @Override
384     @ManagedAttribute("total number of threads currently in the pool")
385     public int getThreads()
386     {
387         return _threadsStarted.get();
388     }
389 
390     /**
391      * @return The number of idle threads in the pool
392      */
393     @Override
394     @ManagedAttribute("total number of idle threads in the pool")
395     public int getIdleThreads()
396     {
397         return _threadsIdle.get();
398     }
399 
400     /**
401      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
402      */
403     @Override
404     @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
405     public boolean isLowOnThreads()
406     {
407         return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
408     }
409 
410     private boolean startThreads(int threadsToStart)
411     {
412         while (threadsToStart > 0 && isRunning())
413         {
414             int threads = _threadsStarted.get();
415             if (threads >= _maxThreads)
416                 return false;
417 
418             if (!_threadsStarted.compareAndSet(threads, threads + 1))
419                 continue;
420 
421             boolean started = false;
422             try
423             {
424                 Thread thread = newThread(_runnable);
425                 thread.setDaemon(isDaemon());
426                 thread.setPriority(getThreadsPriority());
427                 thread.setName(_name + "-" + thread.getId());
428                 _threads.add(thread);
429 
430                 thread.start();
431                 started = true;
432             }
433             finally
434             {
435                 if (!started)
436                     _threadsStarted.decrementAndGet();
437             }
438             if (started)
439                 threadsToStart--;
440         }
441         return true;
442     }
443 
444     protected Thread newThread(Runnable runnable)
445     {
446         return new Thread(runnable);
447     }
448 
449 
450     @Override
451     @ManagedOperation("dump thread state")
452     public String dump()
453     {
454         return ContainerLifeCycle.dump(this);
455     }
456 
457     @Override
458     public void dump(Appendable out, String indent) throws IOException
459     {
460         List<Object> dump = new ArrayList<>(getMaxThreads());
461         for (final Thread thread : _threads)
462         {
463             final StackTraceElement[] trace = thread.getStackTrace();
464             boolean inIdleJobPoll = false;
465             for (StackTraceElement t : trace)
466             {
467                 if ("idleJobPoll".equals(t.getMethodName()))
468                 {
469                     inIdleJobPoll = true;
470                     break;
471                 }
472             }
473             final boolean idle = inIdleJobPoll;
474 
475             if (isDetailedDump())
476             {
477                 dump.add(new Dumpable()
478                 {
479                     @Override
480                     public void dump(Appendable out, String indent) throws IOException
481                     {
482                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
483                         if (thread.getPriority()!=Thread.NORM_PRIORITY)
484                             out.append(" prio="+thread.getPriority());
485                         out.append('\n');
486                         if (!idle)
487                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
488                     }
489 
490                     @Override
491                     public String dump()
492                     {
493                         return null;
494                     }
495                 });
496             }
497             else
498             {
499                 int p=thread.getPriority();
500                 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
501             }
502         }
503 
504         ContainerLifeCycle.dumpObject(out, this);
505         ContainerLifeCycle.dump(out, indent, dump);
506     }
507 
508     @Override
509     public String toString()
510     {
511         return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
512     }
513 
514     private Runnable idleJobPoll() throws InterruptedException
515     {
516         return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
517     }
518 
519     private Runnable _runnable = new Runnable()
520     {
521         @Override
522         public void run()
523         {
524             boolean shrink = false;
525             boolean ignore = false;
526             try
527             {
528                 Runnable job = _jobs.poll();
529 
530                 if (job != null && _threadsIdle.get() == 0)
531                 {
532                     startThreads(1);
533                 }
534 
535                 loop: while (isRunning())
536                 {
537                     // Job loop
538                     while (job != null && isRunning())
539                     {
540                         runJob(job);
541                         if (Thread.interrupted())
542                         {
543                             ignore=true;
544                             break loop;
545                         }
546                         job = _jobs.poll();
547                     }
548 
549                     // Idle loop
550                     try
551                     {
552                         _threadsIdle.incrementAndGet();
553 
554                         while (isRunning() && job == null)
555                         {
556                             if (_idleTimeout <= 0)
557                                 job = _jobs.take();
558                             else
559                             {
560                                 // maybe we should shrink?
561                                 final int size = _threadsStarted.get();
562                                 if (size > _minThreads)
563                                 {
564                                     long last = _lastShrink.get();
565                                     long now = System.nanoTime();
566                                     if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
567                                     {
568                                         if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
569                                         {
570                                             shrink=true;
571                                             break loop;
572                                         }
573                                     }
574                                 }
575                                 job = idleJobPoll();
576                             }
577                         }
578                     }
579                     finally
580                     {
581                         if (_threadsIdle.decrementAndGet() == 0)
582                         {
583                             startThreads(1);
584                         }
585                     }
586                 }
587             }
588             catch (InterruptedException e)
589             {
590                 ignore=true;
591                 LOG.ignore(e);
592             }
593             catch (Throwable e)
594             {
595                 LOG.warn(e);
596             }
597             finally
598             {
599                 if (!shrink && isRunning())
600                 {
601                     if (!ignore)
602                         LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
603                     // This is an unexpected thread death!
604                     if (_threadsStarted.decrementAndGet()<getMaxThreads())
605                         startThreads(1);
606                 }
607                 _threads.remove(Thread.currentThread());
608             }
609         }
610     };
611 
612     /**
613      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
614      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
615      *
616      * @param job the job to run
617      */
618     protected void runJob(Runnable job)
619     {
620         job.run();
621     }
622 
623     /**
624      * @return the job queue
625      */
626     protected BlockingQueue<Runnable> getQueue()
627     {
628         return _jobs;
629     }
630 
631     /**
632      * @param queue the job queue
633      */
634     public void setQueue(BlockingQueue<Runnable> queue)
635     {
636         throw new UnsupportedOperationException("Use constructor injection");
637     }
638 
639     /**
640      * @param id The thread ID to interrupt.
641      * @return true if the thread was found and interrupted.
642      */
643     @ManagedOperation("interrupt a pool thread")
644     public boolean interruptThread(@Name("id") long id)
645     {
646         for (Thread thread : _threads)
647         {
648             if (thread.getId() == id)
649             {
650                 thread.interrupt();
651                 return true;
652             }
653         }
654         return false;
655     }
656 
657     /**
658      * @param id The thread ID to interrupt.
659      * @return true if the thread was found and interrupted.
660      */
661     @ManagedOperation("dump a pool thread stack")
662     public String dumpThread(@Name("id") long id)
663     {
664         for (Thread thread : _threads)
665         {
666             if (thread.getId() == id)
667             {
668                 StringBuilder buf = new StringBuilder();
669                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
670                 for (StackTraceElement element : thread.getStackTrace())
671                     buf.append("  at ").append(element.toString()).append('\n');
672                 return buf.toString();
673             }
674         }
675         return null;
676     }
677     
678 
679 }