1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util.thread;
20
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.SynchronousQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29
30 import org.eclipse.jetty.util.component.AbstractLifeCycle;
31 import org.eclipse.jetty.util.component.LifeCycle;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35
36
37
38
39
40
41 public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle
42 {
43 private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class);
44 private final ExecutorService _executor;
45
46
47 public ExecutorThreadPool(ExecutorService executor)
48 {
49 _executor = executor;
50 }
51
52
53
54
55
56
57
58 public ExecutorThreadPool()
59 {
60
61
62 this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()));
63 }
64
65
66
67
68
69
70
71
72 public ExecutorThreadPool(int queueSize)
73 {
74 this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) :
75 queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) :
76 new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize)));
77 }
78
79
80
81
82
83
84
85
86
87 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime)
88 {
89 this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS);
90 }
91
92
93
94
95
96
97
98
99
100
101 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit)
102 {
103 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>());
104 }
105
106
107
108
109
110
111
112
113
114
115
116 public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
117 {
118 this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue));
119 }
120
121
122
123 @Override
124 public void execute(Runnable job)
125 {
126 _executor.execute(job);
127 }
128
129
130 public boolean dispatch(Runnable job)
131 {
132 try
133 {
134 _executor.execute(job);
135 return true;
136 }
137 catch(RejectedExecutionException e)
138 {
139 LOG.warn(e);
140 return false;
141 }
142 }
143
144
145 public int getIdleThreads()
146 {
147 if (_executor instanceof ThreadPoolExecutor)
148 {
149 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
150 return tpe.getPoolSize() - tpe.getActiveCount();
151 }
152 return -1;
153 }
154
155
156 public int getThreads()
157 {
158 if (_executor instanceof ThreadPoolExecutor)
159 {
160 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
161 return tpe.getPoolSize();
162 }
163 return -1;
164 }
165
166
167 public boolean isLowOnThreads()
168 {
169 if (_executor instanceof ThreadPoolExecutor)
170 {
171 final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
172
173 return tpe.getPoolSize() == tpe.getMaximumPoolSize() &&
174 tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount();
175 }
176 return false;
177 }
178
179
180 public void join() throws InterruptedException
181 {
182 _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
183 }
184
185
186 @Override
187 protected void doStop() throws Exception
188 {
189 super.doStop();
190 _executor.shutdownNow();
191 }
192 }