1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util.thread.strategy;
20
21 import java.io.Closeable;
22 import java.util.concurrent.Executor;
23
24 import org.eclipse.jetty.util.log.Log;
25 import org.eclipse.jetty.util.log.Logger;
26 import org.eclipse.jetty.util.thread.ExecutionStrategy;
27 import org.eclipse.jetty.util.thread.Locker;
28 import org.eclipse.jetty.util.thread.Locker.Lock;
29 import org.eclipse.jetty.util.thread.ThreadPool;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable
45 {
46 private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
47
48 private final Locker _locker = new Locker();
49 private final Runnable _runExecute = new RunExecute();
50 private final Producer _producer;
51 private final ThreadPool _threadPool;
52 private boolean _idle = true;
53 private boolean _execute;
54 private boolean _producing;
55 private boolean _pending;
56 private boolean _lowThreads;
57
58 public ExecuteProduceConsume(Producer producer, Executor executor)
59 {
60 super(executor);
61 this._producer = producer;
62 _threadPool = executor instanceof ThreadPool ? (ThreadPool)executor : null;
63 }
64
65 @Deprecated
66 public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
67 {
68 this(producer, executor);
69 }
70
71 @Override
72 public void execute()
73 {
74 if (LOG.isDebugEnabled())
75 LOG.debug("{} execute", this);
76
77 boolean produce = false;
78 try (Lock locked = _locker.lock())
79 {
80
81 if (_idle)
82 {
83 if (_producing)
84 throw new IllegalStateException();
85
86
87 produce = _producing = true;
88
89 _idle = false;
90 }
91 else
92 {
93
94
95 _execute = true;
96 }
97 }
98
99 if (produce)
100 produceConsume();
101 }
102
103 @Override
104 public void dispatch()
105 {
106 if (LOG.isDebugEnabled())
107 LOG.debug("{} spawning", this);
108 boolean dispatch = false;
109 try (Lock locked = _locker.lock())
110 {
111 if (_idle)
112 dispatch = true;
113 else
114 _execute = true;
115 }
116 if (dispatch)
117 execute(_runExecute);
118 }
119
120 @Override
121 public void run()
122 {
123 if (LOG.isDebugEnabled())
124 LOG.debug("{} run", this);
125 boolean produce = false;
126 try (Lock locked = _locker.lock())
127 {
128 _pending = false;
129 if (!_idle && !_producing)
130 {
131 produce = _producing = true;
132 }
133 }
134
135 if (produce)
136 produceConsume();
137 }
138
139 private void produceConsume()
140 {
141 if (_threadPool != null && _threadPool.isLowOnThreads())
142 {
143
144
145 if (!produceExecuteConsume())
146 return;
147 }
148 executeProduceConsume();
149 }
150
151 public boolean isLowOnThreads()
152 {
153 return _lowThreads;
154 }
155
156
157
158
159 private boolean produceExecuteConsume()
160 {
161 if (LOG.isDebugEnabled())
162 LOG.debug("{} enter low threads mode", this);
163 _lowThreads = true;
164 try
165 {
166 boolean idle = false;
167 while (_threadPool.isLowOnThreads())
168 {
169 Runnable task = _producer.produce();
170 if (LOG.isDebugEnabled())
171 LOG.debug("{} produced {}", _producer, task);
172
173 if (task == null)
174 {
175
176 try (Lock locked = _locker.lock())
177 {
178 if (_execute)
179 {
180 _execute = false;
181 _producing = true;
182 _idle = false;
183 continue;
184 }
185
186 _producing = false;
187 idle = _idle = true;
188 break;
189 }
190 }
191
192
193 executeProduct(task);
194 }
195 return !idle;
196 }
197 finally
198 {
199 _lowThreads = false;
200 if (LOG.isDebugEnabled())
201 LOG.debug("{} exit low threads mode", this);
202 }
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216
217 protected void executeProduct(Runnable task)
218 {
219 if (task instanceof Rejectable)
220 {
221 try
222 {
223 ((Rejectable)task).reject();
224 if (task instanceof Closeable)
225 ((Closeable)task).close();
226 }
227 catch (Throwable x)
228 {
229 LOG.debug(x);
230 }
231 }
232 else
233 {
234 execute(task);
235 }
236 }
237
238 private void executeProduceConsume()
239 {
240 if (LOG.isDebugEnabled())
241 LOG.debug("{} produce enter", this);
242
243 while (true)
244 {
245
246 if (LOG.isDebugEnabled())
247 LOG.debug("{} producing", this);
248
249 Runnable task = _producer.produce();
250
251 if (LOG.isDebugEnabled())
252 LOG.debug("{} produced {}", this, task);
253
254 boolean dispatch = false;
255 try (Lock locked = _locker.lock())
256 {
257
258 _producing = false;
259
260
261 if (task == null)
262 {
263
264
265 if (_execute)
266 {
267 _idle = false;
268 _producing = true;
269 _execute = false;
270 continue;
271 }
272
273
274 _idle = true;
275 break;
276 }
277
278
279
280 if (!_pending)
281 {
282
283 dispatch = _pending = true;
284 }
285
286 _execute = false;
287 }
288
289
290 if (dispatch)
291 {
292
293 if (LOG.isDebugEnabled())
294 LOG.debug("{} dispatch", this);
295 if (!execute(this))
296 task = null;
297 }
298
299
300 if (LOG.isDebugEnabled())
301 LOG.debug("{} run {}", this, task);
302 if (task != null)
303 task.run();
304 if (LOG.isDebugEnabled())
305 LOG.debug("{} ran {}", this, task);
306
307
308 try (Lock locked = _locker.lock())
309 {
310
311 if (_producing || _idle)
312 break;
313 _producing = true;
314 }
315 }
316
317 if (LOG.isDebugEnabled())
318 LOG.debug("{} produce exit", this);
319 }
320
321 public Boolean isIdle()
322 {
323 try (Lock locked = _locker.lock())
324 {
325 return _idle;
326 }
327 }
328
329 public String toString()
330 {
331 StringBuilder builder = new StringBuilder();
332 builder.append("EPC ");
333 try (Lock locked = _locker.lock())
334 {
335 builder.append(_idle ? "Idle/" : "");
336 builder.append(_producing ? "Prod/" : "");
337 builder.append(_pending ? "Pend/" : "");
338 builder.append(_execute ? "Exec/" : "");
339 }
340 builder.append(_producer);
341 return builder.toString();
342 }
343
344 private class RunExecute implements Runnable
345 {
346 @Override
347 public void run()
348 {
349 execute();
350 }
351 }
352
353 public static class Factory implements ExecutionStrategy.Factory
354 {
355 @Override
356 public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
357 {
358 return new ExecuteProduceConsume(producer, executor);
359 }
360 }
361 }