View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.util.thread.strategy;
20  
21  import java.util.concurrent.Executor;
22  
23  import org.eclipse.jetty.util.log.Log;
24  import org.eclipse.jetty.util.log.Logger;
25  import org.eclipse.jetty.util.thread.ExecutionStrategy;
26  import org.eclipse.jetty.util.thread.Locker;
27  import org.eclipse.jetty.util.thread.Locker.Lock;
28  
29  /**
30   * <p>A strategy where the caller thread iterates over task production, submitting each
31   * task to an {@link Executor} for execution.</p>
32   */
33  public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements ExecutionStrategy
34  {
35      private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
36  
37      private final Locker _locker = new Locker();
38      private final Producer _producer;
39      private State _state = State.IDLE;
40  
41      public ProduceExecuteConsume(Producer producer, Executor executor)
42      {
43          super(executor);
44          this._producer = producer;
45      }
46  
47      @Override
48      public void execute()
49      {
50          try (Lock locked = _locker.lock())
51          {
52              switch(_state)
53              {
54                  case IDLE:
55                      _state=State.PRODUCE;
56                      break;
57  
58                  case PRODUCE:
59                  case EXECUTE:
60                      _state=State.EXECUTE;
61                      return;
62              }
63          }
64  
65          // Produce until we no task is found.
66          while (true)
67          {
68              // Produce a task.
69              Runnable task = _producer.produce();
70              if (LOG.isDebugEnabled())
71                  LOG.debug("{} produced {}", _producer, task);
72  
73              if (task == null)
74              {
75                  try (Lock locked = _locker.lock())
76                  {
77                      switch(_state)
78                      {
79                          case IDLE:
80                              throw new IllegalStateException();
81                          case PRODUCE:
82                              _state=State.IDLE;
83                              return;
84                          case EXECUTE:
85                              _state=State.PRODUCE;
86                              continue;
87                      }
88                  }
89              }
90  
91              // Execute the task.
92              execute(task);
93          }        
94      }
95  
96      @Override
97      public void dispatch()
98      {
99          execute();
100     }
101 
102     public static class Factory implements ExecutionStrategy.Factory
103     {
104         @Override
105         public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
106         {
107             return new ProduceExecuteConsume(producer, executor);
108         }
109     }
110 
111     private enum State
112     {
113         IDLE, PRODUCE, EXECUTE
114     }
115 }