1 /*
2 * Copyright (C) 2016, Google Inc.
3 * and other copyright owners as documented in the project's IP log.
4 *
5 * This program and the accompanying materials are made available
6 * under the terms of the Eclipse Distribution License v1.0 which
7 * accompanies this distribution, is reproduced below, and is
8 * available at http://www.eclipse.org/org/documents/edl-v10.php
9 *
10 * All rights reserved.
11 *
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
14 * conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials provided
22 * with the distribution.
23 *
24 * - Neither the name of the Eclipse Foundation, Inc. nor the
25 * names of its contributors may be used to endorse or promote
26 * products derived from this software without specific prior
27 * written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42 */
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
47 import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
48 import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
49 import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
50 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
51
52 import java.io.IOException;
53 import java.util.ArrayList;
54 import java.util.Collection;
55 import java.util.Collections;
56 import java.util.List;
57 import java.util.concurrent.CopyOnWriteArrayList;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicReference;
60
61 import org.eclipse.jgit.annotations.Nullable;
62 import org.eclipse.jgit.errors.MissingObjectException;
63 import org.eclipse.jgit.internal.storage.reftree.Command;
64 import org.eclipse.jgit.lib.ObjectId;
65 import org.eclipse.jgit.lib.PersonIdent;
66 import org.eclipse.jgit.lib.Ref;
67 import org.eclipse.jgit.revwalk.RevWalk;
68 import org.eclipse.jgit.transport.PushCertificate;
69 import org.eclipse.jgit.transport.ReceiveCommand;
70 import org.eclipse.jgit.util.time.ProposedTimestamp;
71
72 /**
73 * A proposal to be applied in a Ketch system.
74 * <p>
75 * Pushing to a Ketch leader results in the leader making a proposal. The
76 * proposal includes the list of reference updates. The leader attempts to send
77 * the proposal to a quorum of replicas by pushing the proposal to a "staging"
78 * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds
79 * then the changes are durable and the leader can commit the proposal.
80 * <p>
81 * Proposals are executed by {@link KetchLeader#queueProposal(Proposal)}, which
82 * runs them asynchronously in the background. Proposals are thread-safe futures
83 * allowing callers to {@link #await()} for results or be notified by callback
84 * using {@link #addListener(Runnable)}.
85 */
86 public class Proposal {
87 /** Current state of the proposal. */
88 public enum State {
89 /** Proposal has not yet been given to a {@link KetchLeader}. */
90 NEW(false),
91
92 /**
93 * Proposal was validated and has entered the queue, but a round
94 * containing this proposal has not started yet.
95 */
96 QUEUED(false),
97
98 /** Round containing the proposal has begun and is in progress. */
99 RUNNING(false),
100
101 /**
102 * Proposal was executed through a round. Individual results from
103 * {@link Proposal#getCommands()}, {@link Command#getResult()} explain
104 * the success or failure outcome.
105 */
106 EXECUTED(true),
107
108 /** Proposal was aborted and did not reach consensus. */
109 ABORTED(true);
110
111 private final boolean done;
112
113 private State(boolean done) {
114 this.done = done;
115 }
116
117 /** @return true if this is a terminal state. */
118 public boolean isDone() {
119 return done;
120 }
121 }
122
123 private final List<Command> commands;
124 private PersonIdent author;
125 private String message;
126 private PushCertificate pushCert;
127
128 private List<ProposedTimestamp> timestamps;
129 private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
130 private final AtomicReference<State> state = new AtomicReference<>(NEW);
131
132 /**
133 * Create a proposal from a list of Ketch commands.
134 *
135 * @param cmds
136 * prepared list of commands.
137 */
138 public Proposal(List<Command> cmds) {
139 commands = Collections.unmodifiableList(new ArrayList<>(cmds));
140 }
141
142 /**
143 * Create a proposal from a collection of received commands.
144 *
145 * @param rw
146 * walker to assist in preparing commands.
147 * @param cmds
148 * list of pending commands.
149 * @throws MissingObjectException
150 * newId of a command is not found locally.
151 * @throws IOException
152 * local objects cannot be accessed.
153 */
154 public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds)
155 throws MissingObjectException, IOException {
156 commands = asCommandList(rw, cmds);
157 }
158
159 private static List<Command> asCommandList(RevWalk rw,
160 Collection<ReceiveCommand> cmds)
161 throws MissingObjectException, IOException {
162 List<Command> commands = new ArrayList<>(cmds.size());
163 for (ReceiveCommand cmd : cmds) {
164 commands.add(new Command(rw, cmd));
165 }
166 return Collections.unmodifiableList(commands);
167 }
168
169 /** @return commands from this proposal. */
170 public Collection<Command> getCommands() {
171 return commands;
172 }
173
174 /** @return optional author of the proposal. */
175 @Nullable
176 public PersonIdent getAuthor() {
177 return author;
178 }
179
180 /**
181 * Set the author for the proposal.
182 *
183 * @param who
184 * optional identity of the author of the proposal.
185 * @return {@code this}
186 */
187 public Proposal setAuthor(@Nullable PersonIdent who) {
188 author = who;
189 return this;
190 }
191
192 /** @return optional message for the commit log of the RefTree. */
193 @Nullable
194 public String getMessage() {
195 return message;
196 }
197
198 /**
199 * Set the message to appear in the commit log of the RefTree.
200 *
201 * @param msg
202 * message text for the commit.
203 * @return {@code this}
204 */
205 public Proposal setMessage(@Nullable String msg) {
206 message = msg != null && !msg.isEmpty() ? msg : null;
207 return this;
208 }
209
210 /** @return optional certificate signing the references. */
211 @Nullable
212 public PushCertificate getPushCertificate() {
213 return pushCert;
214 }
215
216 /**
217 * Set the push certificate signing the references.
218 *
219 * @param cert
220 * certificate, may be null.
221 * @return {@code this}
222 */
223 public Proposal setPushCertificate(@Nullable PushCertificate cert) {
224 pushCert = cert;
225 return this;
226 }
227
228 /**
229 * @return timestamps that Ketch must block for. These may have been used as
230 * commit times inside the objects involved in the proposal.
231 */
232 public List<ProposedTimestamp> getProposedTimestamps() {
233 if (timestamps != null) {
234 return timestamps;
235 }
236 return Collections.emptyList();
237 }
238
239 /**
240 * Request the proposal to wait for the affected timestamps to resolve.
241 *
242 * @param ts
243 * @return {@code this}.
244 */
245 public Proposal addProposedTimestamp(ProposedTimestamp ts) {
246 if (timestamps == null) {
247 timestamps = new ArrayList<>(4);
248 }
249 timestamps.add(ts);
250 return this;
251 }
252
253 /**
254 * Add a callback to be invoked when the proposal is done.
255 * <p>
256 * A proposal is done when it has entered either {@link State#EXECUTED} or
257 * {@link State#ABORTED} state. If the proposal is already done
258 * {@code callback.run()} is immediately invoked on the caller's thread.
259 *
260 * @param callback
261 * method to run after the proposal is done. The callback may be
262 * run on a Ketch system thread and should be completed quickly.
263 */
264 public void addListener(Runnable callback) {
265 boolean runNow = false;
266 synchronized (state) {
267 if (state.get().isDone()) {
268 runNow = true;
269 } else {
270 listeners.add(callback);
271 }
272 }
273 if (runNow) {
274 callback.run();
275 }
276 }
277
278 /** Set command result as OK. */
279 void success() {
280 for (Command c : commands) {
281 if (c.getResult() == NOT_ATTEMPTED) {
282 c.setResult(OK);
283 }
284 }
285 notifyState(EXECUTED);
286 }
287
288 /** Mark commands as "transaction aborted". */
289 void abort() {
290 Command.abort(commands, null);
291 notifyState(ABORTED);
292 }
293
294 /** @return read the current state of the proposal. */
295 public State getState() {
296 return state.get();
297 }
298
299 /**
300 * @return {@code true} if the proposal was attempted. A true value does not
301 * mean consensus was reached, only that the proposal was considered
302 * and will not be making any more progress beyond its current
303 * state.
304 */
305 public boolean isDone() {
306 return state.get().isDone();
307 }
308
309 /**
310 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
311 *
312 * @throws InterruptedException
313 * caller was interrupted before proposal executed.
314 */
315 public void await() throws InterruptedException {
316 synchronized (state) {
317 while (!state.get().isDone()) {
318 state.wait();
319 }
320 }
321 }
322
323 /**
324 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
325 *
326 * @param wait
327 * how long to wait.
328 * @param unit
329 * unit describing the wait time.
330 * @return true if the proposal is done; false if the method timed out.
331 * @throws InterruptedException
332 * caller was interrupted before proposal executed.
333 */
334 public boolean await(long wait, TimeUnit unit) throws InterruptedException {
335 synchronized (state) {
336 if (state.get().isDone()) {
337 return true;
338 }
339 state.wait(unit.toMillis(wait));
340 return state.get().isDone();
341 }
342 }
343
344 /**
345 * Wait for the proposal to exit a state.
346 *
347 * @param notIn
348 * state the proposal should not be in to return.
349 * @param wait
350 * how long to wait.
351 * @param unit
352 * unit describing the wait time.
353 * @return true if the proposal exited the state; false on time out.
354 * @throws InterruptedException
355 * caller was interrupted before proposal executed.
356 */
357 public boolean awaitStateChange(State notIn, long wait, TimeUnit unit)
358 throws InterruptedException {
359 synchronized (state) {
360 if (state.get() != notIn) {
361 return true;
362 }
363 state.wait(unit.toMillis(wait));
364 return state.get() != notIn;
365 }
366 }
367
368 void notifyState(State s) {
369 synchronized (state) {
370 state.set(s);
371 state.notifyAll();
372 }
373 if (s.isDone()) {
374 for (Runnable callback : listeners) {
375 callback.run();
376 }
377 listeners.clear();
378 }
379 }
380
381 @Override
382 public String toString() {
383 StringBuilder s = new StringBuilder();
384 s.append("Ketch Proposal {\n"); //$NON-NLS-1$
385 s.append(" ").append(state.get()).append('\n'); //$NON-NLS-1$
386 if (author != null) {
387 s.append(" author ").append(author).append('\n'); //$NON-NLS-1$
388 }
389 if (message != null) {
390 s.append(" message ").append(message).append('\n'); //$NON-NLS-1$
391 }
392 for (Command c : commands) {
393 s.append(" "); //$NON-NLS-1$
394 format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$
395 s.append(' ');
396 format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$
397 s.append(' ').append(c.getRefName());
398 if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) {
399 s.append(' ').append(c.getResult()); // $NON-NLS-1$
400 }
401 s.append('\n');
402 }
403 s.append('}');
404 return s.toString();
405 }
406
407 private static void format(StringBuilder s, @Nullable Ref r, String n) {
408 if (r == null) {
409 s.append(n);
410 } else if (r.isSymbolic()) {
411 s.append(r.getTarget().getName());
412 } else {
413 ObjectId id = r.getObjectId();
414 if (id != null) {
415 s.append(id.abbreviate(8).name());
416 }
417 }
418 }
419 }