1 // ========================================================================
2 // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3 // ------------------------------------------------------------------------
4 // All rights reserved. This program and the accompanying materials
5 // are made available under the terms of the Eclipse Public License v1.0
6 // and Apache License v2.0 which accompanies this distribution.
7 // The Eclipse Public License is available at
8 // http://www.eclipse.org/legal/epl-v10.html
9 // The Apache License v2.0 is available at
10 // http://www.opensource.org/licenses/apache2.0.php
11 // You may elect to redistribute this code under either of these licenses.
12 // ========================================================================
13
14 package org.eclipse.jetty.util.thread;
15
16 import java.util.concurrent.ArrayBlockingQueue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.SynchronousQueue;
22 import java.util.concurrent.ThreadPoolExecutor;
23 import java.util.concurrent.TimeUnit;
24
25 import org.eclipse.jetty.util.component.AbstractLifeCycle;
26 import org.eclipse.jetty.util.component.LifeCycle;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29
30 /* ------------------------------------------------------------ */
31 /**
32 * Jetty ThreadPool using java 5 ThreadPoolExecutor
33 * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and
34 * {@link LifeCycle} interfaces so that it may be used by the Jetty <code>org.eclipse.jetty.server.Server</code>
35 */
36 public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle
37 {
38 private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class);
39 private final ExecutorService _executor;
40
41 /* ------------------------------------------------------------ */
42 public ExecutorThreadPool(ExecutorService executor)
43 {
44 _executor=executor;
45 }
46
47 /* ------------------------------------------------------------ */
48 /** constructor.
49 * Wraps an {@link ThreadPoolExecutor}.
50 * Core size is 32, max pool size is 256, pool thread timeout after 60 seconds and
51 * an unbounded {@link LinkedBlockingQueue} is used for the job queue;
52 */
53 public ExecutorThreadPool()
54 {
55 this(new ThreadPoolExecutor(32,256,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()));
56 }
57
58 /* ------------------------------------------------------------ */
59 /** constructor.
60 * Wraps an {@link ThreadPoolExecutor}.
61 * Core size is 32, max pool size is 256, pool thread timeout after 60 seconds
62 * @param queueSize if -1, an unbounded {@link LinkedBlockingQueue} is used, if 0 then a
63 * {@link SynchronousQueue} is used, other a {@link ArrayBlockingQueue} of the given size is used.
64 */
65 public ExecutorThreadPool(int queueSize)
66 {
67 this(new ThreadPoolExecutor(32,256,60,TimeUnit.SECONDS,
68 queueSize<0?new LinkedBlockingQueue<Runnable>()
69 : (queueSize==0?new SynchronousQueue<Runnable>()
70 :new ArrayBlockingQueue<Runnable>(queueSize))));
71 }
72
73 /* ------------------------------------------------------------ */
74 /** constructor.
75 * Wraps an {@link ThreadPoolExecutor} using
76 * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue;
77 */
78 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime)
79 {
80 this(new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
81 }
82
83 /* ------------------------------------------------------------ */
84 /** constructor.
85 * Wraps an {@link ThreadPoolExecutor} using
86 * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue;
87 */
88 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit)
89 {
90 this(new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,new LinkedBlockingQueue<Runnable>()));
91 }
92
93 /* ------------------------------------------------------------ */
94 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
95 {
96 this(new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue));
97 }
98
99
100 /* ------------------------------------------------------------ */
101 public boolean dispatch(Runnable job)
102 {
103 try
104 {
105 _executor.execute(job);
106 return true;
107 }
108 catch(RejectedExecutionException e)
109 {
110 LOG.warn(e);
111 return false;
112 }
113 }
114
115 /* ------------------------------------------------------------ */
116 public int getIdleThreads()
117 {
118 if (_executor instanceof ThreadPoolExecutor)
119 {
120 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
121 return tpe.getPoolSize() -tpe.getActiveCount();
122 }
123 return -1;
124 }
125
126 /* ------------------------------------------------------------ */
127 public int getThreads()
128 {
129 if (_executor instanceof ThreadPoolExecutor)
130 {
131 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
132 return tpe.getPoolSize();
133 }
134 return -1;
135 }
136
137 /* ------------------------------------------------------------ */
138 public boolean isLowOnThreads()
139 {
140 if (_executor instanceof ThreadPoolExecutor)
141 {
142 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
143 return tpe.getTaskCount()>=(tpe.getMaximumPoolSize());
144 }
145 return false;
146 }
147
148 /* ------------------------------------------------------------ */
149 public void join() throws InterruptedException
150 {
151 _executor.awaitTermination(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
152 }
153
154 /* ------------------------------------------------------------ */
155 @Override
156 protected void doStart() throws Exception
157 {
158 super.doStart();
159 }
160
161 /* ------------------------------------------------------------ */
162 @Override
163 protected void doStop() throws Exception
164 {
165 super.doStop();
166 _executor.shutdownNow();
167 }
168
169 }