/*
 * Copyright (C) 2016, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static java.util.concurrent.TimeUnit.MILLISECONDS;
47  import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
48  import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
49  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
50  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
51  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
52  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
53  import static org.eclipse.jgit.lib.Constants.HEAD;
54  import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
55  import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
56  import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
57  import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
58  
59  import java.io.IOException;
60  import java.lang.ref.WeakReference;
61  import java.util.ArrayList;
62  import java.util.Collection;
63  import java.util.HashMap;
64  import java.util.Iterator;
65  import java.util.List;
66  import java.util.Map;
67  import java.util.concurrent.Callable;
68  import java.util.concurrent.Future;
69  
70  import org.eclipse.jgit.annotations.NonNull;
71  import org.eclipse.jgit.annotations.Nullable;
72  import org.eclipse.jgit.internal.storage.reftree.RefTree;
73  import org.eclipse.jgit.lib.AnyObjectId;
74  import org.eclipse.jgit.lib.ObjectId;
75  import org.eclipse.jgit.lib.Ref;
76  import org.eclipse.jgit.lib.Repository;
77  import org.eclipse.jgit.revwalk.RevWalk;
78  import org.eclipse.jgit.transport.ReceiveCommand;
79  import org.eclipse.jgit.treewalk.TreeWalk;
80  import org.eclipse.jgit.util.SystemReader;
81  import org.slf4j.Logger;
82  import org.slf4j.LoggerFactory;
83  
84  /**
85   * A Ketch replica, either {@link LocalReplica} or {@link RemoteGitReplica}.
86   * <p>
87   * Replicas can be either a stock Git replica, or a Ketch-aware replica.
88   * <p>
89   * A stock Git replica has no special knowledge of Ketch and simply stores
90   * objects and references. Ketch communicates with the stock Git replica using
91   * the Git push wire protocol. The {@link KetchLeader} commits an agreed upon
92   * state by pushing all references to the Git replica, for example
93   * {@code "refs/heads/master"} is pushed during commit. Stock Git replicas use
94   * {@link CommitMethod#ALL_REFS} to record the final state.
95   * <p>
96   * Ketch-aware replicas understand the {@code RefTree} sent during the proposal
97   * and during commit are able to update their own reference space to match the
98   * state represented by the {@code RefTree}. Ketch-aware replicas typically use
99   * a {@link org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase} and
100  * {@link CommitMethod#TXN_COMMITTED} to record the final state.
101  * <p>
102  * KetchReplica instances are tightly coupled with a single {@link KetchLeader}.
103  * Some state may be accessed by the leader thread and uses the leader's own
104  * {@link KetchLeader#lock} to protect shared data.
105  */
106 public abstract class KetchReplica {
107 	static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
108 	private static final byte[] PEEL = { ' ', '^' };
109 
110 	/** Participation of a replica in establishing consensus. */
111 	public enum Participation {
112 		/** Replica can vote. */
113 		FULL,
114 
115 		/** Replica does not vote, but tracks leader. */
116 		FOLLOWER_ONLY;
117 	}
118 
119 	/** How this replica wants to receive Ketch commit operations. */
120 	public enum CommitMethod {
121 		/** All references are pushed to the peer as standard Git. */
122 		ALL_REFS,
123 
124 		/** Only {@code refs/txn/committed} is written/updated. */
125 		TXN_COMMITTED;
126 	}
127 
128 	/** Delay before committing to a replica. */
129 	public enum CommitSpeed {
130 		/**
131 		 * Send the commit immediately, even if it could be batched with the
132 		 * next proposal.
133 		 */
134 		FAST,
135 
136 		/**
137 		 * If the next proposal is available, batch the commit with it,
138 		 * otherwise just send the commit. This generates less network use, but
139 		 * may provide slower consistency on the replica.
140 		 */
141 		BATCHED;
142 	}
143 
144 	/** Current state of a replica. */
145 	public enum State {
146 		/** Leader has not yet contacted the replica. */
147 		UNKNOWN,
148 
149 		/** Replica is behind the consensus. */
150 		LAGGING,
151 
152 		/** Replica matches the consensus. */
153 		CURRENT,
154 
155 		/** Replica has a different (or unknown) history. */
156 		DIVERGENT,
157 
158 		/** Replica's history contains the leader's history. */
159 		AHEAD,
160 
161 		/** Replica can not be contacted. */
162 		OFFLINE;
163 	}
164 
165 	private final KetchLeader leader;
166 	private final String replicaName;
167 	private final Participation participation;
168 	private final CommitMethod commitMethod;
169 	private final CommitSpeed commitSpeed;
170 	private final long minRetryMillis;
171 	private final long maxRetryMillis;
172 	private final Map<ObjectId, List<ReceiveCommand>> staged;
173 	private final Map<String, ReceiveCommand> running;
174 	private final Map<String, ReceiveCommand> waiting;
175 	private final List<ReplicaPushRequest> queued;
176 
177 	/**
178 	 * Value known for {@code "refs/txn/accepted"}.
179 	 * <p>
180 	 * Raft literature refers to this as {@code matchIndex}.
181 	 */
182 	private ObjectId txnAccepted;
183 
184 	/**
185 	 * Value known for {@code "refs/txn/committed"}.
186 	 * <p>
187 	 * Raft literature refers to this as {@code commitIndex}. In traditional
188 	 * Raft this is a state variable inside the follower implementation, but
189 	 * Ketch keeps it in the leader.
190 	 */
191 	private ObjectId txnCommitted;
192 
193 	/** What is happening with this replica. */
194 	private State state = UNKNOWN;
195 	private String error;
196 
197 	/** Scheduled retry due to communication failure. */
198 	private Future<?> retryFuture;
199 	private long lastRetryMillis;
200 	private long retryAtMillis;
201 
202 	/**
203 	 * Configure a replica representation.
204 	 *
205 	 * @param leader
206 	 *            instance this replica follows.
207 	 * @param name
208 	 *            unique-ish name identifying this replica for debugging.
209 	 * @param cfg
210 	 *            how Ketch should treat the replica.
211 	 */
212 	protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
213 		this.leader = leader;
214 		this.replicaName = name;
215 		this.participation = cfg.getParticipation();
216 		this.commitMethod = cfg.getCommitMethod();
217 		this.commitSpeed = cfg.getCommitSpeed();
218 		this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
219 		this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
220 		this.staged = new HashMap<>();
221 		this.running = new HashMap<>();
222 		this.waiting = new HashMap<>();
223 		this.queued = new ArrayList<>(4);
224 	}
225 
226 	/** @return system configuration. */
227 	public KetchSystem getSystem() {
228 		return getLeader().getSystem();
229 	}
230 
231 	/** @return leader instance this replica follows. */
232 	public KetchLeader getLeader() {
233 		return leader;
234 	}
235 
236 	/** @return unique-ish name for debugging. */
237 	public String getName() {
238 		return replicaName;
239 	}
240 
241 	/** @return description of this replica for error/debug logging purposes. */
242 	protected String describeForLog() {
243 		return getName();
244 	}
245 
246 	/** @return how the replica participates in this Ketch system. */
247 	public Participation getParticipation() {
248 		return participation;
249 	}
250 
251 	/** @return how Ketch will commit to the repository. */
252 	public CommitMethod getCommitMethod() {
253 		return commitMethod;
254 	}
255 
256 	/** @return when Ketch will commit to the repository. */
257 	public CommitSpeed getCommitSpeed() {
258 		return commitSpeed;
259 	}
260 
261 	/**
262 	 * Called by leader to perform graceful shutdown.
263 	 * <p>
264 	 * Default implementation cancels any scheduled retry. Subclasses may add
265 	 * additional logic before or after calling {@code super.shutdown()}.
266 	 * <p>
267 	 * Called with {@link KetchLeader#lock} held by caller.
268 	 */
269 	protected void shutdown() {
270 		Future<?> f = retryFuture;
271 		if (f != null) {
272 			retryFuture = null;
273 			f.cancel(true);
274 		}
275 	}
276 
277 	ReplicaSnapshot snapshot() {
278 		ReplicaSnapshot s = new ReplicaSnapshot(this);
279 		s.accepted = txnAccepted;
280 		s.committed = txnCommitted;
281 		s.state = state;
282 		s.error = error;
283 		s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
284 		return s;
285 	}
286 
287 	/**
288 	 * Update the leader's view of the replica after a poll.
289 	 * <p>
290 	 * Called with {@link KetchLeader#lock} held by caller.
291 	 *
292 	 * @param refs
293 	 *            map of refs from the replica.
294 	 */
295 	void initialize(Map<String, Ref> refs) {
296 		if (txnAccepted == null) {
297 			txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
298 		}
299 		if (txnCommitted == null) {
300 			txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
301 		}
302 	}
303 
304 	ObjectId getTxnAccepted() {
305 		return txnAccepted;
306 	}
307 
308 	boolean hasAccepted(LogIndex id) {
309 		return equals(txnAccepted, id);
310 	}
311 
312 	private static boolean equals(@Nullable ObjectId a, LogIndex b) {
313 		return a != null && b != null && AnyObjectId.equals(a, b);
314 	}
315 
316 	/**
317 	 * Schedule a proposal round with the replica.
318 	 * <p>
319 	 * Called with {@link KetchLeader#lock} held by caller.
320 	 *
321 	 * @param round
322 	 *            current round being run by the leader.
323 	 */
324 	void pushTxnAcceptedAsync(Round round) {
325 		List<ReceiveCommand> cmds = new ArrayList<>();
326 		if (commitSpeed == BATCHED) {
327 			LogIndex committedIndex = leader.getCommitted();
328 			if (equals(txnAccepted, committedIndex)
329 					&& !equals(txnCommitted, committedIndex)) {
330 				prepareTxnCommitted(cmds, committedIndex);
331 			}
332 		}
333 
334 		// TODO(sop) Lagging replicas should build accept on the fly.
335 		if (round.stageCommands != null) {
336 			for (ReceiveCommand cmd : round.stageCommands) {
337 				// TODO(sop): Do not send certain object graphs to replica.
338 				cmds.add(copy(cmd));
339 			}
340 		}
341 		cmds.add(new ReceiveCommand(
342 				round.acceptedOldIndex, round.acceptedNewIndex,
343 				getSystem().getTxnAccepted()));
344 		pushAsync(new ReplicaPushRequest(this, cmds));
345 	}
346 
347 	private static ReceiveCommand copy(ReceiveCommand c) {
348 		return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
349 	}
350 
351 	boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
352 		return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
353 	}
354 
355 	void pushCommitAsync(LogIndex committed) {
356 		List<ReceiveCommand> cmds = new ArrayList<>();
357 		prepareTxnCommitted(cmds, committed);
358 		pushAsync(new ReplicaPushRequest(this, cmds));
359 	}
360 
361 	private void prepareTxnCommitted(List<ReceiveCommand> cmds,
362 			ObjectId committed) {
363 		removeStaged(cmds, committed);
364 		cmds.add(new ReceiveCommand(
365 				txnCommitted, committed,
366 				getSystem().getTxnCommitted()));
367 	}
368 
369 	private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
370 		List<ReceiveCommand> a = staged.remove(committed);
371 		if (a != null) {
372 			delete(cmds, a);
373 		}
374 		if (staged.isEmpty() || !(committed instanceof LogIndex)) {
375 			return;
376 		}
377 
378 		LogIndex committedIndex = (LogIndex) committed;
379 		Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
380 				.entrySet().iterator();
381 		while (itr.hasNext()) {
382 			Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
383 			if (e.getKey() instanceof LogIndex) {
384 				LogIndex stagedIndex = (LogIndex) e.getKey();
385 				if (stagedIndex.isBefore(committedIndex)) {
386 					delete(cmds, e.getValue());
387 					itr.remove();
388 				}
389 			}
390 		}
391 	}
392 
393 	private static void delete(List<ReceiveCommand> cmds,
394 			List<ReceiveCommand> createCmds) {
395 		for (ReceiveCommand cmd : createCmds) {
396 			ObjectId id = cmd.getNewId();
397 			String name = cmd.getRefName();
398 			cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
399 		}
400 	}
401 
402 	/**
403 	 * Determine the next push for this replica (if any) and start it.
404 	 * <p>
405 	 * If the replica has successfully accepted the committed state of the
406 	 * leader, this method will push all references to the replica using the
407 	 * configured {@link CommitMethod}.
408 	 * <p>
409 	 * If the replica is {@link State#LAGGING} this method will begin catch up
410 	 * by sending a more recent {@code refs/txn/accepted}.
411 	 * <p>
412 	 * Must be invoked with {@link KetchLeader#lock} held by caller.
413 	 */
414 	private void runNextPushRequest() {
415 		LogIndex committed = leader.getCommitted();
416 		if (!equals(txnCommitted, committed)
417 				&& shouldPushUnbatchedCommit(committed, leader.isIdle())) {
418 			pushCommitAsync(committed);
419 		}
420 
421 		if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
422 			return;
423 		}
424 
425 		// Collapse all queued requests into a single request.
426 		Map<String, ReceiveCommand> cmdMap = new HashMap<>();
427 		for (ReplicaPushRequest req : queued) {
428 			for (ReceiveCommand cmd : req.getCommands()) {
429 				String name = cmd.getRefName();
430 				ReceiveCommand old = cmdMap.remove(name);
431 				if (old != null) {
432 					cmd = new ReceiveCommand(
433 							old.getOldId(), cmd.getNewId(),
434 							name);
435 				}
436 				cmdMap.put(name, cmd);
437 			}
438 		}
439 		queued.clear();
440 		waiting.clear();
441 
442 		List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
443 		for (ReceiveCommand cmd : next) {
444 			running.put(cmd.getRefName(), cmd);
445 		}
446 		startPush(new ReplicaPushRequest(this, next));
447 	}
448 
449 	private void pushAsync(ReplicaPushRequest req) {
450 		if (defer(req)) {
451 			// TODO(sop) Collapse during long retry outage.
452 			for (ReceiveCommand cmd : req.getCommands()) {
453 				waiting.put(cmd.getRefName(), cmd);
454 			}
455 			queued.add(req);
456 		} else {
457 			for (ReceiveCommand cmd : req.getCommands()) {
458 				running.put(cmd.getRefName(), cmd);
459 			}
460 			startPush(req);
461 		}
462 	}
463 
464 	private boolean defer(ReplicaPushRequest req) {
465 		if (waitingForRetry()) {
466 			// Prior communication failure; everything is deferred.
467 			return true;
468 		}
469 
470 		for (ReceiveCommand nextCmd : req.getCommands()) {
471 			ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
472 			if (priorCmd == null) {
473 				priorCmd = running.get(nextCmd.getRefName());
474 			}
475 			if (priorCmd != null) {
476 				// Another request pending on same ref; that must go first.
477 				// Verify priorCmd.newId == nextCmd.oldId?
478 				return true;
479 			}
480 		}
481 		return false;
482 	}
483 
484 	private boolean waitingForRetry() {
485 		Future<?> f = retryFuture;
486 		return f != null && !f.isDone();
487 	}
488 
489 	private void retryLater(ReplicaPushRequest req) {
490 		Collection<ReceiveCommand> cmds = req.getCommands();
491 		for (ReceiveCommand cmd : cmds) {
492 			cmd.setResult(NOT_ATTEMPTED, null);
493 			if (!waiting.containsKey(cmd.getRefName())) {
494 				waiting.put(cmd.getRefName(), cmd);
495 			}
496 		}
497 		queued.add(0, new ReplicaPushRequest(this, cmds));
498 
499 		if (!waitingForRetry()) {
500 			long delay = KetchSystem.delay(
501 					lastRetryMillis,
502 					minRetryMillis, maxRetryMillis);
503 			if (log.isDebugEnabled()) {
504 				log.debug("Retrying {} after {} ms", //$NON-NLS-1$
505 						describeForLog(), Long.valueOf(delay));
506 			}
507 			lastRetryMillis = delay;
508 			retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
509 			retryFuture = getSystem().getExecutor()
510 					.schedule(new WeakRetryPush(this), delay, MILLISECONDS);
511 		}
512 	}
513 
514 	/** Weakly holds a retrying replica, allowing it to garbage collect. */
515 	static class WeakRetryPush extends WeakReference<KetchReplica>
516 			implements Callable<Void> {
517 		WeakRetryPush(KetchReplica r) {
518 			super(r);
519 		}
520 
521 		@Override
522 		public Void call() throws Exception {
523 			KetchReplica r = get();
524 			if (r != null) {
525 				r.doRetryPush();
526 			}
527 			return null;
528 		}
529 	}
530 
531 	private void doRetryPush() {
532 		leader.lock.lock();
533 		try {
534 			retryFuture = null;
535 			runNextPushRequest();
536 		} finally {
537 			leader.lock.unlock();
538 		}
539 	}
540 
541 	/**
542 	 * Begin executing a single push.
543 	 * <p>
544 	 * This method must move processing onto another thread.
545 	 * Called with {@link KetchLeader#lock} held by caller.
546 	 *
547 	 * @param req
548 	 *            the request to send to the replica.
549 	 */
550 	protected abstract void startPush(ReplicaPushRequest req);
551 
552 	/**
553 	 * Callback from {@link ReplicaPushRequest} upon success or failure.
554 	 * <p>
555 	 * Acquires the {@link KetchLeader#lock} and updates the leader's internal
556 	 * knowledge about this replica to reflect what has been learned during a
557 	 * push to the replica. In some cases of divergence this method may take
558 	 * some time to determine how the replica has diverged; to reduce contention
559 	 * this is evaluated before acquiring the leader lock.
560 	 *
561 	 * @param repo
562 	 *            local repository instance used by the push thread.
563 	 * @param req
564 	 *            push request just attempted.
565 	 */
566 	void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
567 		ReceiveCommand acceptCmd = null;
568 		ReceiveCommand commitCmd = null;
569 		List<ReceiveCommand> stages = null;
570 
571 		for (ReceiveCommand cmd : req.getCommands()) {
572 			String name = cmd.getRefName();
573 			if (name.equals(getSystem().getTxnAccepted())) {
574 				acceptCmd = cmd;
575 			} else if (name.equals(getSystem().getTxnCommitted())) {
576 				commitCmd = cmd;
577 			} else if (cmd.getResult() == OK && cmd.getType() == CREATE
578 					&& name.startsWith(getSystem().getTxnStage())) {
579 				if (stages == null) {
580 					stages = new ArrayList<>();
581 				}
582 				stages.add(cmd);
583 			}
584 		}
585 
586 		State newState = null;
587 		ObjectId acceptId = readId(req, acceptCmd);
588 		if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
589 				&& req.getException() == null) {
590 			try (LagCheck lag = new LagCheck(this, repo)) {
591 				newState = lag.check(acceptId, acceptCmd);
592 				acceptId = lag.getRemoteId();
593 			}
594 		}
595 
596 		leader.lock.lock();
597 		try {
598 			for (ReceiveCommand cmd : req.getCommands()) {
599 				running.remove(cmd.getRefName());
600 			}
601 
602 			Throwable err = req.getException();
603 			if (err != null) {
604 				state = OFFLINE;
605 				error = err.toString();
606 				retryLater(req);
607 				leader.onReplicaUpdate(this);
608 				return;
609 			}
610 
611 			lastRetryMillis = 0;
612 			error = null;
613 			updateView(req, acceptId, commitCmd);
614 
615 			if (acceptCmd != null && acceptCmd.getResult() == OK) {
616 				state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
617 				if (stages != null) {
618 					staged.put(acceptCmd.getNewId(), stages);
619 				}
620 			} else if (newState != null) {
621 				state = newState;
622 			}
623 
624 			leader.onReplicaUpdate(this);
625 			runNextPushRequest();
626 		} finally {
627 			leader.lock.unlock();
628 		}
629 	}
630 
631 	private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
632 			ReceiveCommand commitCmd) {
633 		if (acceptId != null) {
634 			txnAccepted = acceptId;
635 		}
636 
637 		ObjectId committed = readId(req, commitCmd);
638 		if (committed != null) {
639 			txnCommitted = committed;
640 		} else if (acceptId != null && txnCommitted == null) {
641 			// Initialize during first conversation.
642 			Map<String, Ref> adv = req.getRefs();
643 			if (adv != null) {
644 				Ref refs = adv.get(getSystem().getTxnCommitted());
645 				txnCommitted = getId(refs);
646 			}
647 		}
648 	}
649 
650 	@Nullable
651 	private static ObjectId readId(ReplicaPushRequest req,
652 			@Nullable ReceiveCommand cmd) {
653 		if (cmd == null) {
654 			// Ref was not in the command list, do not trust advertisement.
655 			return null;
656 
657 		} else if (cmd.getResult() == OK) {
658 			// Currently at newId.
659 			return cmd.getNewId();
660 		}
661 
662 		Map<String, Ref> refs = req.getRefs();
663 		return refs != null ? getId(refs.get(cmd.getRefName())) : null;
664 	}
665 
666 	/**
667 	 * Fetch objects from the remote using the calling thread.
668 	 * <p>
669 	 * Called without {@link KetchLeader#lock}.
670 	 *
671 	 * @param repo
672 	 *            local repository to fetch objects into.
673 	 * @param req
674 	 *            the request to fetch from a replica.
675 	 * @throws IOException
676 	 *             communication with the replica was not possible.
677 	 */
678 	protected abstract void blockingFetch(Repository repo,
679 			ReplicaFetchRequest req) throws IOException;
680 
681 	/**
682 	 * Build a list of commands to commit {@link CommitMethod#ALL_REFS}.
683 	 *
684 	 * @param git
685 	 *            local leader repository to read committed state from.
686 	 * @param current
687 	 *            all known references in the replica's repository. Typically
688 	 *            this comes from a push advertisement.
689 	 * @param committed
690 	 *            state being pushed to {@code refs/txn/committed}.
691 	 * @return commands to update during commit.
692 	 * @throws IOException
693 	 *             cannot read the committed state.
694 	 */
695 	protected Collection<ReceiveCommand> prepareCommit(Repository git,
696 			Map<String, Ref> current, ObjectId committed) throws IOException {
697 		List<ReceiveCommand> delta = new ArrayList<>();
698 		Map<String, Ref> remote = new HashMap<>(current);
699 		try (RevWalk rw = new RevWalk(git);
700 				TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
701 			tw.setRecursive(true);
702 			tw.addTree(rw.parseCommit(committed).getTree());
703 			while (tw.next()) {
704 				if (tw.getRawMode(0) != TYPE_GITLINK
705 						|| tw.isPathSuffix(PEEL, 2)) {
706 					// Symbolic references cannot be pushed.
707 					// Caching peeled values is handled remotely.
708 					continue;
709 				}
710 
711 				// TODO(sop) Do not send certain ref names to replica.
712 				String name = RefTree.refName(tw.getPathString());
713 				Ref oldRef = remote.remove(name);
714 				ObjectId oldId = getId(oldRef);
715 				ObjectId newId = tw.getObjectId(0);
716 				if (!AnyObjectId.equals(oldId, newId)) {
717 					delta.add(new ReceiveCommand(oldId, newId, name));
718 				}
719 			}
720 		}
721 
722 		// Delete any extra references not in the committed state.
723 		for (Ref ref : remote.values()) {
724 			if (canDelete(ref)) {
725 				delta.add(new ReceiveCommand(
726 					ref.getObjectId(), ObjectId.zeroId(),
727 					ref.getName()));
728 			}
729 		}
730 		return delta;
731 	}
732 
733 	boolean canDelete(Ref ref) {
734 		String name = ref.getName();
735 		if (HEAD.equals(name)) {
736 			return false;
737 		}
738 		if (name.startsWith(getSystem().getTxnNamespace())) {
739 			return false;
740 		}
741 		// TODO(sop) Do not delete precious names from replica.
742 		return true;
743 	}
744 
745 	@NonNull
746 	static ObjectId getId(@Nullable Ref ref) {
747 		if (ref != null) {
748 			ObjectId id = ref.getObjectId();
749 			if (id != null) {
750 				return id;
751 			}
752 		}
753 		return ObjectId.zeroId();
754 	}
755 }