1 //
2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.util.thread.strategy;
20
21 import java.util.concurrent.Executor;
22
23 import org.eclipse.jetty.util.log.Log;
24 import org.eclipse.jetty.util.log.Logger;
25 import org.eclipse.jetty.util.thread.ExecutionStrategy;
26 import org.eclipse.jetty.util.thread.Locker;
27 import org.eclipse.jetty.util.thread.Locker.Lock;
28
29 /**
30 * <p>A strategy where the caller thread iterates over task production, submitting each
31 * task to an {@link Executor} for execution.</p>
32 */
33 public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements ExecutionStrategy
34 {
35 private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
36
37 private final Locker _locker = new Locker();
38 private final Producer _producer;
39 private State _state = State.IDLE;
40
41 public ProduceExecuteConsume(Producer producer, Executor executor)
42 {
43 super(executor);
44 this._producer = producer;
45 }
46
47 @Override
48 public void execute()
49 {
50 try (Lock locked = _locker.lock())
51 {
52 switch(_state)
53 {
54 case IDLE:
55 _state=State.PRODUCE;
56 break;
57
58 case PRODUCE:
59 case EXECUTE:
60 _state=State.EXECUTE;
61 return;
62 }
63 }
64
65 // Produce until we no task is found.
66 while (true)
67 {
68 // Produce a task.
69 Runnable task = _producer.produce();
70 if (LOG.isDebugEnabled())
71 LOG.debug("{} produced {}", _producer, task);
72
73 if (task == null)
74 {
75 try (Lock locked = _locker.lock())
76 {
77 switch(_state)
78 {
79 case IDLE:
80 throw new IllegalStateException();
81 case PRODUCE:
82 _state=State.IDLE;
83 return;
84 case EXECUTE:
85 _state=State.PRODUCE;
86 continue;
87 }
88 }
89 }
90
91 // Execute the task.
92 execute(task);
93 }
94 }
95
96 @Override
97 public void dispatch()
98 {
99 execute();
100 }
101
102 public static class Factory implements ExecutionStrategy.Factory
103 {
104 @Override
105 public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
106 {
107 return new ProduceExecuteConsume(producer, executor);
108 }
109 }
110
111 private enum State
112 {
113 IDLE, PRODUCE, EXECUTE
114 }
115 }