Proposal.java

/*
 * Copyright (C) 2016, Google Inc. and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

package org.eclipse.jgit.internal.ketch;

import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.internal.storage.reftree.Command;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.PushCertificate;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.eclipse.jgit.util.time.ProposedTimestamp;

/**
 * A proposal to be applied in a Ketch system.
 * <p>
 * Pushing to a Ketch leader results in the leader making a proposal. The
 * proposal includes the list of reference updates. The leader attempts to send
 * the proposal to a quorum of replicas by pushing the proposal to a "staging"
 * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds
 * then the changes are durable and the leader can commit the proposal.
 * <p>
 * Proposals are executed by
 * {@link org.eclipse.jgit.internal.ketch.KetchLeader#queueProposal(Proposal)},
 * which runs them asynchronously in the background. Proposals are thread-safe
 * futures allowing callers to {@link #await()} for results or be notified by
 * callback using {@link #addListener(Runnable)}.
 */
public class Proposal {
	/** Current state of the proposal. */
	public enum State {
		/** Proposal has not yet been given to a {@link KetchLeader}. */
		NEW(false),

		/**
		 * Proposal was validated and has entered the queue, but a round
		 * containing this proposal has not started yet.
		 */
		QUEUED(false),

		/** Round containing the proposal has begun and is in progress. */
		RUNNING(false),

		/**
		 * Proposal was executed through a round. Individual results from
		 * {@link Proposal#getCommands()}, {@link Command#getResult()} explain
		 * the success or failure outcome.
		 */
		EXECUTED(true),

		/** Proposal was aborted and did not reach consensus. */
		ABORTED(true);

		private final boolean done;

		private State(boolean done) {
			this.done = done;
		}

		/** @return true if this is a terminal state. */
		public boolean isDone() {
			return done;
		}
	}

	private final List<Command> commands;
	private PersonIdent author;
	private String message;
	private PushCertificate pushCert;

	private List<ProposedTimestamp> timestamps;
	private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
	private final AtomicReference<State> state = new AtomicReference<>(NEW);

	/**
	 * Create a proposal from a list of Ketch commands.
	 *
	 * @param cmds
	 *            prepared list of commands.
	 */
	public Proposal(List<Command> cmds) {
		commands = Collections.unmodifiableList(new ArrayList<>(cmds));
	}

	/**
	 * Create a proposal from a collection of received commands.
	 *
	 * @param rw
	 *            walker to assist in preparing commands.
	 * @param cmds
	 *            list of pending commands.
	 * @throws org.eclipse.jgit.errors.MissingObjectException
	 *             newId of a command is not found locally.
	 * @throws java.io.IOException
	 *             local objects cannot be accessed.
	 */
	public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds)
			throws MissingObjectException, IOException {
		commands = asCommandList(rw, cmds);
	}

	private static List<Command> asCommandList(RevWalk rw,
			Collection<ReceiveCommand> cmds)
					throws MissingObjectException, IOException {
		List<Command> commands = new ArrayList<>(cmds.size());
		for (ReceiveCommand cmd : cmds) {
			commands.add(new Command(rw, cmd));
		}
		return Collections.unmodifiableList(commands);
	}

	/**
	 * Get commands from this proposal.
	 *
	 * @return commands from this proposal.
	 */
	public Collection<Command> getCommands() {
		return commands;
	}

	/**
	 * Get optional author of the proposal.
	 *
	 * @return optional author of the proposal.
	 */
	@Nullable
	public PersonIdent getAuthor() {
		return author;
	}

	/**
	 * Set the author for the proposal.
	 *
	 * @param who
	 *            optional identity of the author of the proposal.
	 * @return {@code this}
	 */
	public Proposal setAuthor(@Nullable PersonIdent who) {
		author = who;
		return this;
	}

	/**
	 * Get optional message for the commit log of the RefTree.
	 *
	 * @return optional message for the commit log of the RefTree.
	 */
	@Nullable
	public String getMessage() {
		return message;
	}

	/**
	 * Set the message to appear in the commit log of the RefTree.
	 *
	 * @param msg
	 *            message text for the commit.
	 * @return {@code this}
	 */
	public Proposal setMessage(@Nullable String msg) {
		message = msg != null && !msg.isEmpty() ? msg : null;
		return this;
	}

	/**
	 * Get optional certificate signing the references.
	 *
	 * @return optional certificate signing the references.
	 */
	@Nullable
	public PushCertificate getPushCertificate() {
		return pushCert;
	}

	/**
	 * Set the push certificate signing the references.
	 *
	 * @param cert
	 *            certificate, may be null.
	 * @return {@code this}
	 */
	public Proposal setPushCertificate(@Nullable PushCertificate cert) {
		pushCert = cert;
		return this;
	}

	/**
	 * Get timestamps that Ketch must block for.
	 *
	 * @return timestamps that Ketch must block for. These may have been used as
	 *         commit times inside the objects involved in the proposal.
	 */
	public List<ProposedTimestamp> getProposedTimestamps() {
		if (timestamps != null) {
			return timestamps;
		}
		return Collections.emptyList();
	}

	/**
	 * Request the proposal to wait for the affected timestamps to resolve.
	 *
	 * @param ts
	 *            a {@link org.eclipse.jgit.util.time.ProposedTimestamp} object.
	 * @return {@code this}.
	 */
	public Proposal addProposedTimestamp(ProposedTimestamp ts) {
		if (timestamps == null) {
			timestamps = new ArrayList<>(4);
		}
		timestamps.add(ts);
		return this;
	}

	/**
	 * Add a callback to be invoked when the proposal is done.
	 * <p>
	 * A proposal is done when it has entered either
	 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#EXECUTED} or
	 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#ABORTED} state. If
	 * the proposal is already done {@code callback.run()} is immediately
	 * invoked on the caller's thread.
	 *
	 * @param callback
	 *            method to run after the proposal is done. The callback may be
	 *            run on a Ketch system thread and should be completed quickly.
	 */
	public void addListener(Runnable callback) {
		boolean runNow = false;
		synchronized (state) {
			if (state.get().isDone()) {
				runNow = true;
			} else {
				listeners.add(callback);
			}
		}
		if (runNow) {
			callback.run();
		}
	}

	/** Set command result as OK. */
	void success() {
		for (Command c : commands) {
			if (c.getResult() == NOT_ATTEMPTED) {
				c.setResult(OK);
			}
		}
		notifyState(EXECUTED);
	}

	/** Mark commands as "transaction aborted". */
	void abort() {
		Command.abort(commands, null);
		notifyState(ABORTED);
	}

	/**
	 * Read the current state of the proposal.
	 *
	 * @return read the current state of the proposal.
	 */
	public State getState() {
		return state.get();
	}

	/**
	 * Whether the proposal was attempted
	 *
	 * @return {@code true} if the proposal was attempted. A true value does not
	 *         mean consensus was reached, only that the proposal was considered
	 *         and will not be making any more progress beyond its current
	 *         state.
	 */
	public boolean isDone() {
		return state.get().isDone();
	}

	/**
	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
	 *
	 * @throws java.lang.InterruptedException
	 *             caller was interrupted before proposal executed.
	 */
	public void await() throws InterruptedException {
		synchronized (state) {
			while (!state.get().isDone()) {
				state.wait();
			}
		}
	}

	/**
	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
	 *
	 * @param wait
	 *            how long to wait.
	 * @param unit
	 *            unit describing the wait time.
	 * @return true if the proposal is done; false if the method timed out.
	 * @throws java.lang.InterruptedException
	 *             caller was interrupted before proposal executed.
	 */
	public boolean await(long wait, TimeUnit unit) throws InterruptedException {
		synchronized (state) {
			if (state.get().isDone()) {
				return true;
			}
			state.wait(unit.toMillis(wait));
			return state.get().isDone();
		}
	}

	/**
	 * Wait for the proposal to exit a state.
	 *
	 * @param notIn
	 *            state the proposal should not be in to return.
	 * @param wait
	 *            how long to wait.
	 * @param unit
	 *            unit describing the wait time.
	 * @return true if the proposal exited the state; false on time out.
	 * @throws java.lang.InterruptedException
	 *             caller was interrupted before proposal executed.
	 */
	public boolean awaitStateChange(State notIn, long wait, TimeUnit unit)
			throws InterruptedException {
		synchronized (state) {
			if (state.get() != notIn) {
				return true;
			}
			state.wait(unit.toMillis(wait));
			return state.get() != notIn;
		}
	}

	void notifyState(State s) {
		synchronized (state) {
			state.set(s);
			state.notifyAll();
		}
		if (s.isDone()) {
			for (Runnable callback : listeners) {
				callback.run();
			}
			listeners.clear();
		}
	}

	/** {@inheritDoc} */
	@Override
	public String toString() {
		StringBuilder s = new StringBuilder();
		s.append("Ketch Proposal {\n"); //$NON-NLS-1$
		s.append("  ").append(state.get()).append('\n'); //$NON-NLS-1$
		if (author != null) {
			s.append("  author ").append(author).append('\n'); //$NON-NLS-1$
		}
		if (message != null) {
			s.append("  message ").append(message).append('\n'); //$NON-NLS-1$
		}
		for (Command c : commands) {
			s.append("  "); //$NON-NLS-1$
			format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$
			s.append(' ');
			format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$
			s.append(' ').append(c.getRefName());
			if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) {
				s.append(' ').append(c.getResult()); // $NON-NLS-1$
			}
			s.append('\n');
		}
		s.append('}');
		return s.toString();
	}

	private static void format(StringBuilder s, @Nullable Ref r, String n) {
		if (r == null) {
			s.append(n);
		} else if (r.isSymbolic()) {
			s.append(r.getTarget().getName());
		} else {
			ObjectId id = r.getObjectId();
			if (id != null) {
				s.append(id.abbreviate(8).name());
			}
		}
	}
}