KetchReplica.java

  1. /*
  2.  * Copyright (C) 2016, Google Inc. and others
  3.  *
  4.  * This program and the accompanying materials are made available under the
  5.  * terms of the Eclipse Distribution License v. 1.0 which is available at
  6.  * https://www.eclipse.org/org/documents/edl-v10.php.
  7.  *
  8.  * SPDX-License-Identifier: BSD-3-Clause
  9.  */

  10. package org.eclipse.jgit.internal.ketch;

  11. import static java.util.concurrent.TimeUnit.MILLISECONDS;
  12. import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
  13. import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
  14. import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
  15. import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
  16. import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
  17. import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
  18. import static org.eclipse.jgit.lib.Constants.HEAD;
  19. import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
  20. import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
  21. import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
  22. import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;

  23. import java.io.IOException;
  24. import java.lang.ref.WeakReference;
  25. import java.util.ArrayList;
  26. import java.util.Collection;
  27. import java.util.HashMap;
  28. import java.util.Iterator;
  29. import java.util.List;
  30. import java.util.Map;
  31. import java.util.concurrent.Callable;
  32. import java.util.concurrent.Future;

  33. import org.eclipse.jgit.annotations.NonNull;
  34. import org.eclipse.jgit.annotations.Nullable;
  35. import org.eclipse.jgit.internal.storage.reftree.RefTree;
  36. import org.eclipse.jgit.lib.AnyObjectId;
  37. import org.eclipse.jgit.lib.ObjectId;
  38. import org.eclipse.jgit.lib.Ref;
  39. import org.eclipse.jgit.lib.Repository;
  40. import org.eclipse.jgit.revwalk.RevWalk;
  41. import org.eclipse.jgit.transport.ReceiveCommand;
  42. import org.eclipse.jgit.treewalk.TreeWalk;
  43. import org.eclipse.jgit.util.FileUtils;
  44. import org.eclipse.jgit.util.SystemReader;
  45. import org.slf4j.Logger;
  46. import org.slf4j.LoggerFactory;

  47. /**
  48.  * A Ketch replica, either {@link org.eclipse.jgit.internal.ketch.LocalReplica}
  49.  * or {@link org.eclipse.jgit.internal.ketch.RemoteGitReplica}.
  50.  * <p>
  51.  * Replicas can be either a stock Git replica, or a Ketch-aware replica.
  52.  * <p>
  53.  * A stock Git replica has no special knowledge of Ketch and simply stores
  54.  * objects and references. Ketch communicates with the stock Git replica using
  55.  * the Git push wire protocol. The
  56.  * {@link org.eclipse.jgit.internal.ketch.KetchLeader} commits an agreed upon
  57.  * state by pushing all references to the Git replica, for example
  58.  * {@code "refs/heads/master"} is pushed during commit. Stock Git replicas use
  59.  * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS} to
  60.  * record the final state.
  61.  * <p>
  62.  * Ketch-aware replicas understand the {@code RefTree} sent during the proposal
  63.  * and during commit are able to update their own reference space to match the
  64.  * state represented by the {@code RefTree}. Ketch-aware replicas typically use
  65.  * a {@link org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase} and
  66.  * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#TXN_COMMITTED}
  67.  * to record the final state.
  68.  * <p>
  69.  * KetchReplica instances are tightly coupled with a single
  70.  * {@link org.eclipse.jgit.internal.ketch.KetchLeader}. Some state may be
  71.  * accessed by the leader thread and uses the leader's own
  72.  * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} to protect shared
  73.  * data.
  74.  */
  75. public abstract class KetchReplica {
  /** Logger for this class; not private, so it is visible within the package. */
  static final Logger log = LoggerFactory.getLogger(KetchReplica.class);

  /**
   * Two byte path suffix ({@code " ^"}) tested by {@code prepareCommit} to
   * skip tree entries that only carry peeled values.
   */
  private static final byte[] PEEL = { ' ', '^' };

  /** Role a replica plays while the leader establishes consensus. */
  public enum Participation {
      /** Voting member: the replica can vote. */
      FULL,

      /** Non-voting member: tracks the leader but does not vote. */
      FOLLOWER_ONLY
  }

  /** Way a replica wants Ketch commit operations delivered to it. */
  public enum CommitMethod {
      /** Every reference is pushed to the peer as standard Git. */
      ALL_REFS,

      /** Only {@code refs/txn/committed} is written or updated. */
      TXN_COMMITTED
  }

  /** How long the leader may hold back a commit destined for a replica. */
  public enum CommitSpeed {
      /**
       * The commit is sent right away, even when it could have been combined
       * with the next proposal.
       */
      FAST,

      /**
       * The commit is combined with the next proposal when one is available,
       * and sent on its own otherwise. Network use is lower, but the replica
       * may become consistent more slowly.
       */
      BATCHED
  }

  /** Condition of a replica as currently known to the leader. */
  public enum State {
      /** The leader has not contacted the replica yet. */
      UNKNOWN,

      /** The replica is behind the consensus. */
      LAGGING,

      /** The replica matches the consensus. */
      CURRENT,

      /** The replica's history is different from the leader's, or unknown. */
      DIVERGENT,

      /** The replica's history contains the leader's history. */
      AHEAD,

      /** The replica can not be contacted. */
      OFFLINE
  }

  /** Leader this replica follows; assigned once in the constructor. */
  private final KetchLeader leader;

  /** Name handed to the constructor and returned by {@link #getName()}. */
  private final String replicaName;

  /** Participation read from the {@code ReplicaConfig} at construction. */
  private final Participation participation;

  /** Commit method read from the {@code ReplicaConfig} at construction. */
  private final CommitMethod commitMethod;

  /** Commit speed read from the {@code ReplicaConfig} at construction. */
  private final CommitSpeed commitSpeed;

  /** Configured minimum retry value in milliseconds, passed to {@code FileUtils.delay}. */
  private final long minRetryMillis;

  /** Configured maximum retry value in milliseconds, passed to {@code FileUtils.delay}. */
  private final long maxRetryMillis;

  /**
   * Stage references successfully created on the replica, keyed by the new
   * id of the accept command they were pushed with. {@code removeStaged}
   * removes entries and emits the matching delete commands.
   */
  private final Map<ObjectId, List<ReceiveCommand>> staged;

  /**
   * Commands of pushes that were started and have not completed, keyed by
   * reference name. Entries are removed again in {@code afterPush}.
   */
  private final Map<String, ReceiveCommand> running;

  /**
   * Commands of deferred requests, keyed by reference name. Cleared when
   * the queue is collapsed into a single push.
   */
  private final Map<String, ReceiveCommand> waiting;

  /**
   * Deferred push requests. New requests are appended; a request that
   * failed is put back at the front.
   */
  private final List<ReplicaPushRequest> queued;

  /**
   * Value known for {@code "refs/txn/accepted"}.
   * <p>
   * Raft literature refers to this as {@code matchIndex}.
   */
  private ObjectId txnAccepted;

  /**
   * Value known for {@code "refs/txn/committed"}.
   * <p>
   * Raft literature refers to this as {@code commitIndex}. In traditional
   * Raft this is a state variable inside the follower implementation, but
   * Ketch keeps it in the leader.
   */
  private ObjectId txnCommitted;

  /** What is happening with this replica. */
  private State state = UNKNOWN;

  /**
   * {@code toString()} of the exception from the most recent failed push;
   * reset to {@code null} when a push completes without an exception.
   */
  private String error;

  /** Scheduled retry due to communication failure. */
  private Future<?> retryFuture;

  /**
   * Delay in milliseconds chosen for the most recent retry; reset to 0 when
   * a push completes without an exception.
   */
  private long lastRetryMillis;

  /**
   * {@code SystemReader} time, plus the chosen delay, at which the retry
   * was scheduled to run.
   */
  private long retryAtMillis;

  /**
   * Set up the leader-side representation of one replica.
   * <p>
   * Participation, commit method, commit speed and the retry bounds (read in
   * milliseconds) are copied out of {@code cfg}; the bookkeeping collections
   * start out empty.
   *
   * @param leader
   *            instance this replica follows.
   * @param name
   *            unique-ish name identifying this replica for debugging.
   * @param cfg
   *            how Ketch should treat the replica.
   */
  protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
      // Identity.
      this.leader = leader;
      this.replicaName = name;

      // Settings taken from the configuration.
      this.participation = cfg.getParticipation();
      this.commitMethod = cfg.getCommitMethod();
      this.commitSpeed = cfg.getCommitSpeed();
      this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
      this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);

      // Push bookkeeping.
      this.staged = new HashMap<>();
      this.running = new HashMap<>();
      this.waiting = new HashMap<>();
      this.queued = new ArrayList<>(4);
  }

  176.     /**
  177.      * Get system configuration.
  178.      *
  179.      * @return system configuration.
  180.      */
  181.     public KetchSystem getSystem() {
  182.         return getLeader().getSystem();
  183.     }

  184.     /**
  185.      * Get leader instance this replica follows.
  186.      *
  187.      * @return leader instance this replica follows.
  188.      */
  189.     public KetchLeader getLeader() {
  190.         return leader;
  191.     }

  192.     /**
  193.      * Get unique-ish name for debugging.
  194.      *
  195.      * @return unique-ish name for debugging.
  196.      */
  197.     public String getName() {
  198.         return replicaName;
  199.     }

  200.     /**
  201.      * Get description of this replica for error/debug logging purposes.
  202.      *
  203.      * @return description of this replica for error/debug logging purposes.
  204.      */
  205.     protected String describeForLog() {
  206.         return getName();
  207.     }

  208.     /**
  209.      * Get how the replica participates in this Ketch system.
  210.      *
  211.      * @return how the replica participates in this Ketch system.
  212.      */
  213.     public Participation getParticipation() {
  214.         return participation;
  215.     }

  216.     /**
  217.      * Get how Ketch will commit to the repository.
  218.      *
  219.      * @return how Ketch will commit to the repository.
  220.      */
  221.     public CommitMethod getCommitMethod() {
  222.         return commitMethod;
  223.     }

  224.     /**
  225.      * Get when Ketch will commit to the repository.
  226.      *
  227.      * @return when Ketch will commit to the repository.
  228.      */
  229.     public CommitSpeed getCommitSpeed() {
  230.         return commitSpeed;
  231.     }

  232.     /**
  233.      * Called by leader to perform graceful shutdown.
  234.      * <p>
  235.      * Default implementation cancels any scheduled retry. Subclasses may add
  236.      * additional logic before or after calling {@code super.shutdown()}.
  237.      * <p>
  238.      * Called with {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held
  239.      * by caller.
  240.      */
  241.     protected void shutdown() {
  242.         Future<?> f = retryFuture;
  243.         if (f != null) {
  244.             retryFuture = null;
  245.             f.cancel(true);
  246.         }
  247.     }

  248.     ReplicaSnapshot snapshot() {
  249.         ReplicaSnapshot s = new ReplicaSnapshot(this);
  250.         s.accepted = txnAccepted;
  251.         s.committed = txnCommitted;
  252.         s.state = state;
  253.         s.error = error;
  254.         s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
  255.         return s;
  256.     }

  257.     /**
  258.      * Update the leader's view of the replica after a poll.
  259.      * <p>
  260.      * Called with {@link KetchLeader#lock} held by caller.
  261.      *
  262.      * @param refs
  263.      *            map of refs from the replica.
  264.      */
  265.     void initialize(Map<String, Ref> refs) {
  266.         if (txnAccepted == null) {
  267.             txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
  268.         }
  269.         if (txnCommitted == null) {
  270.             txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
  271.         }
  272.     }

  273.     ObjectId getTxnAccepted() {
  274.         return txnAccepted;
  275.     }

  276.     boolean hasAccepted(LogIndex id) {
  277.         return equals(txnAccepted, id);
  278.     }

  279.     private static boolean equals(@Nullable ObjectId a, LogIndex b) {
  280.         return a != null && b != null && AnyObjectId.isEqual(a, b);
  281.     }

  282.     /**
  283.      * Schedule a proposal round with the replica.
  284.      * <p>
  285.      * Called with {@link KetchLeader#lock} held by caller.
  286.      *
  287.      * @param round
  288.      *            current round being run by the leader.
  289.      */
  290.     void pushTxnAcceptedAsync(Round round) {
  291.         List<ReceiveCommand> cmds = new ArrayList<>();
  292.         if (commitSpeed == BATCHED) {
  293.             LogIndex committedIndex = leader.getCommitted();
  294.             if (equals(txnAccepted, committedIndex)
  295.                     && !equals(txnCommitted, committedIndex)) {
  296.                 prepareTxnCommitted(cmds, committedIndex);
  297.             }
  298.         }

  299.         // TODO(sop) Lagging replicas should build accept on the fly.
  300.         if (round.stageCommands != null) {
  301.             for (ReceiveCommand cmd : round.stageCommands) {
  302.                 // TODO(sop): Do not send certain object graphs to replica.
  303.                 cmds.add(copy(cmd));
  304.             }
  305.         }
  306.         cmds.add(new ReceiveCommand(
  307.                 round.acceptedOldIndex, round.acceptedNewIndex,
  308.                 getSystem().getTxnAccepted()));
  309.         pushAsync(new ReplicaPushRequest(this, cmds));
  310.     }

  311.     private static ReceiveCommand copy(ReceiveCommand c) {
  312.         return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
  313.     }

  314.     boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
  315.         return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
  316.     }

  317.     void pushCommitAsync(LogIndex committed) {
  318.         List<ReceiveCommand> cmds = new ArrayList<>();
  319.         prepareTxnCommitted(cmds, committed);
  320.         pushAsync(new ReplicaPushRequest(this, cmds));
  321.     }

  322.     private void prepareTxnCommitted(List<ReceiveCommand> cmds,
  323.             ObjectId committed) {
  324.         removeStaged(cmds, committed);
  325.         cmds.add(new ReceiveCommand(
  326.                 txnCommitted, committed,
  327.                 getSystem().getTxnCommitted()));
  328.     }

  329.     private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
  330.         List<ReceiveCommand> a = staged.remove(committed);
  331.         if (a != null) {
  332.             delete(cmds, a);
  333.         }
  334.         if (staged.isEmpty() || !(committed instanceof LogIndex)) {
  335.             return;
  336.         }

  337.         LogIndex committedIndex = (LogIndex) committed;
  338.         Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
  339.                 .entrySet().iterator();
  340.         while (itr.hasNext()) {
  341.             Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
  342.             if (e.getKey() instanceof LogIndex) {
  343.                 LogIndex stagedIndex = (LogIndex) e.getKey();
  344.                 if (stagedIndex.isBefore(committedIndex)) {
  345.                     delete(cmds, e.getValue());
  346.                     itr.remove();
  347.                 }
  348.             }
  349.         }
  350.     }

  351.     private static void delete(List<ReceiveCommand> cmds,
  352.             List<ReceiveCommand> createCmds) {
  353.         for (ReceiveCommand cmd : createCmds) {
  354.             ObjectId id = cmd.getNewId();
  355.             String name = cmd.getRefName();
  356.             cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
  357.         }
  358.     }

  359.     /**
  360.      * Determine the next push for this replica (if any) and start it.
  361.      * <p>
  362.      * If the replica has successfully accepted the committed state of the
  363.      * leader, this method will push all references to the replica using the
  364.      * configured {@link CommitMethod}.
  365.      * <p>
  366.      * If the replica is {@link State#LAGGING} this method will begin catch up
  367.      * by sending a more recent {@code refs/txn/accepted}.
  368.      * <p>
  369.      * Must be invoked with {@link KetchLeader#lock} held by caller.
  370.      */
  371.     private void runNextPushRequest() {
  372.         LogIndex committed = leader.getCommitted();
  373.         if (!equals(txnCommitted, committed)
  374.                 && shouldPushUnbatchedCommit(committed, leader.isIdle())) {
  375.             pushCommitAsync(committed);
  376.         }

  377.         if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
  378.             return;
  379.         }

  380.         // Collapse all queued requests into a single request.
  381.         Map<String, ReceiveCommand> cmdMap = new HashMap<>();
  382.         for (ReplicaPushRequest req : queued) {
  383.             for (ReceiveCommand cmd : req.getCommands()) {
  384.                 String name = cmd.getRefName();
  385.                 ReceiveCommand old = cmdMap.remove(name);
  386.                 if (old != null) {
  387.                     cmd = new ReceiveCommand(
  388.                             old.getOldId(), cmd.getNewId(),
  389.                             name);
  390.                 }
  391.                 cmdMap.put(name, cmd);
  392.             }
  393.         }
  394.         queued.clear();
  395.         waiting.clear();

  396.         List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
  397.         for (ReceiveCommand cmd : next) {
  398.             running.put(cmd.getRefName(), cmd);
  399.         }
  400.         startPush(new ReplicaPushRequest(this, next));
  401.     }

  402.     private void pushAsync(ReplicaPushRequest req) {
  403.         if (defer(req)) {
  404.             // TODO(sop) Collapse during long retry outage.
  405.             for (ReceiveCommand cmd : req.getCommands()) {
  406.                 waiting.put(cmd.getRefName(), cmd);
  407.             }
  408.             queued.add(req);
  409.         } else {
  410.             for (ReceiveCommand cmd : req.getCommands()) {
  411.                 running.put(cmd.getRefName(), cmd);
  412.             }
  413.             startPush(req);
  414.         }
  415.     }

  416.     private boolean defer(ReplicaPushRequest req) {
  417.         if (waitingForRetry()) {
  418.             // Prior communication failure; everything is deferred.
  419.             return true;
  420.         }

  421.         for (ReceiveCommand nextCmd : req.getCommands()) {
  422.             ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
  423.             if (priorCmd == null) {
  424.                 priorCmd = running.get(nextCmd.getRefName());
  425.             }
  426.             if (priorCmd != null) {
  427.                 // Another request pending on same ref; that must go first.
  428.                 // Verify priorCmd.newId == nextCmd.oldId?
  429.                 return true;
  430.             }
  431.         }
  432.         return false;
  433.     }

  434.     private boolean waitingForRetry() {
  435.         Future<?> f = retryFuture;
  436.         return f != null && !f.isDone();
  437.     }

  438.     private void retryLater(ReplicaPushRequest req) {
  439.         Collection<ReceiveCommand> cmds = req.getCommands();
  440.         for (ReceiveCommand cmd : cmds) {
  441.             cmd.setResult(NOT_ATTEMPTED, null);
  442.             if (!waiting.containsKey(cmd.getRefName())) {
  443.                 waiting.put(cmd.getRefName(), cmd);
  444.             }
  445.         }
  446.         queued.add(0, new ReplicaPushRequest(this, cmds));

  447.         if (!waitingForRetry()) {
  448.             long delay = FileUtils
  449.                 .delay(lastRetryMillis, minRetryMillis, maxRetryMillis);
  450.             if (log.isDebugEnabled()) {
  451.                 log.debug("Retrying {} after {} ms", //$NON-NLS-1$
  452.                         describeForLog(), Long.valueOf(delay));
  453.             }
  454.             lastRetryMillis = delay;
  455.             retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
  456.             retryFuture = getSystem().getExecutor()
  457.                     .schedule(new WeakRetryPush(this), delay, MILLISECONDS);
  458.         }
  459.     }

  460.     /** Weakly holds a retrying replica, allowing it to garbage collect. */
  461.     static class WeakRetryPush extends WeakReference<KetchReplica>
  462.             implements Callable<Void> {
  463.         WeakRetryPush(KetchReplica r) {
  464.             super(r);
  465.         }

  466.         @Override
  467.         public Void call() throws Exception {
  468.             KetchReplica r = get();
  469.             if (r != null) {
  470.                 r.doRetryPush();
  471.             }
  472.             return null;
  473.         }
  474.     }

  475.     private void doRetryPush() {
  476.         leader.lock.lock();
  477.         try {
  478.             retryFuture = null;
  479.             runNextPushRequest();
  480.         } finally {
  481.             leader.lock.unlock();
  482.         }
  483.     }

  /**
   * Begin executing a single push.
   * <p>
   * This method must move processing onto another thread. Called with
   * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held by caller.
   * <p>
   * Before this is invoked the request's commands have already been
   * recorded as running; they are cleared again when
   * {@code afterPush(Repository, ReplicaPushRequest)} runs for the request.
   *
   * @param req
   *            the request to send to the replica.
   */
  protected abstract void startPush(ReplicaPushRequest req);

  494.     /**
  495.      * Callback from {@link ReplicaPushRequest} upon success or failure.
  496.      * <p>
  497.      * Acquires the {@link KetchLeader#lock} and updates the leader's internal
  498.      * knowledge about this replica to reflect what has been learned during a
  499.      * push to the replica. In some cases of divergence this method may take
  500.      * some time to determine how the replica has diverged; to reduce contention
  501.      * this is evaluated before acquiring the leader lock.
  502.      *
  503.      * @param repo
  504.      *            local repository instance used by the push thread.
  505.      * @param req
  506.      *            push request just attempted.
  507.      */
  508.     void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
  509.         ReceiveCommand acceptCmd = null;
  510.         ReceiveCommand commitCmd = null;
  511.         List<ReceiveCommand> stages = null;

  512.         for (ReceiveCommand cmd : req.getCommands()) {
  513.             String name = cmd.getRefName();
  514.             if (name.equals(getSystem().getTxnAccepted())) {
  515.                 acceptCmd = cmd;
  516.             } else if (name.equals(getSystem().getTxnCommitted())) {
  517.                 commitCmd = cmd;
  518.             } else if (cmd.getResult() == OK && cmd.getType() == CREATE
  519.                     && name.startsWith(getSystem().getTxnStage())) {
  520.                 if (stages == null) {
  521.                     stages = new ArrayList<>();
  522.                 }
  523.                 stages.add(cmd);
  524.             }
  525.         }

  526.         State newState = null;
  527.         ObjectId acceptId = readId(req, acceptCmd);
  528.         if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
  529.                 && req.getException() == null) {
  530.             try (LagCheck lag = new LagCheck(this, repo)) {
  531.                 newState = lag.check(acceptId, acceptCmd);
  532.                 acceptId = lag.getRemoteId();
  533.             }
  534.         }

  535.         leader.lock.lock();
  536.         try {
  537.             for (ReceiveCommand cmd : req.getCommands()) {
  538.                 running.remove(cmd.getRefName());
  539.             }

  540.             Throwable err = req.getException();
  541.             if (err != null) {
  542.                 state = OFFLINE;
  543.                 error = err.toString();
  544.                 retryLater(req);
  545.                 leader.onReplicaUpdate(this);
  546.                 return;
  547.             }

  548.             lastRetryMillis = 0;
  549.             error = null;
  550.             updateView(req, acceptId, commitCmd);

  551.             if (acceptCmd != null && acceptCmd.getResult() == OK) {
  552.                 state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
  553.                 if (stages != null) {
  554.                     staged.put(acceptCmd.getNewId(), stages);
  555.                 }
  556.             } else if (newState != null) {
  557.                 state = newState;
  558.             }

  559.             leader.onReplicaUpdate(this);
  560.             runNextPushRequest();
  561.         } finally {
  562.             leader.lock.unlock();
  563.         }
  564.     }

  565.     private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
  566.             ReceiveCommand commitCmd) {
  567.         if (acceptId != null) {
  568.             txnAccepted = acceptId;
  569.         }

  570.         ObjectId committed = readId(req, commitCmd);
  571.         if (committed != null) {
  572.             txnCommitted = committed;
  573.         } else if (acceptId != null && txnCommitted == null) {
  574.             // Initialize during first conversation.
  575.             Map<String, Ref> adv = req.getRefs();
  576.             if (adv != null) {
  577.                 Ref refs = adv.get(getSystem().getTxnCommitted());
  578.                 txnCommitted = getId(refs);
  579.             }
  580.         }
  581.     }

  582.     @Nullable
  583.     private static ObjectId readId(ReplicaPushRequest req,
  584.             @Nullable ReceiveCommand cmd) {
  585.         if (cmd == null) {
  586.             // Ref was not in the command list, do not trust advertisement.
  587.             return null;

  588.         } else if (cmd.getResult() == OK) {
  589.             // Currently at newId.
  590.             return cmd.getNewId();
  591.         }

  592.         Map<String, Ref> refs = req.getRefs();
  593.         return refs != null ? getId(refs.get(cmd.getRefName())) : null;
  594.     }

  /**
   * Fetch objects from the remote using the calling thread.
   * <p>
   * Called without {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock}.
   * As the name indicates, the fetch runs on the calling thread instead of
   * being moved to another one.
   *
   * @param repo
   *            local repository to fetch objects into.
   * @param req
   *            the request to fetch from a replica.
   * @throws java.io.IOException
   *             communication with the replica was not possible.
   */
  protected abstract void blockingFetch(Repository repo,
          ReplicaFetchRequest req) throws IOException;

  609.     /**
  610.      * Build a list of commands to commit
  611.      * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS}.
  612.      *
  613.      * @param git
  614.      *            local leader repository to read committed state from.
  615.      * @param current
  616.      *            all known references in the replica's repository. Typically
  617.      *            this comes from a push advertisement.
  618.      * @param committed
  619.      *            state being pushed to {@code refs/txn/committed}.
  620.      * @return commands to update during commit.
  621.      * @throws java.io.IOException
  622.      *             cannot read the committed state.
  623.      */
  624.     protected Collection<ReceiveCommand> prepareCommit(Repository git,
  625.             Map<String, Ref> current, ObjectId committed) throws IOException {
  626.         List<ReceiveCommand> delta = new ArrayList<>();
  627.         Map<String, Ref> remote = new HashMap<>(current);
  628.         try (RevWalk rw = new RevWalk(git);
  629.                 TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
  630.             tw.setRecursive(true);
  631.             tw.addTree(rw.parseCommit(committed).getTree());
  632.             while (tw.next()) {
  633.                 if (tw.getRawMode(0) != TYPE_GITLINK
  634.                         || tw.isPathSuffix(PEEL, 2)) {
  635.                     // Symbolic references cannot be pushed.
  636.                     // Caching peeled values is handled remotely.
  637.                     continue;
  638.                 }

  639.                 // TODO(sop) Do not send certain ref names to replica.
  640.                 String name = RefTree.refName(tw.getPathString());
  641.                 Ref oldRef = remote.remove(name);
  642.                 ObjectId oldId = getId(oldRef);
  643.                 ObjectId newId = tw.getObjectId(0);
  644.                 if (!AnyObjectId.isEqual(oldId, newId)) {
  645.                     delta.add(new ReceiveCommand(oldId, newId, name));
  646.                 }
  647.             }
  648.         }

  649.         // Delete any extra references not in the committed state.
  650.         for (Ref ref : remote.values()) {
  651.             if (canDelete(ref)) {
  652.                 delta.add(new ReceiveCommand(
  653.                     ref.getObjectId(), ObjectId.zeroId(),
  654.                     ref.getName()));
  655.             }
  656.         }
  657.         return delta;
  658.     }

  659.     boolean canDelete(Ref ref) {
  660.         String name = ref.getName();
  661.         if (HEAD.equals(name)) {
  662.             return false;
  663.         }
  664.         if (name.startsWith(getSystem().getTxnNamespace())) {
  665.             return false;
  666.         }
  667.         // TODO(sop) Do not delete precious names from replica.
  668.         return true;
  669.     }

  670.     @NonNull
  671.     static ObjectId getId(@Nullable Ref ref) {
  672.         if (ref != null) {
  673.             ObjectId id = ref.getObjectId();
  674.             if (id != null) {
  675.                 return id;
  676.             }
  677.         }
  678.         return ObjectId.zeroId();
  679.     }
  680. }