KetchReplica.java
/*
* Copyright (C) 2016, Google Inc. and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0 which is available at
* https://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.eclipse.jgit.internal.ketch;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
import static org.eclipse.jgit.lib.Constants.HEAD;
import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.eclipse.jgit.annotations.NonNull;
import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.internal.storage.reftree.RefTree;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.eclipse.jgit.treewalk.TreeWalk;
import org.eclipse.jgit.util.FileUtils;
import org.eclipse.jgit.util.SystemReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Ketch replica, either {@link org.eclipse.jgit.internal.ketch.LocalReplica}
* or {@link org.eclipse.jgit.internal.ketch.RemoteGitReplica}.
* <p>
* Replicas can be either a stock Git replica, or a Ketch-aware replica.
* <p>
* A stock Git replica has no special knowledge of Ketch and simply stores
* objects and references. Ketch communicates with the stock Git replica using
* the Git push wire protocol. The
* {@link org.eclipse.jgit.internal.ketch.KetchLeader} commits an agreed upon
* state by pushing all references to the Git replica, for example
* {@code "refs/heads/master"} is pushed during commit. Stock Git replicas use
* {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS} to
* record the final state.
* <p>
* Ketch-aware replicas understand the {@code RefTree} sent during the proposal
* and during commit are able to update their own reference space to match the
* state represented by the {@code RefTree}. Ketch-aware replicas typically use
* a {@link org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase} and
* {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#TXN_COMMITTED}
* to record the final state.
* <p>
* KetchReplica instances are tightly coupled with a single
* {@link org.eclipse.jgit.internal.ketch.KetchLeader}. Some state may be
* accessed by the leader thread and uses the leader's own
* {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} to protect shared
* data.
*/
public abstract class KetchReplica {
static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
private static final byte[] PEEL = { ' ', '^' };
/** Participation of a replica in establishing consensus. */
public enum Participation {
/** Replica can vote. */
FULL,
/** Replica does not vote, but tracks leader. */
FOLLOWER_ONLY;
}
/** How this replica wants to receive Ketch commit operations. */
public enum CommitMethod {
/** All references are pushed to the peer as standard Git. */
ALL_REFS,
/** Only {@code refs/txn/committed} is written/updated. */
TXN_COMMITTED;
}
/** Delay before committing to a replica. */
public enum CommitSpeed {
/**
* Send the commit immediately, even if it could be batched with the
* next proposal.
*/
FAST,
/**
* If the next proposal is available, batch the commit with it,
* otherwise just send the commit. This generates less network use, but
* may provide slower consistency on the replica.
*/
BATCHED;
}
/** Current state of a replica. */
public enum State {
/** Leader has not yet contacted the replica. */
UNKNOWN,
/** Replica is behind the consensus. */
LAGGING,
/** Replica matches the consensus. */
CURRENT,
/** Replica has a different (or unknown) history. */
DIVERGENT,
/** Replica's history contains the leader's history. */
AHEAD,
/** Replica can not be contacted. */
OFFLINE;
}
private final KetchLeader leader;
private final String replicaName;
private final Participation participation;
private final CommitMethod commitMethod;
private final CommitSpeed commitSpeed;
private final long minRetryMillis;
private final long maxRetryMillis;
private final Map<ObjectId, List<ReceiveCommand>> staged;
private final Map<String, ReceiveCommand> running;
private final Map<String, ReceiveCommand> waiting;
private final List<ReplicaPushRequest> queued;
/**
* Value known for {@code "refs/txn/accepted"}.
* <p>
* Raft literature refers to this as {@code matchIndex}.
*/
private ObjectId txnAccepted;
/**
* Value known for {@code "refs/txn/committed"}.
* <p>
* Raft literature refers to this as {@code commitIndex}. In traditional
* Raft this is a state variable inside the follower implementation, but
* Ketch keeps it in the leader.
*/
private ObjectId txnCommitted;
/** What is happening with this replica. */
private State state = UNKNOWN;
private String error;
/** Scheduled retry due to communication failure. */
private Future<?> retryFuture;
private long lastRetryMillis;
private long retryAtMillis;
/**
* Configure a replica representation.
*
* @param leader
* instance this replica follows.
* @param name
* unique-ish name identifying this replica for debugging.
* @param cfg
* how Ketch should treat the replica.
*/
protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
this.leader = leader;
this.replicaName = name;
this.participation = cfg.getParticipation();
this.commitMethod = cfg.getCommitMethod();
this.commitSpeed = cfg.getCommitSpeed();
this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
this.staged = new HashMap<>();
this.running = new HashMap<>();
this.waiting = new HashMap<>();
this.queued = new ArrayList<>(4);
}
/**
* Get system configuration.
*
* @return system configuration.
*/
public KetchSystem getSystem() {
return getLeader().getSystem();
}
/**
* Get leader instance this replica follows.
*
* @return leader instance this replica follows.
*/
public KetchLeader getLeader() {
return leader;
}
/**
* Get unique-ish name for debugging.
*
* @return unique-ish name for debugging.
*/
public String getName() {
return replicaName;
}
/**
* Get description of this replica for error/debug logging purposes.
*
* @return description of this replica for error/debug logging purposes.
*/
protected String describeForLog() {
return getName();
}
/**
* Get how the replica participates in this Ketch system.
*
* @return how the replica participates in this Ketch system.
*/
public Participation getParticipation() {
return participation;
}
/**
* Get how Ketch will commit to the repository.
*
* @return how Ketch will commit to the repository.
*/
public CommitMethod getCommitMethod() {
return commitMethod;
}
/**
* Get when Ketch will commit to the repository.
*
* @return when Ketch will commit to the repository.
*/
public CommitSpeed getCommitSpeed() {
return commitSpeed;
}
/**
* Called by leader to perform graceful shutdown.
* <p>
* Default implementation cancels any scheduled retry. Subclasses may add
* additional logic before or after calling {@code super.shutdown()}.
* <p>
* Called with {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held
* by caller.
*/
protected void shutdown() {
Future<?> f = retryFuture;
if (f != null) {
retryFuture = null;
f.cancel(true);
}
}
ReplicaSnapshot snapshot() {
ReplicaSnapshot s = new ReplicaSnapshot(this);
s.accepted = txnAccepted;
s.committed = txnCommitted;
s.state = state;
s.error = error;
s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
return s;
}
/**
* Update the leader's view of the replica after a poll.
* <p>
* Called with {@link KetchLeader#lock} held by caller.
*
* @param refs
* map of refs from the replica.
*/
void initialize(Map<String, Ref> refs) {
if (txnAccepted == null) {
txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
}
if (txnCommitted == null) {
txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
}
}
ObjectId getTxnAccepted() {
return txnAccepted;
}
boolean hasAccepted(LogIndex id) {
return equals(txnAccepted, id);
}
private static boolean equals(@Nullable ObjectId a, LogIndex b) {
return a != null && b != null && AnyObjectId.isEqual(a, b);
}
/**
* Schedule a proposal round with the replica.
* <p>
* Called with {@link KetchLeader#lock} held by caller.
*
* @param round
* current round being run by the leader.
*/
void pushTxnAcceptedAsync(Round round) {
List<ReceiveCommand> cmds = new ArrayList<>();
if (commitSpeed == BATCHED) {
LogIndex committedIndex = leader.getCommitted();
if (equals(txnAccepted, committedIndex)
&& !equals(txnCommitted, committedIndex)) {
prepareTxnCommitted(cmds, committedIndex);
}
}
// TODO(sop) Lagging replicas should build accept on the fly.
if (round.stageCommands != null) {
for (ReceiveCommand cmd : round.stageCommands) {
// TODO(sop): Do not send certain object graphs to replica.
cmds.add(copy(cmd));
}
}
cmds.add(new ReceiveCommand(
round.acceptedOldIndex, round.acceptedNewIndex,
getSystem().getTxnAccepted()));
pushAsync(new ReplicaPushRequest(this, cmds));
}
private static ReceiveCommand copy(ReceiveCommand c) {
return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
}
boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
}
void pushCommitAsync(LogIndex committed) {
List<ReceiveCommand> cmds = new ArrayList<>();
prepareTxnCommitted(cmds, committed);
pushAsync(new ReplicaPushRequest(this, cmds));
}
private void prepareTxnCommitted(List<ReceiveCommand> cmds,
ObjectId committed) {
removeStaged(cmds, committed);
cmds.add(new ReceiveCommand(
txnCommitted, committed,
getSystem().getTxnCommitted()));
}
private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
List<ReceiveCommand> a = staged.remove(committed);
if (a != null) {
delete(cmds, a);
}
if (staged.isEmpty() || !(committed instanceof LogIndex)) {
return;
}
LogIndex committedIndex = (LogIndex) committed;
Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
if (e.getKey() instanceof LogIndex) {
LogIndex stagedIndex = (LogIndex) e.getKey();
if (stagedIndex.isBefore(committedIndex)) {
delete(cmds, e.getValue());
itr.remove();
}
}
}
}
private static void delete(List<ReceiveCommand> cmds,
List<ReceiveCommand> createCmds) {
for (ReceiveCommand cmd : createCmds) {
ObjectId id = cmd.getNewId();
String name = cmd.getRefName();
cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
}
}
/**
* Determine the next push for this replica (if any) and start it.
* <p>
* If the replica has successfully accepted the committed state of the
* leader, this method will push all references to the replica using the
* configured {@link CommitMethod}.
* <p>
* If the replica is {@link State#LAGGING} this method will begin catch up
* by sending a more recent {@code refs/txn/accepted}.
* <p>
* Must be invoked with {@link KetchLeader#lock} held by caller.
*/
private void runNextPushRequest() {
LogIndex committed = leader.getCommitted();
if (!equals(txnCommitted, committed)
&& shouldPushUnbatchedCommit(committed, leader.isIdle())) {
pushCommitAsync(committed);
}
if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
return;
}
// Collapse all queued requests into a single request.
Map<String, ReceiveCommand> cmdMap = new HashMap<>();
for (ReplicaPushRequest req : queued) {
for (ReceiveCommand cmd : req.getCommands()) {
String name = cmd.getRefName();
ReceiveCommand old = cmdMap.remove(name);
if (old != null) {
cmd = new ReceiveCommand(
old.getOldId(), cmd.getNewId(),
name);
}
cmdMap.put(name, cmd);
}
}
queued.clear();
waiting.clear();
List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
for (ReceiveCommand cmd : next) {
running.put(cmd.getRefName(), cmd);
}
startPush(new ReplicaPushRequest(this, next));
}
private void pushAsync(ReplicaPushRequest req) {
if (defer(req)) {
// TODO(sop) Collapse during long retry outage.
for (ReceiveCommand cmd : req.getCommands()) {
waiting.put(cmd.getRefName(), cmd);
}
queued.add(req);
} else {
for (ReceiveCommand cmd : req.getCommands()) {
running.put(cmd.getRefName(), cmd);
}
startPush(req);
}
}
private boolean defer(ReplicaPushRequest req) {
if (waitingForRetry()) {
// Prior communication failure; everything is deferred.
return true;
}
for (ReceiveCommand nextCmd : req.getCommands()) {
ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
if (priorCmd == null) {
priorCmd = running.get(nextCmd.getRefName());
}
if (priorCmd != null) {
// Another request pending on same ref; that must go first.
// Verify priorCmd.newId == nextCmd.oldId?
return true;
}
}
return false;
}
private boolean waitingForRetry() {
Future<?> f = retryFuture;
return f != null && !f.isDone();
}
private void retryLater(ReplicaPushRequest req) {
Collection<ReceiveCommand> cmds = req.getCommands();
for (ReceiveCommand cmd : cmds) {
cmd.setResult(NOT_ATTEMPTED, null);
if (!waiting.containsKey(cmd.getRefName())) {
waiting.put(cmd.getRefName(), cmd);
}
}
queued.add(0, new ReplicaPushRequest(this, cmds));
if (!waitingForRetry()) {
long delay = FileUtils
.delay(lastRetryMillis, minRetryMillis, maxRetryMillis);
if (log.isDebugEnabled()) {
log.debug("Retrying {} after {} ms", //$NON-NLS-1$
describeForLog(), Long.valueOf(delay));
}
lastRetryMillis = delay;
retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
retryFuture = getSystem().getExecutor()
.schedule(new WeakRetryPush(this), delay, MILLISECONDS);
}
}
/** Weakly holds a retrying replica, allowing it to garbage collect. */
static class WeakRetryPush extends WeakReference<KetchReplica>
implements Callable<Void> {
WeakRetryPush(KetchReplica r) {
super(r);
}
@Override
public Void call() throws Exception {
KetchReplica r = get();
if (r != null) {
r.doRetryPush();
}
return null;
}
}
private void doRetryPush() {
leader.lock.lock();
try {
retryFuture = null;
runNextPushRequest();
} finally {
leader.lock.unlock();
}
}
/**
* Begin executing a single push.
* <p>
* This method must move processing onto another thread. Called with
* {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held by caller.
*
* @param req
* the request to send to the replica.
*/
protected abstract void startPush(ReplicaPushRequest req);
/**
* Callback from {@link ReplicaPushRequest} upon success or failure.
* <p>
* Acquires the {@link KetchLeader#lock} and updates the leader's internal
* knowledge about this replica to reflect what has been learned during a
* push to the replica. In some cases of divergence this method may take
* some time to determine how the replica has diverged; to reduce contention
* this is evaluated before acquiring the leader lock.
*
* @param repo
* local repository instance used by the push thread.
* @param req
* push request just attempted.
*/
void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
ReceiveCommand acceptCmd = null;
ReceiveCommand commitCmd = null;
List<ReceiveCommand> stages = null;
for (ReceiveCommand cmd : req.getCommands()) {
String name = cmd.getRefName();
if (name.equals(getSystem().getTxnAccepted())) {
acceptCmd = cmd;
} else if (name.equals(getSystem().getTxnCommitted())) {
commitCmd = cmd;
} else if (cmd.getResult() == OK && cmd.getType() == CREATE
&& name.startsWith(getSystem().getTxnStage())) {
if (stages == null) {
stages = new ArrayList<>();
}
stages.add(cmd);
}
}
State newState = null;
ObjectId acceptId = readId(req, acceptCmd);
if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
&& req.getException() == null) {
try (LagCheck lag = new LagCheck(this, repo)) {
newState = lag.check(acceptId, acceptCmd);
acceptId = lag.getRemoteId();
}
}
leader.lock.lock();
try {
for (ReceiveCommand cmd : req.getCommands()) {
running.remove(cmd.getRefName());
}
Throwable err = req.getException();
if (err != null) {
state = OFFLINE;
error = err.toString();
retryLater(req);
leader.onReplicaUpdate(this);
return;
}
lastRetryMillis = 0;
error = null;
updateView(req, acceptId, commitCmd);
if (acceptCmd != null && acceptCmd.getResult() == OK) {
state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
if (stages != null) {
staged.put(acceptCmd.getNewId(), stages);
}
} else if (newState != null) {
state = newState;
}
leader.onReplicaUpdate(this);
runNextPushRequest();
} finally {
leader.lock.unlock();
}
}
private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
ReceiveCommand commitCmd) {
if (acceptId != null) {
txnAccepted = acceptId;
}
ObjectId committed = readId(req, commitCmd);
if (committed != null) {
txnCommitted = committed;
} else if (acceptId != null && txnCommitted == null) {
// Initialize during first conversation.
Map<String, Ref> adv = req.getRefs();
if (adv != null) {
Ref refs = adv.get(getSystem().getTxnCommitted());
txnCommitted = getId(refs);
}
}
}
@Nullable
private static ObjectId readId(ReplicaPushRequest req,
@Nullable ReceiveCommand cmd) {
if (cmd == null) {
// Ref was not in the command list, do not trust advertisement.
return null;
} else if (cmd.getResult() == OK) {
// Currently at newId.
return cmd.getNewId();
}
Map<String, Ref> refs = req.getRefs();
return refs != null ? getId(refs.get(cmd.getRefName())) : null;
}
/**
* Fetch objects from the remote using the calling thread.
* <p>
* Called without {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock}.
*
* @param repo
* local repository to fetch objects into.
* @param req
* the request to fetch from a replica.
* @throws java.io.IOException
* communication with the replica was not possible.
*/
protected abstract void blockingFetch(Repository repo,
ReplicaFetchRequest req) throws IOException;
/**
* Build a list of commands to commit
* {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS}.
*
* @param git
* local leader repository to read committed state from.
* @param current
* all known references in the replica's repository. Typically
* this comes from a push advertisement.
* @param committed
* state being pushed to {@code refs/txn/committed}.
* @return commands to update during commit.
* @throws java.io.IOException
* cannot read the committed state.
*/
protected Collection<ReceiveCommand> prepareCommit(Repository git,
Map<String, Ref> current, ObjectId committed) throws IOException {
List<ReceiveCommand> delta = new ArrayList<>();
Map<String, Ref> remote = new HashMap<>(current);
try (RevWalk rw = new RevWalk(git);
TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
tw.setRecursive(true);
tw.addTree(rw.parseCommit(committed).getTree());
while (tw.next()) {
if (tw.getRawMode(0) != TYPE_GITLINK
|| tw.isPathSuffix(PEEL, 2)) {
// Symbolic references cannot be pushed.
// Caching peeled values is handled remotely.
continue;
}
// TODO(sop) Do not send certain ref names to replica.
String name = RefTree.refName(tw.getPathString());
Ref oldRef = remote.remove(name);
ObjectId oldId = getId(oldRef);
ObjectId newId = tw.getObjectId(0);
if (!AnyObjectId.isEqual(oldId, newId)) {
delta.add(new ReceiveCommand(oldId, newId, name));
}
}
}
// Delete any extra references not in the committed state.
for (Ref ref : remote.values()) {
if (canDelete(ref)) {
delta.add(new ReceiveCommand(
ref.getObjectId(), ObjectId.zeroId(),
ref.getName()));
}
}
return delta;
}
boolean canDelete(Ref ref) {
String name = ref.getName();
if (HEAD.equals(name)) {
return false;
}
if (name.startsWith(getSystem().getTxnNamespace())) {
return false;
}
// TODO(sop) Do not delete precious names from replica.
return true;
}
@NonNull
static ObjectId getId(@Nullable Ref ref) {
if (ref != null) {
ObjectId id = ref.getObjectId();
if (id != null) {
return id;
}
}
return ObjectId.zeroId();
}
}