1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util.thread.strategy;
20
21 import java.util.concurrent.Executor;
22
23 import org.eclipse.jetty.util.log.Log;
24 import org.eclipse.jetty.util.log.Logger;
25 import org.eclipse.jetty.util.thread.ExecutionStrategy;
26 import org.eclipse.jetty.util.thread.Locker;
27 import org.eclipse.jetty.util.thread.Locker.Lock;
28 import org.eclipse.jetty.util.thread.ThreadPool;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
46 {
47 private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
48 private final Locker _locker = new Locker();
49 private final Runnable _runExecute = new RunExecute();
50 private final Producer _producer;
51 private final Executor _executor;
52 private boolean _idle=true;
53 private boolean _execute;
54 private boolean _producing;
55 private boolean _pending;
56 private final ThreadPool _threadpool;
57 private final ExecutionStrategy _lowresources;
58
59 public ExecuteProduceConsume(Producer producer, Executor executor)
60 {
61 this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null);
62 }
63
64 public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
65 {
66 this._producer = producer;
67 this._executor = executor;
68 _threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
69 _lowresources = _threadpool==null?null:lowResourceStrategy;
70 }
71
72 @Override
73 public void execute()
74 {
75 if (LOG.isDebugEnabled())
76 LOG.debug("{} execute",this);
77
78 boolean produce=false;
79 try (Lock locked = _locker.lock())
80 {
81
82 if (_idle)
83 {
84 if (_producing)
85 throw new IllegalStateException();
86
87
88 produce=_producing=true;
89
90 _idle=false;
91 }
92 else
93 {
94
95
96 _execute=true;
97 }
98 }
99
100 if (produce)
101 produceAndRun();
102 }
103
104 @Override
105 public void dispatch()
106 {
107 if (LOG.isDebugEnabled())
108 LOG.debug("{} spawning",this);
109 boolean dispatch=false;
110 try (Lock locked = _locker.lock())
111 {
112 if (_idle)
113 dispatch=true;
114 else
115 _execute=true;
116 }
117 if (dispatch)
118 _executor.execute(_runExecute);
119 }
120
121 @Override
122 public void run()
123 {
124 if (LOG.isDebugEnabled())
125 LOG.debug("{} run",this);
126 boolean produce=false;
127 try (Lock locked = _locker.lock())
128 {
129 _pending=false;
130 if (!_idle && !_producing)
131 {
132 produce=_producing=true;
133 }
134 }
135
136 if (produce)
137 {
138
139
140 while (_threadpool!=null && _threadpool.isLowOnThreads())
141 {
142 LOG.debug("EWYK low resources {}",this);
143 _lowresources.execute();
144 }
145
146
147 produceAndRun();
148 }
149 }
150
151 private void produceAndRun()
152 {
153 if (LOG.isDebugEnabled())
154 LOG.debug("{} produce enter",this);
155
156 while (true)
157 {
158
159 if (LOG.isDebugEnabled())
160 LOG.debug("{} producing",this);
161
162 Runnable task = _producer.produce();
163
164 if (LOG.isDebugEnabled())
165 LOG.debug("{} produced {}",this,task);
166
167 boolean dispatch=false;
168 try (Lock locked = _locker.lock())
169 {
170
171 _producing=false;
172
173
174 if (task == null)
175 {
176
177 if (_execute)
178 {
179 _idle=false;
180 _producing=true;
181 _execute=false;
182 continue;
183 }
184
185
186 _idle=true;
187 break;
188 }
189
190
191
192 if (!_pending)
193 {
194
195 dispatch=_pending=true;
196 }
197
198 _execute=false;
199 }
200
201
202 if (dispatch)
203 {
204
205 if (LOG.isDebugEnabled())
206 LOG.debug("{} dispatch",this);
207 _executor.execute(this);
208 }
209
210
211 if (LOG.isDebugEnabled())
212 LOG.debug("{} run {}",this,task);
213 task.run();
214 if (LOG.isDebugEnabled())
215 LOG.debug("{} ran {}",this,task);
216
217
218 try (Lock locked = _locker.lock())
219 {
220
221 if (_producing || _idle)
222 break;
223 _producing=true;
224 }
225 }
226
227 if (LOG.isDebugEnabled())
228 LOG.debug("{} produce exit",this);
229 }
230
231 public Boolean isIdle()
232 {
233 try (Lock locked = _locker.lock())
234 {
235 return _idle;
236 }
237 }
238
239 public String toString()
240 {
241 StringBuilder builder = new StringBuilder();
242 builder.append("EPR ");
243 try (Lock locked = _locker.lock())
244 {
245 builder.append(_idle?"Idle/":"");
246 builder.append(_producing?"Prod/":"");
247 builder.append(_pending?"Pend/":"");
248 builder.append(_execute?"Exec/":"");
249 }
250 builder.append(_producer);
251 return builder.toString();
252 }
253
254 private class RunExecute implements Runnable
255 {
256 @Override
257 public void run()
258 {
259 execute();
260 }
261 }
262 }