KetchLeader.java

/*
 * Copyright (C) 2016, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.ketch;

import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.jgit.internal.storage.reftree.RefTree;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevWalk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A leader managing consensus across remote followers.
 * <p>
 * A leader instance starts up in
 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
 * to begin a new term by sending an
 * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
 * term starts if a majority of replicas have accepted this leader instance for
 * the term.
 * <p>
 * Once elected by a majority the instance enters
 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
 * proposals offered to {@link #queueProposal(Proposal)}. This continues until
 * the leader is timed out for inactivity, or is deposed by a competing leader
 * gaining its own majority.
 * <p>
 * Once timed out or deposed this {@code KetchLeader} instance should be
 * discarded, and a new instance takes over.
 * <p>
 * Each leader instance coordinates a group of
 * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
 * owned by the leader instance and must be discarded when the leader is
 * discarded.
 * <p>
 * In Ketch all push requests are issued through the leader. The steps are as
 * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
 * example):
 * <ul>
 * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
 * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
 * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
 * <li>Wait for consensus with
 * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
 * <li>To examine the status of the push, check
 * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
 * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
 * </ul>
 * <p>
 * The leader gains consensus by first pushing the needed objects and a
 * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
 * desired target repository state to the {@code refs/txn/accepted} branch on
 * each of the replicas. Once a majority has succeeded, the leader commits the
 * state by either pushing the {@code refs/txn/accepted} value to
 * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
 * to {@code refs/heads/master}, etc. for stock Git replicas.
 * <p>
 * Internally, the actual transport to replicas is performed on background
 * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
 * executor service. For performance, the
 * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
 * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
 * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
 * and may invoke each other's methods on different threads. This access is
 * protected by the leader's {@link #lock} object. Care must be taken to prevent
 * concurrent access by correctly obtaining the leader's lock.
 */
public abstract class KetchLeader {
	private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);

	/** Lifecycle phases a leader instance can be in. */
	public enum State {
		/** Fresh instance that is still trying to get itself elected. */
		CANDIDATE,

		/** Instance was elected by a majority and is acting as leader. */
		LEADER,

		/** Another instance with a more recent term has displaced this one. */
		DEPOSED,

		/** Leader was shut down gracefully, e.g. due to inactivity. */
		SHUTDOWN
	}

	/**
	 * Ketch system configuration this leader adheres to. Its executor is used
	 * to schedule leader runs off the caller's thread.
	 */
	private final KetchSystem system;

	/**
	 * Leader's knowledge of replicas for this repository.
	 * <p>
	 * Holds the replicas whose participation is {@code FULL}, as partitioned
	 * by {@link #setReplicas(Collection)}. Only these replicas are counted
	 * when tallying a quorum.
	 */
	private KetchReplica[] voters;

	/**
	 * Replicas whose participation is {@code FOLLOWER_ONLY}. They are pushed
	 * to like voters, but their completion events never change the vote.
	 */
	private KetchReplica[] followers;

	/**
	 * The {@link LocalReplica} found among {@link #voters}; used to read the
	 * locally accepted state when the leader initializes itself.
	 */
	private LocalReplica self;

	/**
	 * Lock protecting all data within this leader instance.
	 * <p>
	 * This lock extends into the {@link KetchReplica} instances used by the
	 * leader. They share the same lock instance to simplify concurrency.
	 */
	final Lock lock;

	/** Current state; every new instance starts out as a candidate. */
	private State state = CANDIDATE;

	/** Term of this leader, once elected. */
	private long term;

	/**
	 * Pending proposals accepted into the queue in FIFO order.
	 * <p>
	 * These proposals were preflighted and do not contain any conflicts with
	 * each other and their expectations matched the leader's local view of the
	 * agreed upon {@code refs/txn/accepted} tree.
	 */
	private final List<Proposal> queued;

	/**
	 * State of the repository's RefTree after applying all entries in
	 * {@link #queued}. New proposals must be consistent with this tree to be
	 * appended to the end of {@link #queued}.
	 * <p>
	 * Must be deep-copied with {@link RefTree#copy()} if
	 * {@link #roundHoldsReferenceToRefTree} is {@code true}.
	 * <p>
	 * Remains {@code null} until the first proposal is queued, at which point
	 * it is loaded lazily.
	 */
	private RefTree refTree;

	/**
	 * If {@code true} {@link #refTree} must be duplicated before queuing the
	 * next proposal. The {@link #refTree} was passed into the constructor of a
	 * {@link ProposalRound}, and that external reference to the {@link RefTree}
	 * object is held by the proposal until it materializes the tree object in
	 * the object store. This field is set {@code true} when the proposal begins
	 * execution and set {@code false} once tree objects are persisted in the
	 * local repository's object store or {@link #refTree} is replaced with a
	 * copy to isolate it from any running rounds.
	 * <p>
	 * If proposals arrive less frequently than the {@code RefTree} is written
	 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
	 * avoids duplicating {@link #refTree}, reducing both time and memory used.
	 * However if proposals arrive more frequently {@link #refTree} must be
	 * duplicated to prevent newly queued proposals from corrupting the
	 * {@link #runningRound}.
	 */
	volatile boolean roundHoldsReferenceToRefTree;

	/** End of the leader's log. */
	private LogIndex headIndex;

	/** Leader knows this (and all prior) states are committed. */
	private LogIndex committedIndex;

	/**
	 * Is the leader idle with no work pending? If {@code true} there is no work
	 * for the leader (normal state). This field is {@code false} when the
	 * leader thread is scheduled for execution, or while {@link #runningRound}
	 * defines a round in progress.
	 */
	private boolean idle;

	/**
	 * Current round the leader is preparing and waiting for a vote on, or
	 * {@code null} when no round is in progress.
	 */
	private Round runningRound;

	/**
	 * Construct a leader for a Ketch instance.
	 *
	 * @param system
	 *            Ketch system configuration the leader must adhere to.
	 */
	protected KetchLeader(KetchSystem system) {
		this.system = system;
		this.lock = new ReentrantLock(true /* fair */);
		this.queued = new ArrayList<>(4);
		this.idle = true;
	}

	/**
	 * Get the Ketch system this leader was constructed with.
	 *
	 * @return system configuration.
	 */
	KetchSystem getSystem() {
		return this.system;
	}

	/**
	 * Install the set of replicas this leader coordinates.
	 * <p>
	 * Call this once, right after construction and before any proposal is
	 * executed. After an election has taken place <b>reconfiguration is a
	 * complicated concept that is not currently supported</b>.
	 * <p>
	 * Replicas are split by participation into voters and followers. The
	 * number of voters must be one of the supported odd counts and the
	 * voters must include a {@link LocalReplica}.
	 *
	 * @param replicas
	 *            members participating with the same repository.
	 * @throws IllegalArgumentException
	 *             the voter count is unsupported, or no local replica is
	 *             among the voters.
	 */
	public void setReplicas(Collection<KetchReplica> replicas) {
		List<KetchReplica> voting = new ArrayList<>(5);
		List<KetchReplica> followOnly = new ArrayList<>(5);
		for (KetchReplica replica : replicas) {
			switch (replica.getParticipation()) {
			case FULL:
				voting.add(replica);
				break;

			case FOLLOWER_ONLY:
				followOnly.add(replica);
				break;
			}
		}

		Integer voterCount = Integer.valueOf(voting.size());
		Collection<Integer> allowed = validVoterCounts();
		if (!allowed.contains(voterCount)) {
			String msg = MessageFormat.format(
					KetchText.get().unsupportedVoterCount,
					voterCount,
					allowed);
			throw new IllegalArgumentException(msg);
		}

		LocalReplica local = findLocal(voting);
		if (local == null) {
			throw new IllegalArgumentException(
					KetchText.get().localReplicaRequired);
		}

		KetchReplica[] newVoters = voting
				.toArray(new KetchReplica[voting.size()]);
		KetchReplica[] newFollowers = followOnly
				.toArray(new KetchReplica[followOnly.size()]);

		// Publish the validated configuration atomically under the lock.
		lock.lock();
		try {
			voters = newVoters;
			followers = newFollowers;
			self = local;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * List the sizes of voter group this leader accepts.
	 *
	 * @return permitted numbers of voting replicas.
	 */
	private static Collection<Integer> validVoterCounts() {
		// An odd number of voting replicas is required.
		return Arrays.asList(
				Integer.valueOf(1),
				Integer.valueOf(3),
				Integer.valueOf(5),
				Integer.valueOf(7),
				Integer.valueOf(9));
	}

	/**
	 * Locate the first local replica in a group of voters.
	 *
	 * @param voters
	 *            replicas to search, in iteration order.
	 * @return the first {@link LocalReplica} encountered; {@code null} if the
	 *         group contains none.
	 */
	private static LocalReplica findLocal(Collection<KetchReplica> voters) {
		LocalReplica found = null;
		for (KetchReplica candidate : voters) {
			if (candidate instanceof LocalReplica) {
				found = (LocalReplica) candidate;
				break;
			}
		}
		return found;
	}

	/**
	 * Get an instance of the repository for use by a leader thread.
	 * <p>
	 * The caller will close the repository. Within this class the returned
	 * instance is used to load the locally accepted state the first time a
	 * proposal is queued, and it is closed as soon as that load finishes.
	 *
	 * @return opened repository for use by the leader thread.
	 * @throws java.io.IOException
	 *             cannot reopen the repository for the leader.
	 */
	protected abstract Repository openRepository() throws IOException;

	/**
	 * Submit a reference update proposal to be agreed on by the replicas.
	 * <p>
	 * The call returns without waiting for consensus. The proposal's commands
	 * are applied to the leader's pending view of the reference tree; if they
	 * conflict the proposal is aborted, otherwise it is appended to the queue
	 * and the leader is scheduled when it is currently idle.
	 * <p>
	 * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
	 * to see if the proposal is done.
	 *
	 * @param proposal
	 *            the proposed reference updates to queue for consideration.
	 *            Once execution is complete the individual reference result
	 *            fields will be populated with the outcome.
	 * @throws java.lang.InterruptedException
	 *             current thread was interrupted. The proposal may have been
	 *             aborted if it was not yet queued for execution.
	 * @throws java.io.IOException
	 *             unrecoverable error preventing proposals from being attempted
	 *             by this leader.
	 */
	public void queueProposal(Proposal proposal)
			throws InterruptedException, IOException {
		try {
			lock.lockInterruptibly();
		} catch (InterruptedException interrupted) {
			// Never got the lock, so the proposal cannot have been queued.
			proposal.abort();
			throw interrupted;
		}

		try {
			if (refTree != null) {
				if (roundHoldsReferenceToRefTree) {
					// A running round still references the tree; work on a
					// private copy so that round is not disturbed.
					refTree = refTree.copy();
					roundHoldsReferenceToRefTree = false;
				}
			} else {
				// First use: load the tree, then replay whatever is queued.
				initialize();
				for (Proposal pending : queued) {
					refTree.apply(pending.getCommands());
				}
			}

			boolean applied = refTree.apply(proposal.getCommands());
			if (applied) {
				queued.add(proposal);
				proposal.notifyState(QUEUED);

				if (idle) {
					scheduleLeader();
				}
			} else {
				// A conflict exists so abort the proposal.
				proposal.abort();
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Load the leader's starting point from the local replica.
	 * <p>
	 * Opens the repository, initializes the local replica, and then sets
	 * {@link #headIndex} and {@link #refTree} from the replica's accepted
	 * commit. An all-zero accepted id yields an empty tree.
	 *
	 * @throws IOException
	 *             the repository could not be opened or read.
	 */
	private void initialize() throws IOException {
		try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
			self.initialize(git);

			ObjectId accepted = self.getTxnAccepted();
			if (ObjectId.zeroId().equals(accepted)) {
				// Nothing has been accepted yet; start from an empty tree.
				headIndex = LogIndex.unknown(ObjectId.zeroId());
				refTree = RefTree.newEmptyTree();
				return;
			}

			RevCommit acceptedCommit = rw.parseCommit(accepted);
			headIndex = LogIndex.unknown(accepted);
			refTree = RefTree.read(rw.getObjectReader(),
					acceptedCommit.getTree());
		}
	}

	/**
	 * Mark the leader busy and hand a leader run to the system's executor.
	 */
	private void scheduleLeader() {
		Runnable leaderRun = new Runnable() {
			@Override
			public void run() {
				runLeader();
			}
		};
		idle = false;
		system.getExecutor().execute(leaderRun);
	}

	/**
	 * Build the round appropriate for the current state and start it.
	 * <p>
	 * A candidate starts an election round and a leader starts a proposal
	 * round; in any other state a warning is logged and nothing runs. The
	 * round is chosen under {@link #lock} but started after the lock has been
	 * released. If starting fails with an I/O error the error is logged and
	 * the leader moves on to the next round.
	 */
	private void runLeader() {
		final Round next;
		lock.lock();
		try {
			if (state == CANDIDATE) {
				next = new ElectionRound(this, headIndex);
			} else if (state == LEADER) {
				next = newProposalRound();
			} else {
				// DEPOSED, SHUTDOWN, or anything else cannot run a round.
				log.warn("Leader cannot run {}", state); //$NON-NLS-1$
				// TODO(sop): Redirect proposals.
				return;
			}
		} finally {
			lock.unlock();
		}

		try {
			next.start();
		} catch (IOException err) {
			// TODO(sop) Depose leader if it cannot use its repository.
			log.error(KetchText.get().leaderFailedToStore, err);
			lock.lock();
			try {
				nextRound();
			} finally {
				lock.unlock();
			}
		}
	}

	/**
	 * Drain the queue into a new proposal round.
	 * <p>
	 * The round is handed the current {@link #refTree} instance, so
	 * {@link #roundHoldsReferenceToRefTree} is raised to make the next queued
	 * proposal copy the tree first.
	 *
	 * @return round covering every proposal that was queued.
	 */
	private ProposalRound newProposalRound() {
		List<Proposal> batch = new ArrayList<>(queued.size());
		batch.addAll(queued);
		queued.clear();
		roundHoldsReferenceToRefTree = true;
		ProposalRound round = new ProposalRound(this, headIndex, batch,
				refTree);
		return round;
	}

	/**
	 * Get the term this leader is running.
	 *
	 * @return term of this leader's reign.
	 */
	long getTerm() {
		return this.term;
	}

	/**
	 * Get the most recent entry in the leader's log.
	 *
	 * @return end of the leader's log.
	 */
	LogIndex getHead() {
		return this.headIndex;
	}

	/**
	 * Get the latest log entry known to be committed.
	 *
	 * @return state leader knows it has committed across a quorum of replicas.
	 */
	LogIndex getCommitted() {
		return this.committedIndex;
	}

	/**
	 * Check whether the leader has no work pending.
	 *
	 * @return {@code true} if no leader run is scheduled and no round is in
	 *         progress.
	 */
	boolean isIdle() {
		return this.idle;
	}

	/**
	 * Make a round the running round and push it to every replica.
	 * <p>
	 * Voters are contacted first, then followers.
	 *
	 * @param round
	 *            round whose accepted state is sent to the replicas.
	 */
	void runAsync(Round round) {
		lock.lock();
		try {
			// End of the log is this round. Once transport begins it is
			// reasonable to assume at least one replica will eventually get
			// this, and there is reasonable probability it commits.
			headIndex = round.acceptedNewIndex;
			runningRound = round;

			KetchReplica[][] groups = { voters, followers };
			for (KetchReplica[] group : groups) {
				for (KetchReplica target : group) {
					target.pushTxnAcceptedAsync(round);
				}
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Asynchronous signal from a replica after completion.
	 * <p>
	 * Must be called while {@link #lock} is held by the replica. The lock is
	 * released briefly while the winning round's listeners are notified and
	 * is held again when this method returns.
	 * <p>
	 * Events from follower-only replicas, and events arriving while no round
	 * is running, are ignored. Otherwise the voters that accepted the head of
	 * the log are counted; once a majority is reached a candidate becomes the
	 * leader, the head is recorded as committed, the next round is arranged,
	 * the commit is pushed to the other replicas, and the round that just won
	 * is told it succeeded.
	 *
	 * @param replica
	 *            replica posting a completion event.
	 */
	void onReplicaUpdate(KetchReplica replica) {
		if (log.isDebugEnabled()) {
			log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
					replica.describeForLog(), snapshot());
		}

		if (replica.getParticipation() == FOLLOWER_ONLY) {
			// Followers cannot vote, so votes haven't changed.
			return;
		} else if (runningRound == null) {
			// No round running, no need to tally votes.
			return;
		}

		assert headIndex.equals(runningRound.acceptedNewIndex);
		int matching = 0;
		for (KetchReplica r : voters) {
			if (r.hasAccepted(headIndex)) {
				matching++;
			}
		}

		int quorum = voters.length / 2 + 1;
		boolean success = matching >= quorum;
		if (!success) {
			return;
		}

		// nextRound() clears runningRound, so keep a reference to the round
		// that reached quorum; it is the one whose listeners must be told.
		final Round finished = runningRound;

		switch (state) {
		case CANDIDATE:
			term = ((ElectionRound) runningRound).getTerm();
			state = LEADER;
			if (log.isDebugEnabled()) {
				log.debug("Won election, running term " + term); //$NON-NLS-1$
			}

			//$FALL-THROUGH$
		case LEADER:
			committedIndex = headIndex;
			if (log.isDebugEnabled()) {
				log.debug("Committed {} in term {}", //$NON-NLS-1$
						committedIndex.describeForLog(),
						Long.valueOf(term));
			}
			// Order matters: nextRound() updates the idle flag that
			// commitAsync() consults when deciding whether to push.
			nextRound();
			commitAsync(replica);
			notifySuccess(finished);
			if (log.isDebugEnabled()) {
				log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
			}
			break;

		default:
			log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
			break;
		}
	}

	/**
	 * Tell a round it succeeded without holding the leader lock.
	 * <p>
	 * The calling thread must hold {@link #lock}. The lock is released for
	 * the duration of the callback and taken again before returning, even if
	 * the callback throws.
	 *
	 * @param round
	 *            round to notify; it is dereferenced, so it must not be
	 *            {@code null}.
	 */
	private void notifySuccess(Round round) {
		// Drop the leader lock while notifying Proposal listeners.
		lock.unlock();
		try {
			round.success();
		} finally {
			// Restore the lock so the caller's unlock stays balanced.
			lock.lock();
		}
	}

	/**
	 * Push the committed index to the replicas that want it right away.
	 * <p>
	 * Voters are visited first, then followers. The replica that triggered
	 * the commit is skipped.
	 *
	 * @param caller
	 *            replica whose completion event triggered the commit.
	 */
	private void commitAsync(KetchReplica caller) {
		pushCommitToOthers(voters, caller);
		pushCommitToOthers(followers, caller);
	}

	/**
	 * Push the committed index to each replica of a group, except the caller,
	 * that asks for an unbatched commit.
	 */
	private void pushCommitToOthers(KetchReplica[] group,
			KetchReplica caller) {
		for (KetchReplica target : group) {
			if (target != caller && target
					.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
				target.pushCommitAsync(committedIndex);
			}
		}
	}

	/**
	 * Finish the running round and arrange for the next one; invoked while
	 * {@link #lock} is held.
	 * <p>
	 * The leader becomes idle when nothing is queued, otherwise another
	 * leader run is scheduled.
	 */
	void nextRound() {
		runningRound = null;

		if (!queued.isEmpty()) {
			// Caller holds lock. Reschedule leader on a new thread so
			// the call stack can unwind and lock is not held unexpectedly
			// during prepare for the next round.
			scheduleLeader();
			return;
		}
		idle = true;
	}

	/**
	 * Capture the leader's current state together with that of its replicas.
	 * <p>
	 * The snapshot is assembled under {@link #lock}; voters are listed before
	 * followers.
	 *
	 * @return snapshot of this leader
	 */
	public LeaderSnapshot snapshot() {
		lock.lock();
		try {
			LeaderSnapshot snap = new LeaderSnapshot();
			snap.state = state;
			snap.term = term;
			snap.headIndex = headIndex;
			snap.committedIndex = committedIndex;
			snap.idle = isIdle();
			for (KetchReplica voter : voters) {
				snap.replicas.add(voter.snapshot());
			}
			for (KetchReplica follower : followers) {
				snap.replicas.add(follower.snapshot());
			}
			return snap;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Gracefully shut down this leader and cancel outstanding operations.
	 * <p>
	 * Every voter and follower is shut down as well. Calling this on a leader
	 * that is already shut down does nothing.
	 */
	public void shutdown() {
		lock.lock();
		try {
			if (state == SHUTDOWN) {
				return;
			}

			state = SHUTDOWN;
			for (KetchReplica voter : voters) {
				voter.shutdown();
			}
			for (KetchReplica follower : followers) {
				follower.shutdown();
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The text is that of a fresh {@link #snapshot()}.
	 */
	@Override
	public String toString() {
		LeaderSnapshot current = snapshot();
		return current.toString();
	}
}