1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.util.thread; 20 21 import java.util.concurrent.ArrayBlockingQueue; 22 import java.util.concurrent.BlockingQueue; 23 import java.util.concurrent.ExecutorService; 24 import java.util.concurrent.LinkedBlockingQueue; 25 import java.util.concurrent.RejectedExecutionException; 26 import java.util.concurrent.SynchronousQueue; 27 import java.util.concurrent.ThreadPoolExecutor; 28 import java.util.concurrent.TimeUnit; 29 30 import org.eclipse.jetty.util.component.AbstractLifeCycle; 31 import org.eclipse.jetty.util.component.LifeCycle; 32 import org.eclipse.jetty.util.log.Log; 33 import org.eclipse.jetty.util.log.Logger; 34 35 /* ------------------------------------------------------------ */ 36 /** 37 * Jetty ThreadPool using java 5 ThreadPoolExecutor 38 * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and 39 * {@link LifeCycle} interfaces so that it may be used by the Jetty <code>org.eclipse.jetty.server.Server</code> 40 */ 41 public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle 42 { 43 private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class); 44 private final ExecutorService _executor; 45 46 /* ------------------------------------------------------------ */ 47 public ExecutorThreadPool(ExecutorService executor) 48 { 49 _executor = executor; 50 } 51 52 /* ------------------------------------------------------------ */ 53 /** 54 * Wraps an {@link ThreadPoolExecutor}. 55 * Max pool size is 256, pool thread timeout after 60 seconds and 56 * an unbounded {@link LinkedBlockingQueue} is used for the job queue; 57 */ 58 public ExecutorThreadPool() 59 { 60 // Using an unbounded queue makes the maxThreads parameter useless 61 // Refer to ThreadPoolExecutor javadocs for details 62 this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>())); 63 } 64 65 /* ------------------------------------------------------------ */ 66 /** 67 * Wraps an {@link ThreadPoolExecutor}. 68 * Max pool size is 256, pool thread timeout after 60 seconds, and core pool size is 32 when queueSize >= 0. 69 * @param queueSize can be -1 for using an unbounded {@link LinkedBlockingQueue}, 0 for using a 70 * {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size. 71 */ 72 public ExecutorThreadPool(int queueSize) 73 { 74 this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) : 75 queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) : 76 new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize))); 77 } 78 79 /* ------------------------------------------------------------ */ 80 /** 81 * Wraps an {@link ThreadPoolExecutor} using 82 * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue; 83 * @param corePoolSize must be equal to maximumPoolSize 84 * @param maximumPoolSize the maximum number of threads to allow in the pool 85 * @param keepAliveTime the max time a thread can remain idle, in milliseconds 86 */ 87 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) 88 { 89 this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS); 90 } 91 92 /* ------------------------------------------------------------ */ 93 /** 94 * Wraps an {@link ThreadPoolExecutor} using 95 * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue. 96 * @param corePoolSize must be equal to maximumPoolSize 97 * @param maximumPoolSize the maximum number of threads to allow in the pool 98 * @param keepAliveTime the max time a thread can remain idle 99 * @param unit the unit for the keepAliveTime 100 */ 101 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) 102 { 103 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>()); 104 } 105 106 /* ------------------------------------------------------------ */ 107 108 /** 109 * Wraps an {@link ThreadPoolExecutor} 110 * @param corePoolSize the number of threads to keep in the pool, even if they are idle 111 * @param maximumPoolSize the maximum number of threads to allow in the pool 112 * @param keepAliveTime the max time a thread can remain idle 113 * @param unit the unit for the keepAliveTime 114 * @param workQueue the queue to use for holding tasks before they are executed 115 */ 116 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 117 { 118 this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)); 119 } 120 121 /* ------------------------------------------------------------ */ 122 public boolean dispatch(Runnable job) 123 { 124 try 125 { 126 _executor.execute(job); 127 return true; 128 } 129 catch(RejectedExecutionException e) 130 { 131 LOG.warn(e); 132 return false; 133 } 134 } 135 136 /* ------------------------------------------------------------ */ 137 public int getIdleThreads() 138 { 139 if (_executor instanceof ThreadPoolExecutor) 140 { 141 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; 142 return tpe.getPoolSize() - tpe.getActiveCount(); 143 } 144 return -1; 145 } 146 147 /* ------------------------------------------------------------ */ 148 public int getThreads() 149 { 150 if (_executor instanceof ThreadPoolExecutor) 151 { 152 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; 153 return tpe.getPoolSize(); 154 } 155 return -1; 156 } 157 158 /* ------------------------------------------------------------ */ 159 public boolean isLowOnThreads() 160 { 161 if (_executor instanceof ThreadPoolExecutor) 162 { 163 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; 164 // getActiveCount() locks the thread pool, so execute it last 165 return tpe.getPoolSize() == tpe.getMaximumPoolSize() && 166 tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount(); 167 } 168 return false; 169 } 170 171 /* ------------------------------------------------------------ */ 172 public void join() throws InterruptedException 173 { 174 _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 175 } 176 177 /* ------------------------------------------------------------ */ 178 @Override 179 protected void doStop() throws Exception 180 { 181 super.doStop(); 182 _executor.shutdownNow(); 183 } 184 }