View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util.thread.strategy;
20  
21  import java.util.concurrent.Executor;
22  
23  import org.eclipse.jetty.util.log.Log;
24  import org.eclipse.jetty.util.log.Logger;
25  import org.eclipse.jetty.util.thread.ExecutionStrategy;
26  import org.eclipse.jetty.util.thread.Locker;
27  import org.eclipse.jetty.util.thread.Locker.Lock;
28  import org.eclipse.jetty.util.thread.ThreadPool;
29  
30  /**
31   * <p>A strategy where the thread calls produce will always run the resulting task
32   * itself.  The strategy may dispatches another thread to continue production.
33   * </p>
34   * <p>The strategy is also known by the nickname 'eat what you kill', which comes from
35   * the hunting ethic that says a person should not kill anything he or she does not
36   * plan on eating. In this case, the phrase is used to mean that a thread should
37   * not produce a task that it does not intend to run. By making producers run the
38   * task that they have just produced avoids execution delays and avoids parallel slow
39   * down by running the task in the same core, with good chances of having a hot CPU
40   * cache. It also avoids the creation of a queue of produced tasks that the system
41   * does not yet have capacity to consume, which can save memory and exert back
42   * pressure on producers.
43   * </p>
44   */
45  public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
46  {
47      private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
48      private final Locker _locker = new Locker();
49      private final Runnable _runExecute = new RunExecute();
50      private final Producer _producer;
51      private final Executor _executor;
52      private boolean _idle=true;
53      private boolean _execute;
54      private boolean _producing;
55      private boolean _pending;
56      private final ThreadPool _threadpool;
57      private final ExecutionStrategy _lowresources;
58  
59      public ExecuteProduceConsume(Producer producer, Executor executor)
60      {
61          this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null);
62      }
63      
64      public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
65      {
66          this._producer = producer;
67          this._executor = executor;
68          _threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
69          _lowresources = _threadpool==null?null:lowResourceStrategy;
70      }
71  
72      @Override
73      public void execute()
74      {
75          if (LOG.isDebugEnabled())
76              LOG.debug("{} execute",this);
77          
78          boolean produce=false;
79          try (Lock locked = _locker.lock())
80          {
81              // If we are idle and a thread is not producing
82              if (_idle)
83              {
84                  if (_producing)
85                      throw new IllegalStateException();
86  
87                  // Then this thread will do the producing
88                  produce=_producing=true;
89                  // and we are no longer idle
90                  _idle=false;
91              }
92              else
93              {
94                  // Otherwise, lets tell the producing thread
95                  // that it should call produce again before going idle
96                  _execute=true;
97              }
98          }
99  
100         if (produce)
101             produceAndRun();
102     }
103 
104     @Override
105     public void dispatch()
106     {
107         if (LOG.isDebugEnabled())
108             LOG.debug("{} spawning",this);
109         boolean dispatch=false;
110         try (Lock locked = _locker.lock())
111         {
112             if (_idle)
113                 dispatch=true;
114             else
115                 _execute=true;
116         }
117         if (dispatch)
118             _executor.execute(_runExecute);
119     }
120 
121     @Override
122     public void run()
123     {
124         if (LOG.isDebugEnabled())
125             LOG.debug("{} run",this);
126         boolean produce=false;
127         try (Lock locked = _locker.lock())
128         {
129             _pending=false;
130             if (!_idle && !_producing)
131             {
132                 produce=_producing=true;
133             }
134         }
135 
136         if (produce)
137         {
138             // If we are low on resources, then switch to PEC strategy which does not
139             // suffer as badly from thread starvation
140             while (_threadpool!=null && _threadpool.isLowOnThreads())
141             {
142                 LOG.debug("EWYK low resources {}",this);
143                 _lowresources.execute();
144             }
145             
146             // no longer low resources so produceAndRun normally
147             produceAndRun();
148         }
149     }
150 
151     private void produceAndRun()
152     {
153         if (LOG.isDebugEnabled())
154             LOG.debug("{} produce enter",this);
155 
156         while (true)
157         {
158             // If we got here, then we are the thread that is producing.
159             if (LOG.isDebugEnabled())
160                 LOG.debug("{} producing",this);
161 
162             Runnable task = _producer.produce();
163 
164             if (LOG.isDebugEnabled())
165                 LOG.debug("{} produced {}",this,task);
166 
167             boolean dispatch=false;
168             try (Lock locked = _locker.lock())
169             {
170                 // Finished producing
171                 _producing=false;
172 
173                 // Did we produced a task?
174                 if (task == null)
175                 {
176                     // There is no task.
177                     if (_execute)
178                     {
179                         _idle=false;
180                         _producing=true;
181                         _execute=false;
182                         continue;
183                     }
184 
185                     // ... and no additional calls to execute, so we are idle
186                     _idle=true;
187                     break;
188                 }
189 
190                 // We have a task, which we will run ourselves,
191                 // so if we don't have another thread pending
192                 if (!_pending)
193                 {
194                     // dispatch one
195                     dispatch=_pending=true;
196                 }
197 
198                 _execute=false;
199             }
200 
201             // If we became pending
202             if (dispatch)
203             {
204                 // Spawn a new thread to continue production by running the produce loop.
205                 if (LOG.isDebugEnabled())
206                     LOG.debug("{} dispatch",this);
207                 _executor.execute(this);
208             }
209 
210             // Run the task.
211             if (LOG.isDebugEnabled())
212                 LOG.debug("{} run {}",this,task);
213             task.run();
214             if (LOG.isDebugEnabled())
215                 LOG.debug("{} ran {}",this,task);
216 
217             // Once we have run the task, we can try producing again.
218             try (Lock locked = _locker.lock())
219             {
220                 // Is another thread already producing or we are now idle?
221                 if (_producing || _idle)
222                     break;
223                 _producing=true;
224             }
225         }
226 
227         if (LOG.isDebugEnabled())
228             LOG.debug("{} produce exit",this);
229     }
230 
231     public Boolean isIdle()
232     {
233         try (Lock locked = _locker.lock())
234         {
235             return _idle;
236         }
237     }
238 
239     public String toString()
240     {
241         StringBuilder builder = new StringBuilder();
242         builder.append("EPR ");
243         try (Lock locked = _locker.lock())
244         {
245             builder.append(_idle?"Idle/":"");
246             builder.append(_producing?"Prod/":"");
247             builder.append(_pending?"Pend/":"");
248             builder.append(_execute?"Exec/":"");
249         }
250         builder.append(_producer);
251         return builder.toString();
252     }
253 
254     private class RunExecute implements Runnable
255     {
256         @Override
257         public void run()
258         {
259             execute();
260         }
261     }
262 }