KetchReplica.java
- /*
- * Copyright (C) 2016, Google Inc. and others
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Distribution License v. 1.0 which is available at
- * https://www.eclipse.org/org/documents/edl-v10.php.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- */
- package org.eclipse.jgit.internal.ketch;
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
- import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
- import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
- import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
- import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
- import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
- import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
- import static org.eclipse.jgit.lib.Constants.HEAD;
- import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
- import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
- import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
- import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
- import java.io.IOException;
- import java.lang.ref.WeakReference;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Future;
- import org.eclipse.jgit.annotations.NonNull;
- import org.eclipse.jgit.annotations.Nullable;
- import org.eclipse.jgit.internal.storage.reftree.RefTree;
- import org.eclipse.jgit.lib.AnyObjectId;
- import org.eclipse.jgit.lib.ObjectId;
- import org.eclipse.jgit.lib.Ref;
- import org.eclipse.jgit.lib.Repository;
- import org.eclipse.jgit.revwalk.RevWalk;
- import org.eclipse.jgit.transport.ReceiveCommand;
- import org.eclipse.jgit.treewalk.TreeWalk;
- import org.eclipse.jgit.util.FileUtils;
- import org.eclipse.jgit.util.SystemReader;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * A Ketch replica, either {@link org.eclipse.jgit.internal.ketch.LocalReplica}
- * or {@link org.eclipse.jgit.internal.ketch.RemoteGitReplica}.
- * <p>
- * Replicas can be either a stock Git replica, or a Ketch-aware replica.
- * <p>
- * A stock Git replica has no special knowledge of Ketch and simply stores
- * objects and references. Ketch communicates with the stock Git replica using
- * the Git push wire protocol. The
- * {@link org.eclipse.jgit.internal.ketch.KetchLeader} commits an agreed upon
- * state by pushing all references to the Git replica, for example
- * {@code "refs/heads/master"} is pushed during commit. Stock Git replicas use
- * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS} to
- * record the final state.
- * <p>
- * Ketch-aware replicas understand the {@code RefTree} sent during the proposal
- * and during commit are able to update their own reference space to match the
- * state represented by the {@code RefTree}. Ketch-aware replicas typically use
- * a {@link org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase} and
- * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#TXN_COMMITTED}
- * to record the final state.
- * <p>
- * KetchReplica instances are tightly coupled with a single
- * {@link org.eclipse.jgit.internal.ketch.KetchLeader}. Some state may be
- * accessed by the leader thread and uses the leader's own
- * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} to protect shared
- * data.
- */
- public abstract class KetchReplica {
- static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
- private static final byte[] PEEL = { ' ', '^' };
/** Role a replica plays while consensus is being established. */
public enum Participation {
    /** The replica is allowed to vote. */
    FULL,

    /** The replica tracks the leader but does not vote. */
    FOLLOWER_ONLY
}
/** The way a replica expects to receive Ketch commit operations. */
public enum CommitMethod {
    /** Every reference is pushed to the peer as standard Git. */
    ALL_REFS,

    /** Only {@code refs/txn/committed} is written or updated. */
    TXN_COMMITTED
}
/** How long to wait before committing to a replica. */
public enum CommitSpeed {
    /**
     * Commit right away, even when the commit could have been batched with
     * the next proposal.
     */
    FAST,

    /**
     * Batch the commit with the next proposal when one is available;
     * otherwise send the commit on its own. This uses less network, but the
     * replica may become consistent more slowly.
     */
    BATCHED
}
/** State of a replica as currently known to the leader. */
public enum State {
    /** The leader has not contacted the replica yet. */
    UNKNOWN,

    /** The replica is behind the consensus. */
    LAGGING,

    /** The replica matches the consensus. */
    CURRENT,

    /** The replica has a different (or unknown) history. */
    DIVERGENT,

    /** The replica's history contains the leader's history. */
    AHEAD,

    /** The replica can not be contacted. */
    OFFLINE
}
/** Leader this replica follows. */
private final KetchLeader leader;

/** Name used to identify this replica, for debugging. */
private final String replicaName;

/** Settings copied from the {@code ReplicaConfig} at construction. */
private final Participation participation;
private final CommitMethod commitMethod;
private final CommitSpeed commitSpeed;

/** Lower and upper bounds handed to the retry delay computation. */
private final long minRetryMillis;
private final long maxRetryMillis;

/**
 * Stage commands that were created successfully on the replica, keyed by
 * the accepted id they were pushed together with.
 */
private final Map<ObjectId, List<ReceiveCommand>> staged = new HashMap<>();

/** Commands handed to {@code startPush} and not yet completed, by ref name. */
private final Map<String, ReceiveCommand> running = new HashMap<>();

/** Commands of deferred requests, by ref name. */
private final Map<String, ReceiveCommand> waiting = new HashMap<>();

/** Deferred requests, in the order they should be sent. */
private final List<ReplicaPushRequest> queued = new ArrayList<>(4);

/**
 * Last value known for {@code "refs/txn/accepted"}.
 * <p>
 * This is what Raft literature calls {@code matchIndex}.
 */
private ObjectId txnAccepted;

/**
 * Last value known for {@code "refs/txn/committed"}.
 * <p>
 * This is what Raft literature calls {@code commitIndex}. Traditional Raft
 * keeps it as a state variable inside the follower implementation; Ketch
 * keeps it in the leader.
 */
private ObjectId txnCommitted;

/** What is happening with this replica. */
private State state = UNKNOWN;

/** Text of the exception from the last failed push; null after a push without one. */
private String error;

/** Retry scheduled because of a communication failure. */
private Future<?> retryFuture;

/** Delay chosen for the most recent retry; reset to 0 after a push without exception. */
private long lastRetryMillis;

/** Clock time at which the scheduled retry was set to run. */
private long retryAtMillis;

/**
 * Configure a replica representation.
 *
 * @param leader
 *            instance this replica follows.
 * @param name
 *            unique-ish name identifying this replica for debugging.
 * @param cfg
 *            how Ketch should treat the replica.
 */
protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
    this.leader = leader;
    this.replicaName = name;

    this.participation = cfg.getParticipation();
    this.commitMethod = cfg.getCommitMethod();
    this.commitSpeed = cfg.getCommitSpeed();

    this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
    this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
}
- /**
- * Get system configuration.
- *
- * @return system configuration.
- */
- public KetchSystem getSystem() {
- return getLeader().getSystem();
- }
- /**
- * Get leader instance this replica follows.
- *
- * @return leader instance this replica follows.
- */
- public KetchLeader getLeader() {
- return leader;
- }
- /**
- * Get unique-ish name for debugging.
- *
- * @return unique-ish name for debugging.
- */
- public String getName() {
- return replicaName;
- }
- /**
- * Get description of this replica for error/debug logging purposes.
- *
- * @return description of this replica for error/debug logging purposes.
- */
- protected String describeForLog() {
- return getName();
- }
- /**
- * Get how the replica participates in this Ketch system.
- *
- * @return how the replica participates in this Ketch system.
- */
- public Participation getParticipation() {
- return participation;
- }
- /**
- * Get how Ketch will commit to the repository.
- *
- * @return how Ketch will commit to the repository.
- */
- public CommitMethod getCommitMethod() {
- return commitMethod;
- }
- /**
- * Get when Ketch will commit to the repository.
- *
- * @return when Ketch will commit to the repository.
- */
- public CommitSpeed getCommitSpeed() {
- return commitSpeed;
- }
- /**
- * Called by leader to perform graceful shutdown.
- * <p>
- * Default implementation cancels any scheduled retry. Subclasses may add
- * additional logic before or after calling {@code super.shutdown()}.
- * <p>
- * Called with {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held
- * by caller.
- */
- protected void shutdown() {
- Future<?> f = retryFuture;
- if (f != null) {
- retryFuture = null;
- f.cancel(true);
- }
- }
- ReplicaSnapshot snapshot() {
- ReplicaSnapshot s = new ReplicaSnapshot(this);
- s.accepted = txnAccepted;
- s.committed = txnCommitted;
- s.state = state;
- s.error = error;
- s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
- return s;
- }
- /**
- * Update the leader's view of the replica after a poll.
- * <p>
- * Called with {@link KetchLeader#lock} held by caller.
- *
- * @param refs
- * map of refs from the replica.
- */
- void initialize(Map<String, Ref> refs) {
- if (txnAccepted == null) {
- txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
- }
- if (txnCommitted == null) {
- txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
- }
- }
- ObjectId getTxnAccepted() {
- return txnAccepted;
- }
- boolean hasAccepted(LogIndex id) {
- return equals(txnAccepted, id);
- }
- private static boolean equals(@Nullable ObjectId a, LogIndex b) {
- return a != null && b != null && AnyObjectId.isEqual(a, b);
- }
- /**
- * Schedule a proposal round with the replica.
- * <p>
- * Called with {@link KetchLeader#lock} held by caller.
- *
- * @param round
- * current round being run by the leader.
- */
- void pushTxnAcceptedAsync(Round round) {
- List<ReceiveCommand> cmds = new ArrayList<>();
- if (commitSpeed == BATCHED) {
- LogIndex committedIndex = leader.getCommitted();
- if (equals(txnAccepted, committedIndex)
- && !equals(txnCommitted, committedIndex)) {
- prepareTxnCommitted(cmds, committedIndex);
- }
- }
- // TODO(sop) Lagging replicas should build accept on the fly.
- if (round.stageCommands != null) {
- for (ReceiveCommand cmd : round.stageCommands) {
- // TODO(sop): Do not send certain object graphs to replica.
- cmds.add(copy(cmd));
- }
- }
- cmds.add(new ReceiveCommand(
- round.acceptedOldIndex, round.acceptedNewIndex,
- getSystem().getTxnAccepted()));
- pushAsync(new ReplicaPushRequest(this, cmds));
- }
- private static ReceiveCommand copy(ReceiveCommand c) {
- return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
- }
- boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
- return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
- }
- void pushCommitAsync(LogIndex committed) {
- List<ReceiveCommand> cmds = new ArrayList<>();
- prepareTxnCommitted(cmds, committed);
- pushAsync(new ReplicaPushRequest(this, cmds));
- }
- private void prepareTxnCommitted(List<ReceiveCommand> cmds,
- ObjectId committed) {
- removeStaged(cmds, committed);
- cmds.add(new ReceiveCommand(
- txnCommitted, committed,
- getSystem().getTxnCommitted()));
- }
- private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
- List<ReceiveCommand> a = staged.remove(committed);
- if (a != null) {
- delete(cmds, a);
- }
- if (staged.isEmpty() || !(committed instanceof LogIndex)) {
- return;
- }
- LogIndex committedIndex = (LogIndex) committed;
- Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
- .entrySet().iterator();
- while (itr.hasNext()) {
- Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
- if (e.getKey() instanceof LogIndex) {
- LogIndex stagedIndex = (LogIndex) e.getKey();
- if (stagedIndex.isBefore(committedIndex)) {
- delete(cmds, e.getValue());
- itr.remove();
- }
- }
- }
- }
- private static void delete(List<ReceiveCommand> cmds,
- List<ReceiveCommand> createCmds) {
- for (ReceiveCommand cmd : createCmds) {
- ObjectId id = cmd.getNewId();
- String name = cmd.getRefName();
- cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
- }
- }
- /**
- * Determine the next push for this replica (if any) and start it.
- * <p>
- * If the replica has successfully accepted the committed state of the
- * leader, this method will push all references to the replica using the
- * configured {@link CommitMethod}.
- * <p>
- * If the replica is {@link State#LAGGING} this method will begin catch up
- * by sending a more recent {@code refs/txn/accepted}.
- * <p>
- * Must be invoked with {@link KetchLeader#lock} held by caller.
- */
- private void runNextPushRequest() {
- LogIndex committed = leader.getCommitted();
- if (!equals(txnCommitted, committed)
- && shouldPushUnbatchedCommit(committed, leader.isIdle())) {
- pushCommitAsync(committed);
- }
- if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
- return;
- }
- // Collapse all queued requests into a single request.
- Map<String, ReceiveCommand> cmdMap = new HashMap<>();
- for (ReplicaPushRequest req : queued) {
- for (ReceiveCommand cmd : req.getCommands()) {
- String name = cmd.getRefName();
- ReceiveCommand old = cmdMap.remove(name);
- if (old != null) {
- cmd = new ReceiveCommand(
- old.getOldId(), cmd.getNewId(),
- name);
- }
- cmdMap.put(name, cmd);
- }
- }
- queued.clear();
- waiting.clear();
- List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
- for (ReceiveCommand cmd : next) {
- running.put(cmd.getRefName(), cmd);
- }
- startPush(new ReplicaPushRequest(this, next));
- }
- private void pushAsync(ReplicaPushRequest req) {
- if (defer(req)) {
- // TODO(sop) Collapse during long retry outage.
- for (ReceiveCommand cmd : req.getCommands()) {
- waiting.put(cmd.getRefName(), cmd);
- }
- queued.add(req);
- } else {
- for (ReceiveCommand cmd : req.getCommands()) {
- running.put(cmd.getRefName(), cmd);
- }
- startPush(req);
- }
- }
- private boolean defer(ReplicaPushRequest req) {
- if (waitingForRetry()) {
- // Prior communication failure; everything is deferred.
- return true;
- }
- for (ReceiveCommand nextCmd : req.getCommands()) {
- ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
- if (priorCmd == null) {
- priorCmd = running.get(nextCmd.getRefName());
- }
- if (priorCmd != null) {
- // Another request pending on same ref; that must go first.
- // Verify priorCmd.newId == nextCmd.oldId?
- return true;
- }
- }
- return false;
- }
- private boolean waitingForRetry() {
- Future<?> f = retryFuture;
- return f != null && !f.isDone();
- }
- private void retryLater(ReplicaPushRequest req) {
- Collection<ReceiveCommand> cmds = req.getCommands();
- for (ReceiveCommand cmd : cmds) {
- cmd.setResult(NOT_ATTEMPTED, null);
- if (!waiting.containsKey(cmd.getRefName())) {
- waiting.put(cmd.getRefName(), cmd);
- }
- }
- queued.add(0, new ReplicaPushRequest(this, cmds));
- if (!waitingForRetry()) {
- long delay = FileUtils
- .delay(lastRetryMillis, minRetryMillis, maxRetryMillis);
- if (log.isDebugEnabled()) {
- log.debug("Retrying {} after {} ms", //$NON-NLS-1$
- describeForLog(), Long.valueOf(delay));
- }
- lastRetryMillis = delay;
- retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
- retryFuture = getSystem().getExecutor()
- .schedule(new WeakRetryPush(this), delay, MILLISECONDS);
- }
- }
- /** Weakly holds a retrying replica, allowing it to garbage collect. */
- static class WeakRetryPush extends WeakReference<KetchReplica>
- implements Callable<Void> {
- WeakRetryPush(KetchReplica r) {
- super(r);
- }
- @Override
- public Void call() throws Exception {
- KetchReplica r = get();
- if (r != null) {
- r.doRetryPush();
- }
- return null;
- }
- }
- private void doRetryPush() {
- leader.lock.lock();
- try {
- retryFuture = null;
- runNextPushRequest();
- } finally {
- leader.lock.unlock();
- }
- }
- /**
- * Begin executing a single push.
- * <p>
- * This method must move processing onto another thread. Called with
- * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held by caller.
- *
- * @param req
- * the request to send to the replica.
- */
- protected abstract void startPush(ReplicaPushRequest req);
- /**
- * Callback from {@link ReplicaPushRequest} upon success or failure.
- * <p>
- * Acquires the {@link KetchLeader#lock} and updates the leader's internal
- * knowledge about this replica to reflect what has been learned during a
- * push to the replica. In some cases of divergence this method may take
- * some time to determine how the replica has diverged; to reduce contention
- * this is evaluated before acquiring the leader lock.
- *
- * @param repo
- * local repository instance used by the push thread.
- * @param req
- * push request just attempted.
- */
- void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
- ReceiveCommand acceptCmd = null;
- ReceiveCommand commitCmd = null;
- List<ReceiveCommand> stages = null;
- for (ReceiveCommand cmd : req.getCommands()) {
- String name = cmd.getRefName();
- if (name.equals(getSystem().getTxnAccepted())) {
- acceptCmd = cmd;
- } else if (name.equals(getSystem().getTxnCommitted())) {
- commitCmd = cmd;
- } else if (cmd.getResult() == OK && cmd.getType() == CREATE
- && name.startsWith(getSystem().getTxnStage())) {
- if (stages == null) {
- stages = new ArrayList<>();
- }
- stages.add(cmd);
- }
- }
- State newState = null;
- ObjectId acceptId = readId(req, acceptCmd);
- if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
- && req.getException() == null) {
- try (LagCheck lag = new LagCheck(this, repo)) {
- newState = lag.check(acceptId, acceptCmd);
- acceptId = lag.getRemoteId();
- }
- }
- leader.lock.lock();
- try {
- for (ReceiveCommand cmd : req.getCommands()) {
- running.remove(cmd.getRefName());
- }
- Throwable err = req.getException();
- if (err != null) {
- state = OFFLINE;
- error = err.toString();
- retryLater(req);
- leader.onReplicaUpdate(this);
- return;
- }
- lastRetryMillis = 0;
- error = null;
- updateView(req, acceptId, commitCmd);
- if (acceptCmd != null && acceptCmd.getResult() == OK) {
- state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
- if (stages != null) {
- staged.put(acceptCmd.getNewId(), stages);
- }
- } else if (newState != null) {
- state = newState;
- }
- leader.onReplicaUpdate(this);
- runNextPushRequest();
- } finally {
- leader.lock.unlock();
- }
- }
- private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
- ReceiveCommand commitCmd) {
- if (acceptId != null) {
- txnAccepted = acceptId;
- }
- ObjectId committed = readId(req, commitCmd);
- if (committed != null) {
- txnCommitted = committed;
- } else if (acceptId != null && txnCommitted == null) {
- // Initialize during first conversation.
- Map<String, Ref> adv = req.getRefs();
- if (adv != null) {
- Ref refs = adv.get(getSystem().getTxnCommitted());
- txnCommitted = getId(refs);
- }
- }
- }
- @Nullable
- private static ObjectId readId(ReplicaPushRequest req,
- @Nullable ReceiveCommand cmd) {
- if (cmd == null) {
- // Ref was not in the command list, do not trust advertisement.
- return null;
- } else if (cmd.getResult() == OK) {
- // Currently at newId.
- return cmd.getNewId();
- }
- Map<String, Ref> refs = req.getRefs();
- return refs != null ? getId(refs.get(cmd.getRefName())) : null;
- }
- /**
- * Fetch objects from the remote using the calling thread.
- * <p>
- * Called without {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock}.
- *
- * @param repo
- * local repository to fetch objects into.
- * @param req
- * the request to fetch from a replica.
- * @throws java.io.IOException
- * communication with the replica was not possible.
- */
- protected abstract void blockingFetch(Repository repo,
- ReplicaFetchRequest req) throws IOException;
- /**
- * Build a list of commands to commit
- * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS}.
- *
- * @param git
- * local leader repository to read committed state from.
- * @param current
- * all known references in the replica's repository. Typically
- * this comes from a push advertisement.
- * @param committed
- * state being pushed to {@code refs/txn/committed}.
- * @return commands to update during commit.
- * @throws java.io.IOException
- * cannot read the committed state.
- */
- protected Collection<ReceiveCommand> prepareCommit(Repository git,
- Map<String, Ref> current, ObjectId committed) throws IOException {
- List<ReceiveCommand> delta = new ArrayList<>();
- Map<String, Ref> remote = new HashMap<>(current);
- try (RevWalk rw = new RevWalk(git);
- TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
- tw.setRecursive(true);
- tw.addTree(rw.parseCommit(committed).getTree());
- while (tw.next()) {
- if (tw.getRawMode(0) != TYPE_GITLINK
- || tw.isPathSuffix(PEEL, 2)) {
- // Symbolic references cannot be pushed.
- // Caching peeled values is handled remotely.
- continue;
- }
- // TODO(sop) Do not send certain ref names to replica.
- String name = RefTree.refName(tw.getPathString());
- Ref oldRef = remote.remove(name);
- ObjectId oldId = getId(oldRef);
- ObjectId newId = tw.getObjectId(0);
- if (!AnyObjectId.isEqual(oldId, newId)) {
- delta.add(new ReceiveCommand(oldId, newId, name));
- }
- }
- }
- // Delete any extra references not in the committed state.
- for (Ref ref : remote.values()) {
- if (canDelete(ref)) {
- delta.add(new ReceiveCommand(
- ref.getObjectId(), ObjectId.zeroId(),
- ref.getName()));
- }
- }
- return delta;
- }
- boolean canDelete(Ref ref) {
- String name = ref.getName();
- if (HEAD.equals(name)) {
- return false;
- }
- if (name.startsWith(getSystem().getTxnNamespace())) {
- return false;
- }
- // TODO(sop) Do not delete precious names from replica.
- return true;
- }
- @NonNull
- static ObjectId getId(@Nullable Ref ref) {
- if (ref != null) {
- ObjectId id = ref.getObjectId();
- if (id != null) {
- return id;
- }
- }
- return ObjectId.zeroId();
- }
- }