KetchLeader.java
/*
* Copyright (C) 2016, Google Inc. and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0 which is available at
* https://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.eclipse.jgit.internal.ketch;
import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jgit.internal.storage.reftree.RefTree;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevWalk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A leader managing consensus across remote followers.
* <p>
* A leader instance starts up in
* {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
* to begin a new term by sending an
* {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
* term starts if a majority of replicas have accepted this leader instance for
* the term.
* <p>
* Once elected by a majority the instance enters
* {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
* proposals offered to {@link #queueProposal(Proposal)}. This continues until
* the leader is timed out for inactivity, or is deposed by a competing leader
* gaining its own majority.
* <p>
* Once timed out or deposed this {@code KetchLeader} instance should be
* discarded, and a new instance takes over.
* <p>
* Each leader instance coordinates a group of
* {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
* owned by the leader instance and must be discarded when the leader is
* discarded.
* <p>
* In Ketch all push requests are issued through the leader. The steps are as
* follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
* example):
* <ul>
* <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
* {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
* <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
* <li>Wait for consensus with
* {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
* <li>To examine the status of the push, check
* {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
* {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
* </ul>
* <p>
* The leader gains consensus by first pushing the needed objects and a
* {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
* desired target repository state to the {@code refs/txn/accepted} branch on
* each of the replicas. Once a majority has succeeded, the leader commits the
* state by either pushing the {@code refs/txn/accepted} value to
* {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
* to {@code refs/heads/master}, etc. for stock Git replicas.
* <p>
* Internally, the actual transport to replicas is performed on background
* threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
* executor service. For performance, the
* {@link org.eclipse.jgit.internal.ketch.KetchLeader},
* {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
* {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
* and may invoke each other's methods on different threads. This access is
* protected by the leader's {@link #lock} object. Care must be taken to prevent
* concurrent access by correctly obtaining the leader's lock.
*/
public abstract class KetchLeader {
	private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);

	/** Shared empty replica array used before {@link #setReplicas} runs. */
	private static final KetchReplica[] NO_REPLICAS = {};

	/** Current state of the leader instance. */
	public enum State {
		/** Newly created instance trying to elect itself leader. */
		CANDIDATE,

		/** Leader instance elected by a majority. */
		LEADER,

		/** Instance has been deposed by another with a more recent term. */
		DEPOSED,

		/** Leader has been gracefully shutdown, e.g. due to inactivity. */
		SHUTDOWN;
	}

	private final KetchSystem system;

	/**
	 * Leader's knowledge of voting replicas for this repository.
	 * <p>
	 * Empty (never {@code null}) until {@link #setReplicas(Collection)} is
	 * invoked, so {@link #snapshot()}, {@link #toString()} and
	 * {@link #shutdown()} can be used on an unconfigured leader.
	 */
	private KetchReplica[] voters = NO_REPLICAS;

	/**
	 * Non-voting replicas receiving updates; empty (never {@code null}) until
	 * {@link #setReplicas(Collection)} is invoked.
	 */
	private KetchReplica[] followers = NO_REPLICAS;

	/** Local replica located among the voters by {@link #setReplicas}. */
	private LocalReplica self;

	/**
	 * Lock protecting all data within this leader instance.
	 * <p>
	 * This lock extends into the {@link KetchReplica} instances used by the
	 * leader. They share the same lock instance to simplify concurrency.
	 */
	final Lock lock;

	private State state = CANDIDATE;

	/** Term of this leader, once elected. */
	private long term;

	/**
	 * Pending proposals accepted into the queue in FIFO order.
	 * <p>
	 * These proposals were preflighted and do not contain any conflicts with
	 * each other and their expectations matched the leader's local view of the
	 * agreed upon {@code refs/txn/accepted} tree.
	 */
	private final List<Proposal> queued;

	/**
	 * State of the repository's RefTree after applying all entries in
	 * {@link #queued}. New proposals must be consistent with this tree to be
	 * appended to the end of {@link #queued}.
	 * <p>
	 * Must be deep-copied with {@link RefTree#copy()} if
	 * {@link #roundHoldsReferenceToRefTree} is {@code true}.
	 */
	private RefTree refTree;

	/**
	 * If {@code true} {@link #refTree} must be duplicated before queuing the
	 * next proposal. The {@link #refTree} was passed into the constructor of a
	 * {@link ProposalRound}, and that external reference to the {@link RefTree}
	 * object is held by the proposal until it materializes the tree object in
	 * the object store. This field is set {@code true} when the proposal begins
	 * execution and set {@code false} once tree objects are persisted in the
	 * local repository's object store or {@link #refTree} is replaced with a
	 * copy to isolate it from any running rounds.
	 * <p>
	 * If proposals arrive less frequently than the {@code RefTree} is written
	 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
	 * avoids duplicating {@link #refTree}, reducing both time and memory used.
	 * However if proposals arrive more frequently {@link #refTree} must be
	 * duplicated to prevent newly queued proposals from corrupting the
	 * {@link #runningRound}.
	 */
	volatile boolean roundHoldsReferenceToRefTree;

	/** End of the leader's log. */
	private LogIndex headIndex;

	/** Leader knows this (and all prior) states are committed. */
	private LogIndex committedIndex;

	/**
	 * Is the leader idle with no work pending? If {@code true} there is no work
	 * for the leader (normal state). This field is {@code false} when the
	 * leader thread is scheduled for execution, or while {@link #runningRound}
	 * defines a round in progress.
	 */
	private boolean idle;

	/** Current round the leader is preparing and waiting for a vote on. */
	private Round runningRound;

	/**
	 * Construct a leader for a Ketch instance.
	 *
	 * @param system
	 *            Ketch system configuration the leader must adhere to.
	 */
	protected KetchLeader(KetchSystem system) {
		this.system = system;
		this.lock = new ReentrantLock(true /* fair */);
		this.queued = new ArrayList<>(4);
		this.idle = true;
	}

	/** @return system configuration. */
	KetchSystem getSystem() {
		return system;
	}

	/**
	 * Configure the replicas used by this Ketch instance.
	 * <p>
	 * Replicas should be configured once at creation before any proposals are
	 * executed. Once elections happen, <b>reconfiguration is a complicated
	 * concept that is not currently supported</b>.
	 *
	 * @param replicas
	 *            members participating with the same repository.
	 * @throws IllegalArgumentException
	 *             the number of {@code FULL} (voting) replicas is not one of
	 *             the supported odd counts, or no voter is a
	 *             {@link LocalReplica}.
	 */
	public void setReplicas(Collection<KetchReplica> replicas) {
		List<KetchReplica> v = new ArrayList<>(5);
		List<KetchReplica> f = new ArrayList<>(5);
		for (KetchReplica r : replicas) {
			switch (r.getParticipation()) {
			case FULL:
				v.add(r);
				break;

			case FOLLOWER_ONLY:
				f.add(r);
				break;
			}
		}

		Collection<Integer> validVoters = validVoterCounts();
		if (!validVoters.contains(Integer.valueOf(v.size()))) {
			throw new IllegalArgumentException(MessageFormat.format(
					KetchText.get().unsupportedVoterCount,
					Integer.valueOf(v.size()),
					validVoters));
		}

		LocalReplica me = findLocal(v);
		if (me == null) {
			throw new IllegalArgumentException(
					KetchText.get().localReplicaRequired);
		}

		lock.lock();
		try {
			voters = v.toArray(new KetchReplica[0]);
			followers = f.toArray(new KetchReplica[0]);
			self = me;
		} finally {
			lock.unlock();
		}
	}

	/** @return the accepted numbers of voting replicas (1, 3, 5, 7, 9). */
	private static Collection<Integer> validVoterCounts() {
		@SuppressWarnings("boxing")
		Integer[] valid = {
				// An odd number of voting replicas is required.
				1, 3, 5, 7, 9 };
		return Arrays.asList(valid);
	}

	/**
	 * @return the first voter that is a {@link LocalReplica}, or {@code null}
	 *         if there is none.
	 */
	private static LocalReplica findLocal(Collection<KetchReplica> voters) {
		for (KetchReplica r : voters) {
			if (r instanceof LocalReplica) {
				return (LocalReplica) r;
			}
		}
		return null;
	}

	/**
	 * Get an instance of the repository for use by a leader thread.
	 * <p>
	 * The caller will close the repository.
	 *
	 * @return opened repository for use by the leader thread.
	 * @throws java.io.IOException
	 *             cannot reopen the repository for the leader.
	 */
	protected abstract Repository openRepository() throws IOException;

	/**
	 * Queue a reference update proposal for consensus.
	 * <p>
	 * This method does not wait for consensus to be reached. The proposal is
	 * checked to look for risks of conflicts, and then submitted into the queue
	 * for distribution as soon as possible.
	 * <p>
	 * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
	 * to see if the proposal is done.
	 *
	 * @param proposal
	 *            the proposed reference updates to queue for consideration.
	 *            Once execution is complete the individual reference result
	 *            fields will be populated with the outcome.
	 * @throws java.lang.InterruptedException
	 *             current thread was interrupted. The proposal may have been
	 *             aborted if it was not yet queued for execution.
	 * @throws java.io.IOException
	 *             unrecoverable error preventing proposals from being attempted
	 *             by this leader.
	 */
	public void queueProposal(Proposal proposal)
			throws InterruptedException, IOException {
		try {
			lock.lockInterruptibly();
		} catch (InterruptedException e) {
			proposal.abort();
			throw e;
		}
		try {
			if (refTree == null) {
				// First use: load the accepted tree, then replay anything
				// already queued so refTree reflects the queue's end state.
				initialize();
				for (Proposal p : queued) {
					refTree.apply(p.getCommands());
				}
			} else if (roundHoldsReferenceToRefTree) {
				// A running round still references refTree; mutate a copy.
				refTree = refTree.copy();
				roundHoldsReferenceToRefTree = false;
			}

			if (!refTree.apply(proposal.getCommands())) {
				// A conflict exists so abort the proposal.
				proposal.abort();
				return;
			}

			queued.add(proposal);
			proposal.notifyState(QUEUED);

			if (idle) {
				scheduleLeader();
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Load {@link #headIndex} and {@link #refTree} from the local replica's
	 * {@code refs/txn/accepted} value, or start from an empty tree when that
	 * value is the zero id. Called with {@link #lock} held.
	 *
	 * @throws IOException
	 *             the repository could not be opened or read.
	 */
	private void initialize() throws IOException {
		try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
			self.initialize(git);

			ObjectId accepted = self.getTxnAccepted();
			if (!ObjectId.zeroId().equals(accepted)) {
				RevCommit c = rw.parseCommit(accepted);
				headIndex = LogIndex.unknown(accepted);
				refTree = RefTree.read(rw.getObjectReader(), c.getTree());
			} else {
				headIndex = LogIndex.unknown(ObjectId.zeroId());
				refTree = RefTree.newEmptyTree();
			}
		}
	}

	/**
	 * Mark the leader busy and submit {@link #runLeader()} to the system
	 * executor. Called with {@link #lock} held.
	 */
	private void scheduleLeader() {
		idle = false;
		system.getExecutor().execute(this::runLeader);
	}

	/**
	 * Executor task: build the next round for the current state (an
	 * {@link ElectionRound} while {@code CANDIDATE}, a {@link ProposalRound}
	 * while {@code LEADER}) under the lock, then start it outside the lock.
	 */
	private void runLeader() {
		Round round;
		lock.lock();
		try {
			switch (state) {
			case CANDIDATE:
				round = new ElectionRound(this, headIndex);
				break;

			case LEADER:
				round = newProposalRound();
				break;

			case DEPOSED:
			case SHUTDOWN:
			default:
				log.warn("Leader cannot run {}", state); //$NON-NLS-1$
				// TODO(sop): Redirect proposals.
				return;
			}
		} finally {
			lock.unlock();
		}

		try {
			round.start();
		} catch (IOException e) {
			// TODO(sop) Depose leader if it cannot use its repository.
			log.error(KetchText.get().leaderFailedToStore, e);
			lock.lock();
			try {
				nextRound();
			} finally {
				lock.unlock();
			}
		}
	}

	/**
	 * Drain {@link #queued} into a new {@link ProposalRound} that shares the
	 * current {@link #refTree}. Called with {@link #lock} held.
	 *
	 * @return the new round.
	 */
	private ProposalRound newProposalRound() {
		List<Proposal> todo = new ArrayList<>(queued);
		queued.clear();
		// The round now references refTree; the next queueProposal must copy.
		roundHoldsReferenceToRefTree = true;
		return new ProposalRound(this, headIndex, todo, refTree);
	}

	/** @return term of this leader's reign. */
	long getTerm() {
		return term;
	}

	/** @return end of the leader's log. */
	LogIndex getHead() {
		return headIndex;
	}

	/**
	 * @return state leader knows it has committed across a quorum of replicas.
	 */
	LogIndex getCommitted() {
		return committedIndex;
	}

	/**
	 * @return {@code true} if no round is running or scheduled. Read without
	 *         locking; callers in this class invoke it with {@link #lock} held.
	 */
	boolean isIdle() {
		return idle;
	}

	/**
	 * Make {@code round} the running round, advance {@link #headIndex} to its
	 * accepted index, and ask every voter and follower to push it
	 * asynchronously via {@code pushTxnAcceptedAsync}.
	 *
	 * @param round
	 *            round whose accepted state should be distributed.
	 */
	void runAsync(Round round) {
		lock.lock();
		try {
			// End of the log is this round. Once transport begins it is
			// reasonable to assume at least one replica will eventually get
			// this, and there is reasonable probability it commits.
			headIndex = round.acceptedNewIndex;
			runningRound = round;

			for (KetchReplica replica : voters) {
				replica.pushTxnAcceptedAsync(round);
			}
			for (KetchReplica replica : followers) {
				replica.pushTxnAcceptedAsync(round);
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Asynchronous signal from a replica after completion.
	 * <p>
	 * Must be called while {@link #lock} is held by the replica. When a
	 * majority of voters have accepted {@link #headIndex}, the running round
	 * is committed: a candidate becomes leader, the next round is scheduled,
	 * commits are pushed to the other replicas, and the finished round is
	 * notified of success (with the lock temporarily released).
	 *
	 * @param replica
	 *            replica posting a completion event.
	 */
	void onReplicaUpdate(KetchReplica replica) {
		if (log.isDebugEnabled()) {
			log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
					replica.describeForLog(), snapshot());
		}

		if (replica.getParticipation() == FOLLOWER_ONLY) {
			// Followers cannot vote, so votes haven't changed.
			return;
		} else if (runningRound == null) {
			// No round running, no need to tally votes.
			return;
		}

		assert headIndex.equals(runningRound.acceptedNewIndex);
		int matching = 0;
		for (KetchReplica r : voters) {
			if (r.hasAccepted(headIndex)) {
				matching++;
			}
		}

		int quorum = voters.length / 2 + 1;
		boolean success = matching >= quorum;
		if (!success) {
			return;
		}

		// Capture the round now: nextRound() below clears runningRound (and
		// may even let a new round be installed), so it must not be re-read
		// when notifying success.
		Round completed = runningRound;
		switch (state) {
		case CANDIDATE:
			term = ((ElectionRound) completed).getTerm();
			state = LEADER;
			if (log.isDebugEnabled()) {
				log.debug("Won election, running term " + term); //$NON-NLS-1$
			}

			//$FALL-THROUGH$
		case LEADER:
			committedIndex = headIndex;
			if (log.isDebugEnabled()) {
				log.debug("Committed {} in term {}", //$NON-NLS-1$
						committedIndex.describeForLog(),
						Long.valueOf(term));
			}
			nextRound();
			commitAsync(replica);
			notifySuccess(completed);
			if (log.isDebugEnabled()) {
				log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
			}
			break;

		default:
			log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
			break;
		}
	}

	/**
	 * Invoke {@code round.success()} with {@link #lock} released, then
	 * re-acquire it. Must be called with the lock held.
	 *
	 * @param round
	 *            the round that reached a quorum; must not be {@code null}.
	 */
	private void notifySuccess(Round round) {
		// Drop the leader lock while notifying Proposal listeners.
		lock.unlock();
		try {
			round.success();
		} finally {
			lock.lock();
		}
	}

	/**
	 * Push {@link #committedIndex} asynchronously to every voter and follower
	 * except {@code caller}, for replicas whose
	 * {@code shouldPushUnbatchedCommit} returns {@code true}. Called with
	 * {@link #lock} held.
	 *
	 * @param caller
	 *            replica that triggered the commit; skipped here.
	 */
	private void commitAsync(KetchReplica caller) {
		for (KetchReplica r : voters) {
			if (r == caller) {
				continue;
			}
			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
				r.pushCommitAsync(committedIndex);
			}
		}
		for (KetchReplica r : followers) {
			if (r == caller) {
				continue;
			}
			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
				r.pushCommitAsync(committedIndex);
			}
		}
	}

	/** Schedule the next round; invoked while {@link #lock} is held. */
	void nextRound() {
		runningRound = null;

		if (queued.isEmpty()) {
			idle = true;
		} else {
			// Caller holds lock. Reschedule leader on a new thread so
			// the call stack can unwind and lock is not held unexpectedly
			// during prepare for the next round.
			scheduleLeader();
		}
	}

	/**
	 * Snapshot this leader
	 *
	 * @return snapshot of this leader
	 */
	public LeaderSnapshot snapshot() {
		lock.lock();
		try {
			LeaderSnapshot s = new LeaderSnapshot();
			s.state = state;
			s.term = term;
			s.headIndex = headIndex;
			s.committedIndex = committedIndex;
			s.idle = isIdle();
			for (KetchReplica r : voters) {
				s.replicas.add(r.snapshot());
			}
			for (KetchReplica r : followers) {
				s.replicas.add(r.snapshot());
			}
			return s;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Gracefully shutdown this leader and cancel outstanding operations.
	 * <p>
	 * Idempotent: the replicas are shut down only on the first call.
	 */
	public void shutdown() {
		lock.lock();
		try {
			if (state != SHUTDOWN) {
				state = SHUTDOWN;
				for (KetchReplica r : voters) {
					r.shutdown();
				}
				for (KetchReplica r : followers) {
					r.shutdown();
				}
			}
		} finally {
			lock.unlock();
		}
	}

	/** {@inheritDoc} */
	@Override
	public String toString() {
		return snapshot().toString();
	}
}