View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.util.thread;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.util.BlockingArrayQueue;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.annotation.ManagedAttribute;
36  import org.eclipse.jetty.util.annotation.ManagedObject;
37  import org.eclipse.jetty.util.annotation.ManagedOperation;
38  import org.eclipse.jetty.util.annotation.Name;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.component.ContainerLifeCycle;
41  import org.eclipse.jetty.util.component.Dumpable;
42  import org.eclipse.jetty.util.component.LifeCycle;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46  
47  @ManagedObject("A thread pool with no max bound by default")
48  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49  {
50      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
51  
52      private final AtomicInteger _threadsStarted = new AtomicInteger();
53      private final AtomicInteger _threadsIdle = new AtomicInteger();
54      private final AtomicLong _lastShrink = new AtomicLong();
55      private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
56      private final Object _joinLock = new Object();
57      private BlockingQueue<Runnable> _jobs;
58      private String _name = "qtp" + hashCode();
59      private int _idleTimeout;
60      private int _maxThreads;
61      private int _minThreads;
62      private int _priority = Thread.NORM_PRIORITY;
63      private boolean _daemon = false;
64      private boolean _detailedDump = false;
65  
66      public QueuedThreadPool()
67      {
68          this(200);
69      }
70  
71      public QueuedThreadPool(int maxThreads)
72      {
73          this(maxThreads, 8);
74      }
75  
76      public QueuedThreadPool(int maxThreads, int minThreads)
77      {
78          this(maxThreads, minThreads, 60000);
79      }
80  
81      public QueuedThreadPool(int maxThreads, int minThreads, int idleTimeout)
82      {
83          setMinThreads(minThreads);
84          setMaxThreads(maxThreads);
85          setIdleTimeout(idleTimeout);
86          setStopTimeout(5000);
87      }
88  
89      @Override
90      protected void doStart() throws Exception
91      {
92          super.doStart();
93          _threadsStarted.set(0);
94  
95          if (_jobs == null)
96              setQueue(new BlockingArrayQueue<Runnable>(_minThreads, _minThreads));
97  
98          startThreads(_minThreads);
99      }
100 
101     @Override
102     protected void doStop() throws Exception
103     {
104         super.doStop();
105 
106         long timeout = getStopTimeout();
107         BlockingQueue<Runnable> jobs = getQueue();
108 
109         // If no stop timeout, clear job queue
110         if (timeout <= 0)
111             jobs.clear();
112 
113         // Fill job Q with noop jobs to wakeup idle
114         Runnable noop = new Runnable()
115         {
116             @Override
117             public void run()
118             {
119             }
120         };
121         for (int i = _threadsStarted.get(); i-- > 0; )
122             jobs.offer(noop);
123 
124         // try to jobs complete naturally for half our stop time
125         long stopby = System.currentTimeMillis() + timeout / 2;
126         for (Thread thread : _threads)
127         {
128             long canwait = stopby - System.currentTimeMillis();
129             if (canwait > 0)
130                 thread.join(canwait);
131         }
132 
133         // If we still have threads running, get a bit more aggressive
134 
135         // interrupt remaining threads
136         if (_threadsStarted.get() > 0)
137             for (Thread thread : _threads)
138                 thread.interrupt();
139 
140         // wait again for the other half of our stop time
141         stopby = System.currentTimeMillis() + timeout / 2;
142         for (Thread thread : _threads)
143         {
144             long canwait = stopby - System.currentTimeMillis();
145             if (canwait > 0)
146                 thread.join(canwait);
147         }
148 
149         Thread.yield();
150         int size = _threads.size();
151         if (size > 0)
152         {
153             LOG.warn("{} threads could not be stopped", size);
154 
155             if ((size <= Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled())
156             {
157                 for (Thread unstopped : _threads)
158                 {
159                     StringBuilder dmp = new StringBuilder();
160                     for (StackTraceElement element : unstopped.getStackTrace())
161                     {
162                         dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
163                     }
164                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
165                 }
166             }
167         }
168 
169         synchronized (_joinLock)
170         {
171             _joinLock.notifyAll();
172         }
173     }
174 
175     /**
176      * Delegated to the named or anonymous Pool.
177      */
178     public void setDaemon(boolean daemon)
179     {
180         _daemon = daemon;
181     }
182 
183     /**
184      * Set the maximum thread idle time.
185      * Threads that are idle for longer than this period may be
186      * stopped.
187      * Delegated to the named or anonymous Pool.
188      *
189      * @param idleTimeout Max idle time in ms.
190      * @see #getIdleTimeout
191      */
192     public void setIdleTimeout(int idleTimeout)
193     {
194         _idleTimeout = idleTimeout;
195     }
196 
197     /**
198      * Set the maximum number of threads.
199      * Delegated to the named or anonymous Pool.
200      *
201      * @param maxThreads maximum number of threads.
202      * @see #getMaxThreads
203      */
204     @Override
205     public void setMaxThreads(int maxThreads)
206     {
207         _maxThreads = maxThreads;
208         if (_minThreads > _maxThreads)
209             _minThreads = _maxThreads;
210     }
211 
212     /**
213      * Set the minimum number of threads.
214      * Delegated to the named or anonymous Pool.
215      *
216      * @param minThreads minimum number of threads
217      * @see #getMinThreads
218      */
219     @Override
220     public void setMinThreads(int minThreads)
221     {
222         _minThreads = minThreads;
223 
224         if (_minThreads > _maxThreads)
225             _maxThreads = _minThreads;
226 
227         int threads = _threadsStarted.get();
228         if (isStarted() && threads < _minThreads)
229             startThreads(_minThreads - threads);
230     }
231 
232     /**
233      * @param name Name of the BoundedThreadPool to use when naming Threads.
234      */
235     public void setName(String name)
236     {
237         if (isRunning())
238             throw new IllegalStateException("started");
239         _name = name;
240     }
241 
242     /**
243      * Set the priority of the pool threads.
244      *
245      * @param priority the new thread priority.
246      */
247     public void setThreadsPriority(int priority)
248     {
249         _priority = priority;
250     }
251 
252     /**
253      * Get the maximum thread idle time.
254      * Delegated to the named or anonymous Pool.
255      *
256      * @return Max idle time in ms.
257      * @see #setIdleTimeout
258      */
259     @ManagedAttribute("maximum time a thread may be idle in ms")
260     public int getIdleTimeout()
261     {
262         return _idleTimeout;
263     }
264 
265     /**
266      * Set the maximum number of threads.
267      * Delegated to the named or anonymous Pool.
268      *
269      * @return maximum number of threads.
270      * @see #setMaxThreads
271      */
272     @Override
273     @ManagedAttribute("maximum number of threads in the pool")
274     public int getMaxThreads()
275     {
276         return _maxThreads;
277     }
278 
279     /**
280      * Get the minimum number of threads.
281      * Delegated to the named or anonymous Pool.
282      *
283      * @return minimum number of threads.
284      * @see #setMinThreads
285      */
286     @Override
287     @ManagedAttribute("minimum number of threads in the pool")
288     public int getMinThreads()
289     {
290         return _minThreads;
291     }
292 
293     /**
294      * @return The name of the BoundedThreadPool.
295      */
296     @ManagedAttribute("name of the thread pool")
297     public String getName()
298     {
299         return _name;
300     }
301 
302     /**
303      * Get the priority of the pool threads.
304      *
305      * @return the priority of the pool threads.
306      */
307     @ManagedAttribute("priority of threads in the pool")
308     public int getThreadsPriority()
309     {
310         return _priority;
311     }
312 
313     /**
314      * Delegated to the named or anonymous Pool.
315      */
316     @ManagedAttribute("thead pool using a daemon thread")
317     public boolean isDaemon()
318     {
319         return _daemon;
320     }
321 
322     public boolean isDetailedDump()
323     {
324         return _detailedDump;
325     }
326 
327     public void setDetailedDump(boolean detailedDump)
328     {
329         _detailedDump = detailedDump;
330     }
331 
332     @Override
333     public boolean dispatch(Runnable job)
334     {
335         LOG.debug("{} dispatched {}", this, job);
336         return isRunning() && _jobs.offer(job);
337     }
338 
339     @Override
340     public void execute(Runnable job)
341     {
342         if (!dispatch(job))
343         {
344             LOG.warn("{} rejected {}", this, job);
345             throw new RejectedExecutionException(job.toString());
346         }
347     }
348 
349     /**
350      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
351      */
352     @Override
353     public void join() throws InterruptedException
354     {
355         synchronized (_joinLock)
356         {
357             while (isRunning())
358                 _joinLock.wait();
359         }
360 
361         while (isStopping())
362             Thread.sleep(1);
363     }
364 
365     /**
366      * @return The total number of threads currently in the pool
367      */
368     @Override
369     @ManagedAttribute("total number of threads currently in the pool")
370     public int getThreads()
371     {
372         return _threadsStarted.get();
373     }
374 
375     /**
376      * @return The number of idle threads in the pool
377      */
378     @Override
379     @ManagedAttribute("total number of idle threads in the pool")
380     public int getIdleThreads()
381     {
382         return _threadsIdle.get();
383     }
384 
385     /**
386      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
387      */
388     @Override
389     @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
390     public boolean isLowOnThreads()
391     {
392         return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
393     }
394 
395     private boolean startThreads(int threadsToStart)
396     {
397         while (threadsToStart > 0)
398         {
399             int threads = _threadsStarted.get();
400             if (threads >= _maxThreads)
401                 return false;
402 
403             if (!_threadsStarted.compareAndSet(threads, threads + 1))
404                 continue;
405 
406             boolean started = false;
407             try
408             {
409                 Thread thread = newThread(_runnable);
410                 thread.setDaemon(isDaemon());
411                 thread.setPriority(getThreadsPriority());
412                 thread.setName(_name + "-" + thread.getId());
413                 _threads.add(thread);
414 
415                 thread.start();
416                 started = true;
417             }
418             finally
419             {
420                 if (!started)
421                     _threadsStarted.decrementAndGet();
422             }
423             if (started)
424                 threadsToStart--;
425         }
426         return true;
427     }
428 
429     protected Thread newThread(Runnable runnable)
430     {
431         return new Thread(runnable);
432     }
433 
434 
435     @Override
436     @ManagedOperation("dump thread state")
437     public String dump()
438     {
439         return ContainerLifeCycle.dump(this);
440     }
441 
442     @Override
443     public void dump(Appendable out, String indent) throws IOException
444     {
445         List<Object> dump = new ArrayList<>(getMaxThreads());
446         for (final Thread thread : _threads)
447         {
448             final StackTraceElement[] trace = thread.getStackTrace();
449             boolean inIdleJobPoll = false;
450             for (StackTraceElement t : trace)
451             {
452                 if ("idleJobPoll".equals(t.getMethodName()))
453                 {
454                     inIdleJobPoll = true;
455                     break;
456                 }
457             }
458             final boolean idle = inIdleJobPoll;
459 
460             if (isDetailedDump())
461             {
462                 dump.add(new Dumpable()
463                 {
464                     @Override
465                     public void dump(Appendable out, String indent) throws IOException
466                     {
467                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
468                         if (!idle)
469                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
470                     }
471 
472                     @Override
473                     public String dump()
474                     {
475                         return null;
476                     }
477                 });
478             }
479             else
480             {
481                 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
482             }
483         }
484 
485         ContainerLifeCycle.dumpObject(out, this);
486         ContainerLifeCycle.dump(out, indent, dump);
487     }
488 
489     @Override
490     public String toString()
491     {
492         return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
493     }
494 
495     private Runnable idleJobPoll() throws InterruptedException
496     {
497         return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
498     }
499 
500     private Runnable _runnable = new Runnable()
501     {
502         @Override
503         public void run()
504         {
505             boolean shrink = false;
506             try
507             {
508                 Runnable job = _jobs.poll();
509 
510                 if (job != null && _threadsIdle.get() == 0)
511                 {
512                     startThreads(1);
513                 }
514 
515                 while (isRunning())
516                 {
517                     // Job loop
518                     while (job != null && isRunning())
519                     {
520                         runJob(job);
521                         job = _jobs.poll();
522                     }
523 
524                     // Idle loop
525                     try
526                     {
527                         _threadsIdle.incrementAndGet();
528 
529                         while (isRunning() && job == null)
530                         {
531                             if (_idleTimeout <= 0)
532                                 job = _jobs.take();
533                             else
534                             {
535                                 // maybe we should shrink?
536                                 final int size = _threadsStarted.get();
537                                 if (size > _minThreads)
538                                 {
539                                     long last = _lastShrink.get();
540                                     long now = System.currentTimeMillis();
541                                     if (last == 0 || (now - last) > _idleTimeout)
542                                     {
543                                         shrink = _lastShrink.compareAndSet(last, now) &&
544                                                 _threadsStarted.compareAndSet(size, size - 1);
545                                         if (shrink)
546                                         {
547                                             return;
548                                         }
549                                     }
550                                 }
551                                 job = idleJobPoll();
552                             }
553                         }
554                     }
555                     finally
556                     {
557                         if (_threadsIdle.decrementAndGet() == 0)
558                         {
559                             startThreads(1);
560                         }
561                     }
562                 }
563             }
564             catch (InterruptedException e)
565             {
566                 LOG.ignore(e);
567             }
568             catch (Throwable e)
569             {
570                 LOG.warn(e);
571             }
572             finally
573             {
574                 if (!shrink)
575                     _threadsStarted.decrementAndGet();
576                 _threads.remove(Thread.currentThread());
577             }
578         }
579     };
580 
581     /**
582      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
583      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
584      *
585      * @param job the job to run
586      */
587     protected void runJob(Runnable job)
588     {
589         job.run();
590     }
591 
592     /**
593      * @return the job queue
594      */
595     protected BlockingQueue<Runnable> getQueue()
596     {
597         return _jobs;
598     }
599 
600     /**
601      * @param queue the job queue
602      */
603     public void setQueue(BlockingQueue<Runnable> queue)
604     {
605         _jobs = queue;
606     }
607 
608     /**
609      * @param id The thread ID to interrupt.
610      * @return true if the thread was found and interrupted.
611      */
612     @ManagedOperation("interrupt a pool thread")
613     public boolean interruptThread(@Name("id") long id)
614     {
615         for (Thread thread : _threads)
616         {
617             if (thread.getId() == id)
618             {
619                 thread.interrupt();
620                 return true;
621             }
622         }
623         return false;
624     }
625 
626     /**
627      * @param id The thread ID to interrupt.
628      * @return true if the thread was found and interrupted.
629      */
630     @ManagedOperation("dump a pool thread stack")
631     public String dumpThread(@Name("id") long id)
632     {
633         for (Thread thread : _threads)
634         {
635             if (thread.getId() == id)
636             {
637                 StringBuilder buf = new StringBuilder();
638                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
639                 for (StackTraceElement element : thread.getStackTrace())
640                     buf.append("  at ").append(element.toString()).append('\n');
641                 return buf.toString();
642             }
643         }
644         return null;
645     }
646 }