1 // ======================================================================== 2 // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd. 3 // ------------------------------------------------------------------------ 4 // All rights reserved. This program and the accompanying materials 5 // are made available under the terms of the Eclipse Public License v1.0 6 // and Apache License v2.0 which accompanies this distribution. 7 // The Eclipse Public License is available at 8 // http://www.eclipse.org/legal/epl-v10.html 9 // The Apache License v2.0 is available at 10 // http://www.opensource.org/licenses/apache2.0.php 11 // You may elect to redistribute this code under either of these licenses. 12 // ======================================================================== 13 14 package org.eclipse.jetty.util.thread; 15 16 import java.util.concurrent.ArrayBlockingQueue; 17 import java.util.concurrent.BlockingQueue; 18 import java.util.concurrent.ExecutorService; 19 import java.util.concurrent.LinkedBlockingQueue; 20 import java.util.concurrent.RejectedExecutionException; 21 import java.util.concurrent.SynchronousQueue; 22 import java.util.concurrent.ThreadPoolExecutor; 23 import java.util.concurrent.TimeUnit; 24 25 import org.eclipse.jetty.util.component.AbstractLifeCycle; 26 import org.eclipse.jetty.util.component.LifeCycle; 27 import org.eclipse.jetty.util.log.Log; 28 import org.eclipse.jetty.util.log.Logger; 29 30 /* ------------------------------------------------------------ */ 31 /** 32 * Jetty ThreadPool using java 5 ThreadPoolExecutor 33 * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and 34 * {@link LifeCycle} interfaces so that it may be used by the Jetty <code>org.eclipse.jetty.server.Server</code> 35 */ 36 public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle 37 { 38 private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class); 39 private final ExecutorService _executor; 40 41 /* ------------------------------------------------------------ */ 42 public ExecutorThreadPool(ExecutorService executor) 43 { 44 _executor = executor; 45 } 46 47 /* ------------------------------------------------------------ */ 48 /** 49 * Wraps an {@link ThreadPoolExecutor}. 50 * Max pool size is 256, pool thread timeout after 60 seconds and 51 * an unbounded {@link LinkedBlockingQueue} is used for the job queue; 52 */ 53 public ExecutorThreadPool() 54 { 55 // Using an unbounded queue makes the maxThreads parameter useless 56 // Refer to ThreadPoolExecutor javadocs for details 57 this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>())); 58 } 59 60 /* ------------------------------------------------------------ */ 61 /** 62 * Wraps an {@link ThreadPoolExecutor}. 63 * Max pool size is 256, pool thread timeout after 60 seconds, and core pool size is 32 when queueSize >= 0. 64 * @param queueSize can be -1 for using an unbounded {@link LinkedBlockingQueue}, 0 for using a 65 * {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size. 66 */ 67 public ExecutorThreadPool(int queueSize) 68 { 69 this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) : 70 queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) : 71 new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize))); 72 } 73 74 /* ------------------------------------------------------------ */ 75 /** 76 * Wraps an {@link ThreadPoolExecutor} using 77 * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue; 78 * @param corePoolSize must be equal to maximumPoolSize 79 * @param maximumPoolSize the maximum number of threads to allow in the pool 80 * @param keepAliveTime the max time a thread can remain idle, in milliseconds 81 */ 82 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) 83 { 84 this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS); 85 } 86 87 /* ------------------------------------------------------------ */ 88 /** 89 * Wraps an {@link ThreadPoolExecutor} using 90 * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue. 91 * @param corePoolSize must be equal to maximumPoolSize 92 * @param maximumPoolSize the maximum number of threads to allow in the pool 93 * @param keepAliveTime the max time a thread can remain idle 94 * @param unit the unit for the keepAliveTime 95 */ 96 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) 97 { 98 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>()); 99 } 100 101 /* ------------------------------------------------------------ */ 102 103 /** 104 * Wraps an {@link ThreadPoolExecutor} 105 * @param corePoolSize the number of threads to keep in the pool, even if they are idle 106 * @param maximumPoolSize the maximum number of threads to allow in the pool 107 * @param keepAliveTime the max time a thread can remain idle 108 * @param unit the unit for the keepAliveTime 109 * @param workQueue the queue to use for holding tasks before they are executed 110 */ 111 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 112 { 113 this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)); 114 } 115 116 /* ------------------------------------------------------------ */ 117 public boolean dispatch(Runnable job) 118 { 119 try 120 { 121 _executor.execute(job); 122 return true; 123 } 124 catch(RejectedExecutionException e) 125 { 126 LOG.warn(e); 127 return false; 128 } 129 } 130 131 /* ------------------------------------------------------------ */ 132 public int getIdleThreads() 133 { 134 if (_executor instanceof ThreadPoolExecutor) 135 { 136 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; 137 return tpe.getPoolSize() - tpe.getActiveCount(); 138 } 139 return -1; 140 } 141 142 /* ------------------------------------------------------------ */ 143 public int getThreads() 144 { 145 if (_executor instanceof ThreadPoolExecutor) 146 { 147 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; 148 return tpe.getPoolSize(); 149 } 150 return -1; 151 } 152 153 /* ------------------------------------------------------------ */ 154 public boolean isLowOnThreads() 155 { 156 if (_executor instanceof ThreadPoolExecutor) 157 { 158 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; 159 // getActiveCount() locks the thread pool, so execute it last 160 return tpe.getPoolSize() == tpe.getMaximumPoolSize() && 161 tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount(); 162 } 163 return false; 164 } 165 166 /* ------------------------------------------------------------ */ 167 public void join() throws InterruptedException 168 { 169 _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 170 } 171 172 /* ------------------------------------------------------------ */ 173 @Override 174 protected void doStop() throws Exception 175 { 176 super.doStop(); 177 _executor.shutdownNow(); 178 } 179 }