View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util.thread.strategy;
20  
21  import java.io.Closeable;
22  import java.util.concurrent.Executor;
23  
24  import org.eclipse.jetty.util.log.Log;
25  import org.eclipse.jetty.util.log.Logger;
26  import org.eclipse.jetty.util.thread.ExecutionStrategy;
27  import org.eclipse.jetty.util.thread.Locker;
28  import org.eclipse.jetty.util.thread.Locker.Lock;
29  import org.eclipse.jetty.util.thread.ThreadPool;
30  
31  /**
32   * <p>A strategy where the thread that produces will always run the resulting task.</p>
33   * <p>The strategy may then dispatch another thread to continue production.</p>
34   * <p>The strategy is also known by the nickname 'eat what you kill', which comes from
35   * the hunting ethic that says a person should not kill anything he or she does not
36   * plan on eating. In this case, the phrase is used to mean that a thread should
37   * not produce a task that it does not intend to run. By making producers run the
38   * task that they have just produced avoids execution delays and avoids parallel slow
39   * down by running the task in the same core, with good chances of having a hot CPU
40   * cache. It also avoids the creation of a queue of produced tasks that the system
41   * does not yet have capacity to consume, which can save memory and exert back
42   * pressure on producers.</p>
43   */
44  public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable
45  {
46      private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
47  
48      private final Locker _locker = new Locker();
49      private final Runnable _runExecute = new RunExecute();
50      private final Producer _producer;
51      private final ThreadPool _threadPool;
52      private boolean _idle = true;
53      private boolean _execute;
54      private boolean _producing;
55      private boolean _pending;
56      private boolean _lowThreads;
57  
58      public ExecuteProduceConsume(Producer producer, Executor executor)
59      {
60          super(executor);
61          this._producer = producer;
62          _threadPool = executor instanceof ThreadPool ? (ThreadPool)executor : null;
63      }
64  
65      @Deprecated
66      public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
67      {
68          this(producer, executor);
69      }
70  
71      @Override
72      public void execute()
73      {
74          if (LOG.isDebugEnabled())
75              LOG.debug("{} execute", this);
76  
77          boolean produce = false;
78          try (Lock locked = _locker.lock())
79          {
80              // If we are idle and a thread is not producing
81              if (_idle)
82              {
83                  if (_producing)
84                      throw new IllegalStateException();
85  
86                  // Then this thread will do the producing
87                  produce = _producing = true;
88                  // and we are no longer idle
89                  _idle = false;
90              }
91              else
92              {
93                  // Otherwise, lets tell the producing thread
94                  // that it should call produce again before going idle
95                  _execute = true;
96              }
97          }
98  
99          if (produce)
100             produceConsume();
101     }
102 
103     @Override
104     public void dispatch()
105     {
106         if (LOG.isDebugEnabled())
107             LOG.debug("{} spawning", this);
108         boolean dispatch = false;
109         try (Lock locked = _locker.lock())
110         {
111             if (_idle)
112                 dispatch = true;
113             else
114                 _execute = true;
115         }
116         if (dispatch)
117             execute(_runExecute);
118     }
119 
120     @Override
121     public void run()
122     {
123         if (LOG.isDebugEnabled())
124             LOG.debug("{} run", this);
125         boolean produce = false;
126         try (Lock locked = _locker.lock())
127         {
128             _pending = false;
129             if (!_idle && !_producing)
130             {
131                 produce = _producing = true;
132             }
133         }
134 
135         if (produce)
136             produceConsume();
137     }
138 
139     private void produceConsume()
140     {
141         if (_threadPool != null && _threadPool.isLowOnThreads())
142         {
143             // If we are low on threads we must not produce and consume
144             // in the same thread, but produce and execute to consume.
145             if (!produceExecuteConsume())
146                 return;
147         }
148         executeProduceConsume();
149     }
150 
151     public boolean isLowOnThreads()
152     {
153         return _lowThreads;
154     }
155 
156     /**
157      * @return true if we are still producing
158      */
159     private boolean produceExecuteConsume()
160     {
161         if (LOG.isDebugEnabled())
162             LOG.debug("{} enter low threads mode", this);
163         _lowThreads = true;
164         try
165         {
166             boolean idle = false;
167             while (_threadPool.isLowOnThreads())
168             {
169                 Runnable task = _producer.produce();
170                 if (LOG.isDebugEnabled())
171                     LOG.debug("{} produced {}", _producer, task);
172 
173                 if (task == null)
174                 {
175                     // No task, so we are now idle
176                     try (Lock locked = _locker.lock())
177                     {
178                         if (_execute)
179                         {
180                             _execute = false;
181                             _producing = true;
182                             _idle = false;
183                             continue;
184                         }
185 
186                         _producing = false;
187                         idle = _idle = true;
188                         break;
189                     }
190                 }
191 
192                 // Execute the task.
193                 executeProduct(task);
194             }
195             return !idle;
196         }
197         finally
198         {
199             _lowThreads = false;
200             if (LOG.isDebugEnabled())
201                 LOG.debug("{} exit low threads mode", this);
202         }
203     }
204 
205     /**
206      * <p>Only called when in {@link #isLowOnThreads() low threads mode}
207      * to execute the task produced by the producer.</p>
208      * <p>Because </p>
209      * <p>If the task implements {@link Rejectable}, then {@link Rejectable#reject()}
210      * is immediately called on the task object. If the task also implements
211      * {@link Closeable}, then {@link Closeable#close()} is called on the task object.</p>
212      * <p>If the task does not implement {@link Rejectable}, then it is
213      * {@link #execute(Runnable) executed}.</p>
214      *
215      * @param task the produced task to execute
216      */
217     protected void executeProduct(Runnable task)
218     {
219         if (task instanceof Rejectable)
220         {
221             try
222             {
223                 ((Rejectable)task).reject();
224                 if (task instanceof Closeable)
225                     ((Closeable)task).close();
226             }
227             catch (Throwable x)
228             {
229                 LOG.debug(x);
230             }
231         }
232         else
233         {
234             execute(task);
235         }
236     }
237 
238     private void executeProduceConsume()
239     {
240         if (LOG.isDebugEnabled())
241             LOG.debug("{} produce enter", this);
242 
243         while (true)
244         {
245             // If we got here, then we are the thread that is producing.
246             if (LOG.isDebugEnabled())
247                 LOG.debug("{} producing", this);
248 
249             Runnable task = _producer.produce();
250 
251             if (LOG.isDebugEnabled())
252                 LOG.debug("{} produced {}", this, task);
253 
254             boolean dispatch = false;
255             try (Lock locked = _locker.lock())
256             {
257                 // Finished producing
258                 _producing = false;
259 
260                 // Did we produced a task?
261                 if (task == null)
262                 {
263                     // There is no task.
264                     // Could another one just have been queued with an execute?
265                     if (_execute)
266                     {
267                         _idle = false;
268                         _producing = true;
269                         _execute = false;
270                         continue;
271                     }
272 
273                     // ... and no additional calls to execute, so we are idle
274                     _idle = true;
275                     break;
276                 }
277 
278                 // We have a task, which we will run ourselves,
279                 // so if we don't have another thread pending
280                 if (!_pending)
281                 {
282                     // dispatch one
283                     dispatch = _pending = true;
284                 }
285 
286                 _execute = false;
287             }
288 
289             // If we became pending
290             if (dispatch)
291             {
292                 // Spawn a new thread to continue production by running the produce loop.
293                 if (LOG.isDebugEnabled())
294                     LOG.debug("{} dispatch", this);
295                 if (!execute(this))
296                     task = null;
297             }
298 
299             // Run the task.
300             if (LOG.isDebugEnabled())
301                 LOG.debug("{} run {}", this, task);
302             if (task != null)
303                 task.run();
304             if (LOG.isDebugEnabled())
305                 LOG.debug("{} ran {}", this, task);
306 
307             // Once we have run the task, we can try producing again.
308             try (Lock locked = _locker.lock())
309             {
310                 // Is another thread already producing or we are now idle?
311                 if (_producing || _idle)
312                     break;
313                 _producing = true;
314             }
315         }
316 
317         if (LOG.isDebugEnabled())
318             LOG.debug("{} produce exit", this);
319     }
320 
321     public Boolean isIdle()
322     {
323         try (Lock locked = _locker.lock())
324         {
325             return _idle;
326         }
327     }
328 
329     public String toString()
330     {
331         StringBuilder builder = new StringBuilder();
332         builder.append("EPC ");
333         try (Lock locked = _locker.lock())
334         {
335             builder.append(_idle ? "Idle/" : "");
336             builder.append(_producing ? "Prod/" : "");
337             builder.append(_pending ? "Pend/" : "");
338             builder.append(_execute ? "Exec/" : "");
339         }
340         builder.append(_producer);
341         return builder.toString();
342     }
343 
344     private class RunExecute implements Runnable
345     {
346         @Override
347         public void run()
348         {
349             execute();
350         }
351     }
352 
353     public static class Factory implements ExecutionStrategy.Factory
354     {
355         @Override
356         public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
357         {
358             return new ExecuteProduceConsume(producer, executor);
359         }
360     }
361 }