View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.util.thread;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.eclipse.jetty.util.BlockingArrayQueue;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.annotation.ManagedAttribute;
36  import org.eclipse.jetty.util.annotation.ManagedObject;
37  import org.eclipse.jetty.util.annotation.ManagedOperation;
38  import org.eclipse.jetty.util.annotation.Name;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.component.ContainerLifeCycle;
41  import org.eclipse.jetty.util.component.Dumpable;
42  import org.eclipse.jetty.util.component.LifeCycle;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46  
47  @ManagedObject("A thread pool with no max bound by default")
48  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49  {
50      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
51  
52      private final AtomicInteger _threadsStarted = new AtomicInteger();
53      private final AtomicInteger _threadsIdle = new AtomicInteger();
54      private final AtomicLong _lastShrink = new AtomicLong();
55      private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
56      private final Object _joinLock = new Object();
57      private final BlockingQueue<Runnable> _jobs;
58      private String _name = "qtp" + hashCode();
59      private int _idleTimeout;
60      private int _maxThreads;
61      private int _minThreads;
62      private int _priority = Thread.NORM_PRIORITY;
63      private boolean _daemon = false;
64      private boolean _detailedDump = false;
65  
66      public QueuedThreadPool()
67      {
68          this(200);
69      }
70  
71      public QueuedThreadPool(@Name("maxThreads") int maxThreads)
72      {
73          this(maxThreads, 8);
74      }
75  
76      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads)
77      {
78          this(maxThreads, minThreads, 60000);
79      }
80  
81      public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
82      {
83          this(maxThreads, minThreads, idleTimeout, null);
84      }
85  
86      public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
87      {
88          setMinThreads(minThreads);
89          setMaxThreads(maxThreads);
90          setIdleTimeout(idleTimeout);
91          setStopTimeout(5000);
92  
93          if (queue==null)
94              queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
95          _jobs=queue;
96  
97      }
98  
99      @Override
100     protected void doStart() throws Exception
101     {
102         super.doStart();
103         _threadsStarted.set(0);
104 
105         startThreads(_minThreads);
106     }
107 
108     @Override
109     protected void doStop() throws Exception
110     {
111         super.doStop();
112 
113         long timeout = getStopTimeout();
114         BlockingQueue<Runnable> jobs = getQueue();
115 
116         // If no stop timeout, clear job queue
117         if (timeout <= 0)
118             jobs.clear();
119 
120         // Fill job Q with noop jobs to wakeup idle
121         Runnable noop = new Runnable()
122         {
123             @Override
124             public void run()
125             {
126             }
127         };
128         for (int i = _threadsStarted.get(); i-- > 0; )
129             jobs.offer(noop);
130 
131         // try to jobs complete naturally for half our stop time
132         long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
133         for (Thread thread : _threads)
134         {
135             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
136             if (canwait > 0)
137                 thread.join(canwait);
138         }
139 
140         // If we still have threads running, get a bit more aggressive
141 
142         // interrupt remaining threads
143         if (_threadsStarted.get() > 0)
144             for (Thread thread : _threads)
145                 thread.interrupt();
146 
147         // wait again for the other half of our stop time
148         stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
149         for (Thread thread : _threads)
150         {
151             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
152             if (canwait > 0)
153                 thread.join(canwait);
154         }
155 
156         Thread.yield();
157         int size = _threads.size();
158         if (size > 0)
159         {
160             Thread.yield();
161             
162             if (LOG.isDebugEnabled())
163             {
164                 for (Thread unstopped : _threads)
165                 {
166                     StringBuilder dmp = new StringBuilder();
167                     for (StackTraceElement element : unstopped.getStackTrace())
168                     {
169                         dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
170                     }
171                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
172                 }
173             }
174             else
175             {
176                 for (Thread unstopped : _threads)
177                     LOG.warn("{} Couldn't stop {}",this,unstopped);
178             }
179         }
180 
181         synchronized (_joinLock)
182         {
183             _joinLock.notifyAll();
184         }
185     }
186 
187     /**
188      * Delegated to the named or anonymous Pool.
189      */
190     public void setDaemon(boolean daemon)
191     {
192         _daemon = daemon;
193     }
194 
195     /**
196      * Set the maximum thread idle time.
197      * Threads that are idle for longer than this period may be
198      * stopped.
199      * Delegated to the named or anonymous Pool.
200      *
201      * @param idleTimeout Max idle time in ms.
202      * @see #getIdleTimeout
203      */
204     public void setIdleTimeout(int idleTimeout)
205     {
206         _idleTimeout = idleTimeout;
207     }
208 
209     /**
210      * Set the maximum number of threads.
211      * Delegated to the named or anonymous Pool.
212      *
213      * @param maxThreads maximum number of threads.
214      * @see #getMaxThreads
215      */
216     @Override
217     public void setMaxThreads(int maxThreads)
218     {
219         _maxThreads = maxThreads;
220         if (_minThreads > _maxThreads)
221             _minThreads = _maxThreads;
222     }
223 
224     /**
225      * Set the minimum number of threads.
226      * Delegated to the named or anonymous Pool.
227      *
228      * @param minThreads minimum number of threads
229      * @see #getMinThreads
230      */
231     @Override
232     public void setMinThreads(int minThreads)
233     {
234         _minThreads = minThreads;
235 
236         if (_minThreads > _maxThreads)
237             _maxThreads = _minThreads;
238 
239         int threads = _threadsStarted.get();
240         if (isStarted() && threads < _minThreads)
241             startThreads(_minThreads - threads);
242     }
243 
244     /**
245      * @param name Name of this thread pool to use when naming threads.
246      */
247     public void setName(String name)
248     {
249         if (isRunning())
250             throw new IllegalStateException("started");
251         _name = name;
252     }
253 
254     /**
255      * Set the priority of the pool threads.
256      *
257      * @param priority the new thread priority.
258      */
259     public void setThreadsPriority(int priority)
260     {
261         _priority = priority;
262     }
263 
264     /**
265      * Get the maximum thread idle time.
266      * Delegated to the named or anonymous Pool.
267      *
268      * @return Max idle time in ms.
269      * @see #setIdleTimeout
270      */
271     @ManagedAttribute("maximum time a thread may be idle in ms")
272     public int getIdleTimeout()
273     {
274         return _idleTimeout;
275     }
276 
277     /**
278      * Set the maximum number of threads.
279      * Delegated to the named or anonymous Pool.
280      *
281      * @return maximum number of threads.
282      * @see #setMaxThreads
283      */
284     @Override
285     @ManagedAttribute("maximum number of threads in the pool")
286     public int getMaxThreads()
287     {
288         return _maxThreads;
289     }
290 
291     /**
292      * Get the minimum number of threads.
293      * Delegated to the named or anonymous Pool.
294      *
295      * @return minimum number of threads.
296      * @see #setMinThreads
297      */
298     @Override
299     @ManagedAttribute("minimum number of threads in the pool")
300     public int getMinThreads()
301     {
302         return _minThreads;
303     }
304 
305     /**
306      * @return The name of the this thread pool
307      */
308     @ManagedAttribute("name of the thread pool")
309     public String getName()
310     {
311         return _name;
312     }
313 
314     /**
315      * Get the priority of the pool threads.
316      *
317      * @return the priority of the pool threads.
318      */
319     @ManagedAttribute("priority of threads in the pool")
320     public int getThreadsPriority()
321     {
322         return _priority;
323     }
324 
325     /**
326      * Delegated to the named or anonymous Pool.
327      */
328     @ManagedAttribute("thead pool using a daemon thread")
329     public boolean isDaemon()
330     {
331         return _daemon;
332     }
333 
334     public boolean isDetailedDump()
335     {
336         return _detailedDump;
337     }
338 
339     public void setDetailedDump(boolean detailedDump)
340     {
341         _detailedDump = detailedDump;
342     }
343     
344     @Override
345     public void execute(Runnable job)
346     {
347         if (!isRunning() || !_jobs.offer(job))
348         {
349             LOG.warn("{} rejected {}", this, job);
350             throw new RejectedExecutionException(job.toString());
351         }
352     }
353 
354     /**
355      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
356      */
357     @Override
358     public void join() throws InterruptedException
359     {
360         synchronized (_joinLock)
361         {
362             while (isRunning())
363                 _joinLock.wait();
364         }
365 
366         while (isStopping())
367             Thread.sleep(1);
368     }
369 
370     /**
371      * @return The total number of threads currently in the pool
372      */
373     @Override
374     @ManagedAttribute("total number of threads currently in the pool")
375     public int getThreads()
376     {
377         return _threadsStarted.get();
378     }
379 
380     /**
381      * @return The number of idle threads in the pool
382      */
383     @Override
384     @ManagedAttribute("total number of idle threads in the pool")
385     public int getIdleThreads()
386     {
387         return _threadsIdle.get();
388     }
389 
390     /**
391      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
392      */
393     @Override
394     @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
395     public boolean isLowOnThreads()
396     {
397         return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
398     }
399 
400     private boolean startThreads(int threadsToStart)
401     {
402         while (threadsToStart > 0)
403         {
404             int threads = _threadsStarted.get();
405             if (threads >= _maxThreads)
406                 return false;
407 
408             if (!_threadsStarted.compareAndSet(threads, threads + 1))
409                 continue;
410 
411             boolean started = false;
412             try
413             {
414                 Thread thread = newThread(_runnable);
415                 thread.setDaemon(isDaemon());
416                 thread.setPriority(getThreadsPriority());
417                 thread.setName(_name + "-" + thread.getId());
418                 _threads.add(thread);
419 
420                 thread.start();
421                 started = true;
422             }
423             finally
424             {
425                 if (!started)
426                     _threadsStarted.decrementAndGet();
427             }
428             if (started)
429                 threadsToStart--;
430         }
431         return true;
432     }
433 
434     protected Thread newThread(Runnable runnable)
435     {
436         return new Thread(runnable);
437     }
438 
439 
440     @Override
441     @ManagedOperation("dump thread state")
442     public String dump()
443     {
444         return ContainerLifeCycle.dump(this);
445     }
446 
447     @Override
448     public void dump(Appendable out, String indent) throws IOException
449     {
450         List<Object> dump = new ArrayList<>(getMaxThreads());
451         for (final Thread thread : _threads)
452         {
453             final StackTraceElement[] trace = thread.getStackTrace();
454             boolean inIdleJobPoll = false;
455             for (StackTraceElement t : trace)
456             {
457                 if ("idleJobPoll".equals(t.getMethodName()))
458                 {
459                     inIdleJobPoll = true;
460                     break;
461                 }
462             }
463             final boolean idle = inIdleJobPoll;
464 
465             if (isDetailedDump())
466             {
467                 dump.add(new Dumpable()
468                 {
469                     @Override
470                     public void dump(Appendable out, String indent) throws IOException
471                     {
472                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
473                         if (!idle)
474                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
475                     }
476 
477                     @Override
478                     public String dump()
479                     {
480                         return null;
481                     }
482                 });
483             }
484             else
485             {
486                 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
487             }
488         }
489 
490         ContainerLifeCycle.dumpObject(out, this);
491         ContainerLifeCycle.dump(out, indent, dump);
492     }
493 
494     @Override
495     public String toString()
496     {
497         return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
498     }
499 
500     private Runnable idleJobPoll() throws InterruptedException
501     {
502         return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
503     }
504 
505     private Runnable _runnable = new Runnable()
506     {
507         @Override
508         public void run()
509         {
510             boolean shrink = false;
511             try
512             {
513                 Runnable job = _jobs.poll();
514 
515                 if (job != null && _threadsIdle.get() == 0)
516                 {
517                     startThreads(1);
518                 }
519 
520                 while (isRunning())
521                 {
522                     // Job loop
523                     while (job != null && isRunning())
524                     {
525                         runJob(job);
526                         job = _jobs.poll();
527                     }
528 
529                     // Idle loop
530                     try
531                     {
532                         _threadsIdle.incrementAndGet();
533 
534                         while (isRunning() && job == null)
535                         {
536                             if (_idleTimeout <= 0)
537                                 job = _jobs.take();
538                             else
539                             {
540                                 // maybe we should shrink?
541                                 final int size = _threadsStarted.get();
542                                 if (size > _minThreads)
543                                 {
544                                     long last = _lastShrink.get();
545                                     long now = System.nanoTime();
546                                     if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
547                                     {
548                                         shrink = _lastShrink.compareAndSet(last, now) &&
549                                                 _threadsStarted.compareAndSet(size, size - 1);
550                                         if (shrink)
551                                         {
552                                             return;
553                                         }
554                                     }
555                                 }
556                                 job = idleJobPoll();
557                             }
558                         }
559                     }
560                     finally
561                     {
562                         if (_threadsIdle.decrementAndGet() == 0)
563                         {
564                             startThreads(1);
565                         }
566                     }
567                 }
568             }
569             catch (InterruptedException e)
570             {
571                 LOG.ignore(e);
572             }
573             catch (Throwable e)
574             {
575                 LOG.warn(e);
576             }
577             finally
578             {
579                 if (!shrink)
580                     _threadsStarted.decrementAndGet();
581                 _threads.remove(Thread.currentThread());
582             }
583         }
584     };
585 
586     /**
587      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
588      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
589      *
590      * @param job the job to run
591      */
592     protected void runJob(Runnable job)
593     {
594         job.run();
595     }
596 
597     /**
598      * @return the job queue
599      */
600     protected BlockingQueue<Runnable> getQueue()
601     {
602         return _jobs;
603     }
604 
605     /**
606      * @param queue the job queue
607      */
608     public void setQueue(BlockingQueue<Runnable> queue)
609     {
610         throw new UnsupportedOperationException("Use constructor injection");
611     }
612 
613     /**
614      * @param id The thread ID to interrupt.
615      * @return true if the thread was found and interrupted.
616      */
617     @ManagedOperation("interrupt a pool thread")
618     public boolean interruptThread(@Name("id") long id)
619     {
620         for (Thread thread : _threads)
621         {
622             if (thread.getId() == id)
623             {
624                 thread.interrupt();
625                 return true;
626             }
627         }
628         return false;
629     }
630 
631     /**
632      * @param id The thread ID to interrupt.
633      * @return true if the thread was found and interrupted.
634      */
635     @ManagedOperation("dump a pool thread stack")
636     public String dumpThread(@Name("id") long id)
637     {
638         for (Thread thread : _threads)
639         {
640             if (thread.getId() == id)
641             {
642                 StringBuilder buf = new StringBuilder();
643                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
644                 for (StackTraceElement element : thread.getStackTrace())
645                     buf.append("  at ").append(element.toString()).append('\n');
646                 return buf.toString();
647             }
648         }
649         return null;
650     }
651 }