View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.util.thread;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.util.BlockingArrayQueue;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.annotation.ManagedAttribute;
36  import org.eclipse.jetty.util.annotation.ManagedObject;
37  import org.eclipse.jetty.util.annotation.ManagedOperation;
38  import org.eclipse.jetty.util.annotation.Name;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.component.ContainerLifeCycle;
41  import org.eclipse.jetty.util.component.Dumpable;
42  import org.eclipse.jetty.util.component.LifeCycle;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46  
47  @ManagedObject("A thread pool with no max bound by default")
48  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49  {
50      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
51  
52      private final AtomicInteger _threadsStarted = new AtomicInteger();
53      private final AtomicInteger _threadsIdle = new AtomicInteger();
54      private final AtomicLong _lastShrink = new AtomicLong();
55      private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
56      private final Object _joinLock = new Object();
57      private final BlockingQueue<Runnable> _jobs;
58      private String _name = "qtp" + hashCode();
59      private int _idleTimeout;
60      private int _maxThreads;
61      private int _minThreads;
62      private int _priority = Thread.NORM_PRIORITY;
63      private boolean _daemon = false;
64      private boolean _detailedDump = false;
65  
66      public QueuedThreadPool()
67      {
68          this(200);
69      }
70  
71      public QueuedThreadPool(@Name("maxThreads") int maxThreads)
72      {
73          this(maxThreads, 8);
74      }
75  
76      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads)
77      {
78          this(maxThreads, minThreads, 60000);
79      }
80  
81      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
82      {
83          this(maxThreads, minThreads, idleTimeout, null);
84      }
85  
86      public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
87      {
88          setMinThreads(minThreads);
89          setMaxThreads(maxThreads);
90          setIdleTimeout(idleTimeout);
91          setStopTimeout(5000);
92  
93          if (queue==null)
94              queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
95          _jobs=queue;
96  
97      }
98  
99      @Override
100     protected void doStart() throws Exception
101     {
102         super.doStart();
103         _threadsStarted.set(0);
104 
105         startThreads(_minThreads);
106     }
107 
108     @Override
109     protected void doStop() throws Exception
110     {
111         super.doStop();
112 
113         long timeout = getStopTimeout();
114         BlockingQueue<Runnable> jobs = getQueue();
115 
116         // If no stop timeout, clear job queue
117         if (timeout <= 0)
118             jobs.clear();
119 
120         // Fill job Q with noop jobs to wakeup idle
121         Runnable noop = new Runnable()
122         {
123             @Override
124             public void run()
125             {
126             }
127         };
128         for (int i = _threadsStarted.get(); i-- > 0; )
129             jobs.offer(noop);
130 
131         // try to jobs complete naturally for half our stop time
132         long stopby = System.currentTimeMillis() + timeout / 2;
133         for (Thread thread : _threads)
134         {
135             long canwait = stopby - System.currentTimeMillis();
136             if (canwait > 0)
137                 thread.join(canwait);
138         }
139 
140         // If we still have threads running, get a bit more aggressive
141 
142         // interrupt remaining threads
143         if (_threadsStarted.get() > 0)
144             for (Thread thread : _threads)
145                 thread.interrupt();
146 
147         // wait again for the other half of our stop time
148         stopby = System.currentTimeMillis() + timeout / 2;
149         for (Thread thread : _threads)
150         {
151             long canwait = stopby - System.currentTimeMillis();
152             if (canwait > 0)
153                 thread.join(canwait);
154         }
155 
156         Thread.yield();
157         int size = _threads.size();
158         if (size > 0)
159         {
160             Thread.yield();
161             
162             if (LOG.isDebugEnabled())
163             {
164                 for (Thread unstopped : _threads)
165                 {
166                     StringBuilder dmp = new StringBuilder();
167                     for (StackTraceElement element : unstopped.getStackTrace())
168                     {
169                         dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
170                     }
171                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
172                 }
173             }
174             else
175             {
176                 for (Thread unstopped : _threads)
177                     LOG.warn("{} Couldn't stop {}",this,unstopped);
178             }
179         }
180 
181         synchronized (_joinLock)
182         {
183             _joinLock.notifyAll();
184         }
185     }
186 
187     /**
188      * Delegated to the named or anonymous Pool.
189      */
190     public void setDaemon(boolean daemon)
191     {
192         _daemon = daemon;
193     }
194 
195     /**
196      * Set the maximum thread idle time.
197      * Threads that are idle for longer than this period may be
198      * stopped.
199      * Delegated to the named or anonymous Pool.
200      *
201      * @param idleTimeout Max idle time in ms.
202      * @see #getIdleTimeout
203      */
204     public void setIdleTimeout(int idleTimeout)
205     {
206         _idleTimeout = idleTimeout;
207     }
208 
209     /**
210      * Set the maximum number of threads.
211      * Delegated to the named or anonymous Pool.
212      *
213      * @param maxThreads maximum number of threads.
214      * @see #getMaxThreads
215      */
216     @Override
217     public void setMaxThreads(int maxThreads)
218     {
219         _maxThreads = maxThreads;
220         if (_minThreads > _maxThreads)
221             _minThreads = _maxThreads;
222     }
223 
224     /**
225      * Set the minimum number of threads.
226      * Delegated to the named or anonymous Pool.
227      *
228      * @param minThreads minimum number of threads
229      * @see #getMinThreads
230      */
231     @Override
232     public void setMinThreads(int minThreads)
233     {
234         _minThreads = minThreads;
235 
236         if (_minThreads > _maxThreads)
237             _maxThreads = _minThreads;
238 
239         int threads = _threadsStarted.get();
240         if (isStarted() && threads < _minThreads)
241             startThreads(_minThreads - threads);
242     }
243 
244     /**
245      * @param name Name of the BoundedThreadPool to use when naming Threads.
246      */
247     public void setName(String name)
248     {
249         if (isRunning())
250             throw new IllegalStateException("started");
251         _name = name;
252     }
253 
254     /**
255      * Set the priority of the pool threads.
256      *
257      * @param priority the new thread priority.
258      */
259     public void setThreadsPriority(int priority)
260     {
261         _priority = priority;
262     }
263 
264     /**
265      * Get the maximum thread idle time.
266      * Delegated to the named or anonymous Pool.
267      *
268      * @return Max idle time in ms.
269      * @see #setIdleTimeout
270      */
271     @ManagedAttribute("maximum time a thread may be idle in ms")
272     public int getIdleTimeout()
273     {
274         return _idleTimeout;
275     }
276 
277     /**
278      * Set the maximum number of threads.
279      * Delegated to the named or anonymous Pool.
280      *
281      * @return maximum number of threads.
282      * @see #setMaxThreads
283      */
284     @Override
285     @ManagedAttribute("maximum number of threads in the pool")
286     public int getMaxThreads()
287     {
288         return _maxThreads;
289     }
290 
291     /**
292      * Get the minimum number of threads.
293      * Delegated to the named or anonymous Pool.
294      *
295      * @return minimum number of threads.
296      * @see #setMinThreads
297      */
298     @Override
299     @ManagedAttribute("minimum number of threads in the pool")
300     public int getMinThreads()
301     {
302         return _minThreads;
303     }
304 
305     /**
306      * @return The name of the BoundedThreadPool.
307      */
308     @ManagedAttribute("name of the thread pool")
309     public String getName()
310     {
311         return _name;
312     }
313 
314     /**
315      * Get the priority of the pool threads.
316      *
317      * @return the priority of the pool threads.
318      */
319     @ManagedAttribute("priority of threads in the pool")
320     public int getThreadsPriority()
321     {
322         return _priority;
323     }
324 
325     /**
326      * Delegated to the named or anonymous Pool.
327      */
328     @ManagedAttribute("thead pool using a daemon thread")
329     public boolean isDaemon()
330     {
331         return _daemon;
332     }
333 
334     public boolean isDetailedDump()
335     {
336         return _detailedDump;
337     }
338 
339     public void setDetailedDump(boolean detailedDump)
340     {
341         _detailedDump = detailedDump;
342     }
343 
344     @Override
345     public boolean dispatch(Runnable job)
346     {
347         LOG.debug("{} dispatched {}", this, job);
348         return isRunning() && _jobs.offer(job);
349     }
350 
351     @Override
352     public void execute(Runnable job)
353     {
354         if (!dispatch(job))
355         {
356             LOG.warn("{} rejected {}", this, job);
357             throw new RejectedExecutionException(job.toString());
358         }
359     }
360 
361     /**
362      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
363      */
364     @Override
365     public void join() throws InterruptedException
366     {
367         synchronized (_joinLock)
368         {
369             while (isRunning())
370                 _joinLock.wait();
371         }
372 
373         while (isStopping())
374             Thread.sleep(1);
375     }
376 
377     /**
378      * @return The total number of threads currently in the pool
379      */
380     @Override
381     @ManagedAttribute("total number of threads currently in the pool")
382     public int getThreads()
383     {
384         return _threadsStarted.get();
385     }
386 
387     /**
388      * @return The number of idle threads in the pool
389      */
390     @Override
391     @ManagedAttribute("total number of idle threads in the pool")
392     public int getIdleThreads()
393     {
394         return _threadsIdle.get();
395     }
396 
397     /**
398      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
399      */
400     @Override
401     @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
402     public boolean isLowOnThreads()
403     {
404         return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
405     }
406 
407     private boolean startThreads(int threadsToStart)
408     {
409         while (threadsToStart > 0)
410         {
411             int threads = _threadsStarted.get();
412             if (threads >= _maxThreads)
413                 return false;
414 
415             if (!_threadsStarted.compareAndSet(threads, threads + 1))
416                 continue;
417 
418             boolean started = false;
419             try
420             {
421                 Thread thread = newThread(_runnable);
422                 thread.setDaemon(isDaemon());
423                 thread.setPriority(getThreadsPriority());
424                 thread.setName(_name + "-" + thread.getId());
425                 _threads.add(thread);
426 
427                 thread.start();
428                 started = true;
429             }
430             finally
431             {
432                 if (!started)
433                     _threadsStarted.decrementAndGet();
434             }
435             if (started)
436                 threadsToStart--;
437         }
438         return true;
439     }
440 
441     protected Thread newThread(Runnable runnable)
442     {
443         return new Thread(runnable);
444     }
445 
446 
447     @Override
448     @ManagedOperation("dump thread state")
449     public String dump()
450     {
451         return ContainerLifeCycle.dump(this);
452     }
453 
454     @Override
455     public void dump(Appendable out, String indent) throws IOException
456     {
457         List<Object> dump = new ArrayList<>(getMaxThreads());
458         for (final Thread thread : _threads)
459         {
460             final StackTraceElement[] trace = thread.getStackTrace();
461             boolean inIdleJobPoll = false;
462             for (StackTraceElement t : trace)
463             {
464                 if ("idleJobPoll".equals(t.getMethodName()))
465                 {
466                     inIdleJobPoll = true;
467                     break;
468                 }
469             }
470             final boolean idle = inIdleJobPoll;
471 
472             if (isDetailedDump())
473             {
474                 dump.add(new Dumpable()
475                 {
476                     @Override
477                     public void dump(Appendable out, String indent) throws IOException
478                     {
479                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
480                         if (!idle)
481                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
482                     }
483 
484                     @Override
485                     public String dump()
486                     {
487                         return null;
488                     }
489                 });
490             }
491             else
492             {
493                 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
494             }
495         }
496 
497         ContainerLifeCycle.dumpObject(out, this);
498         ContainerLifeCycle.dump(out, indent, dump);
499     }
500 
501     @Override
502     public String toString()
503     {
504         return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
505     }
506 
507     private Runnable idleJobPoll() throws InterruptedException
508     {
509         return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
510     }
511 
512     private Runnable _runnable = new Runnable()
513     {
514         @Override
515         public void run()
516         {
517             boolean shrink = false;
518             try
519             {
520                 Runnable job = _jobs.poll();
521 
522                 if (job != null && _threadsIdle.get() == 0)
523                 {
524                     startThreads(1);
525                 }
526 
527                 while (isRunning())
528                 {
529                     // Job loop
530                     while (job != null && isRunning())
531                     {
532                         runJob(job);
533                         job = _jobs.poll();
534                     }
535 
536                     // Idle loop
537                     try
538                     {
539                         _threadsIdle.incrementAndGet();
540 
541                         while (isRunning() && job == null)
542                         {
543                             if (_idleTimeout <= 0)
544                                 job = _jobs.take();
545                             else
546                             {
547                                 // maybe we should shrink?
548                                 final int size = _threadsStarted.get();
549                                 if (size > _minThreads)
550                                 {
551                                     long last = _lastShrink.get();
552                                     long now = System.currentTimeMillis();
553                                     if (last == 0 || (now - last) > _idleTimeout)
554                                     {
555                                         shrink = _lastShrink.compareAndSet(last, now) &&
556                                                 _threadsStarted.compareAndSet(size, size - 1);
557                                         if (shrink)
558                                         {
559                                             return;
560                                         }
561                                     }
562                                 }
563                                 job = idleJobPoll();
564                             }
565                         }
566                     }
567                     finally
568                     {
569                         if (_threadsIdle.decrementAndGet() == 0)
570                         {
571                             startThreads(1);
572                         }
573                     }
574                 }
575             }
576             catch (InterruptedException e)
577             {
578                 LOG.ignore(e);
579             }
580             catch (Throwable e)
581             {
582                 LOG.warn(e);
583             }
584             finally
585             {
586                 if (!shrink)
587                     _threadsStarted.decrementAndGet();
588                 _threads.remove(Thread.currentThread());
589             }
590         }
591     };
592 
593     /**
594      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
595      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
596      *
597      * @param job the job to run
598      */
599     protected void runJob(Runnable job)
600     {
601         job.run();
602     }
603 
604     /**
605      * @return the job queue
606      */
607     protected BlockingQueue<Runnable> getQueue()
608     {
609         return _jobs;
610     }
611 
612     /**
613      * @param queue the job queue
614      */
615     public void setQueue(BlockingQueue<Runnable> queue)
616     {
617         throw new UnsupportedOperationException("Use constructor injection");
618     }
619 
620     /**
621      * @param id The thread ID to interrupt.
622      * @return true if the thread was found and interrupted.
623      */
624     @ManagedOperation("interrupt a pool thread")
625     public boolean interruptThread(@Name("id") long id)
626     {
627         for (Thread thread : _threads)
628         {
629             if (thread.getId() == id)
630             {
631                 thread.interrupt();
632                 return true;
633             }
634         }
635         return false;
636     }
637 
638     /**
639      * @param id The thread ID to interrupt.
640      * @return true if the thread was found and interrupted.
641      */
642     @ManagedOperation("dump a pool thread stack")
643     public String dumpThread(@Name("id") long id)
644     {
645         for (Thread thread : _threads)
646         {
647             if (thread.getId() == id)
648             {
649                 StringBuilder buf = new StringBuilder();
650                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
651                 for (StackTraceElement element : thread.getStackTrace())
652                     buf.append("  at ").append(element.toString()).append('\n');
653                 return buf.toString();
654             }
655         }
656         return null;
657     }
658 }