View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static java.util.concurrent.TimeUnit.MILLISECONDS;
47  import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
48  import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
49  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
50  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
51  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
52  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
53  import static org.eclipse.jgit.lib.Constants.HEAD;
54  import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
55  import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
56  import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
57  import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
58  
59  import java.io.IOException;
60  import java.lang.ref.WeakReference;
61  import java.util.ArrayList;
62  import java.util.Collection;
63  import java.util.HashMap;
64  import java.util.Iterator;
65  import java.util.List;
66  import java.util.Map;
67  import java.util.concurrent.Callable;
68  import java.util.concurrent.Future;
69  
70  import org.eclipse.jgit.annotations.NonNull;
71  import org.eclipse.jgit.annotations.Nullable;
72  import org.eclipse.jgit.internal.storage.reftree.RefTree;
73  import org.eclipse.jgit.lib.AnyObjectId;
74  import org.eclipse.jgit.lib.ObjectId;
75  import org.eclipse.jgit.lib.Ref;
76  import org.eclipse.jgit.lib.Repository;
77  import org.eclipse.jgit.revwalk.RevWalk;
78  import org.eclipse.jgit.transport.ReceiveCommand;
79  import org.eclipse.jgit.treewalk.TreeWalk;
80  import org.eclipse.jgit.util.SystemReader;
81  import org.slf4j.Logger;
82  import org.slf4j.LoggerFactory;
83  
84  /**
85   * A Ketch replica, either {@link org.eclipse.jgit.internal.ketch.LocalReplica}
86   * or {@link org.eclipse.jgit.internal.ketch.RemoteGitReplica}.
87   * <p>
88   * Replicas can be either a stock Git replica, or a Ketch-aware replica.
89   * <p>
90   * A stock Git replica has no special knowledge of Ketch and simply stores
91   * objects and references. Ketch communicates with the stock Git replica using
92   * the Git push wire protocol. The
93   * {@link org.eclipse.jgit.internal.ketch.KetchLeader} commits an agreed upon
94   * state by pushing all references to the Git replica, for example
95   * {@code "refs/heads/master"} is pushed during commit. Stock Git replicas use
96   * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS} to
97   * record the final state.
98   * <p>
99   * Ketch-aware replicas understand the {@code RefTree} sent during the proposal
100  * and during commit are able to update their own reference space to match the
101  * state represented by the {@code RefTree}. Ketch-aware replicas typically use
102  * a {@link org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase} and
103  * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#TXN_COMMITTED}
104  * to record the final state.
105  * <p>
106  * KetchReplica instances are tightly coupled with a single
107  * {@link org.eclipse.jgit.internal.ketch.KetchLeader}. Some state may be
108  * accessed by the leader thread and uses the leader's own
109  * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} to protect shared
110  * data.
111  */
112 public abstract class KetchReplica {
113 	static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
114 	private static final byte[] PEEL = { ' ', '^' };
115 
116 	/** Participation of a replica in establishing consensus. */
117 	public enum Participation {
118 		/** Replica can vote. */
119 		FULL,
120 
121 		/** Replica does not vote, but tracks leader. */
122 		FOLLOWER_ONLY;
123 	}
124 
125 	/** How this replica wants to receive Ketch commit operations. */
126 	public enum CommitMethod {
127 		/** All references are pushed to the peer as standard Git. */
128 		ALL_REFS,
129 
130 		/** Only {@code refs/txn/committed} is written/updated. */
131 		TXN_COMMITTED;
132 	}
133 
134 	/** Delay before committing to a replica. */
135 	public enum CommitSpeed {
136 		/**
137 		 * Send the commit immediately, even if it could be batched with the
138 		 * next proposal.
139 		 */
140 		FAST,
141 
142 		/**
143 		 * If the next proposal is available, batch the commit with it,
144 		 * otherwise just send the commit. This generates less network use, but
145 		 * may provide slower consistency on the replica.
146 		 */
147 		BATCHED;
148 	}
149 
150 	/** Current state of a replica. */
151 	public enum State {
152 		/** Leader has not yet contacted the replica. */
153 		UNKNOWN,
154 
155 		/** Replica is behind the consensus. */
156 		LAGGING,
157 
158 		/** Replica matches the consensus. */
159 		CURRENT,
160 
161 		/** Replica has a different (or unknown) history. */
162 		DIVERGENT,
163 
164 		/** Replica's history contains the leader's history. */
165 		AHEAD,
166 
167 		/** Replica can not be contacted. */
168 		OFFLINE;
169 	}
170 
171 	private final KetchLeader leader;
172 	private final String replicaName;
173 	private final Participation participation;
174 	private final CommitMethod commitMethod;
175 	private final CommitSpeed commitSpeed;
176 	private final long minRetryMillis;
177 	private final long maxRetryMillis;
178 	private final Map<ObjectId, List<ReceiveCommand>> staged;
179 	private final Map<String, ReceiveCommand> running;
180 	private final Map<String, ReceiveCommand> waiting;
181 	private final List<ReplicaPushRequest> queued;
182 
183 	/**
184 	 * Value known for {@code "refs/txn/accepted"}.
185 	 * <p>
186 	 * Raft literature refers to this as {@code matchIndex}.
187 	 */
188 	private ObjectId txnAccepted;
189 
190 	/**
191 	 * Value known for {@code "refs/txn/committed"}.
192 	 * <p>
193 	 * Raft literature refers to this as {@code commitIndex}. In traditional
194 	 * Raft this is a state variable inside the follower implementation, but
195 	 * Ketch keeps it in the leader.
196 	 */
197 	private ObjectId txnCommitted;
198 
199 	/** What is happening with this replica. */
200 	private State state = UNKNOWN;
201 	private String error;
202 
203 	/** Scheduled retry due to communication failure. */
204 	private Future<?> retryFuture;
205 	private long lastRetryMillis;
206 	private long retryAtMillis;
207 
208 	/**
209 	 * Configure a replica representation.
210 	 *
211 	 * @param leader
212 	 *            instance this replica follows.
213 	 * @param name
214 	 *            unique-ish name identifying this replica for debugging.
215 	 * @param cfg
216 	 *            how Ketch should treat the replica.
217 	 */
218 	protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
219 		this.leader = leader;
220 		this.replicaName = name;
221 		this.participation = cfg.getParticipation();
222 		this.commitMethod = cfg.getCommitMethod();
223 		this.commitSpeed = cfg.getCommitSpeed();
224 		this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
225 		this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
226 		this.staged = new HashMap<>();
227 		this.running = new HashMap<>();
228 		this.waiting = new HashMap<>();
229 		this.queued = new ArrayList<>(4);
230 	}
231 
232 	/**
233 	 * Get system configuration.
234 	 *
235 	 * @return system configuration.
236 	 */
237 	public KetchSystem getSystem() {
238 		return getLeader().getSystem();
239 	}
240 
241 	/**
242 	 * Get leader instance this replica follows.
243 	 *
244 	 * @return leader instance this replica follows.
245 	 */
246 	public KetchLeader getLeader() {
247 		return leader;
248 	}
249 
250 	/**
251 	 * Get unique-ish name for debugging.
252 	 *
253 	 * @return unique-ish name for debugging.
254 	 */
255 	public String getName() {
256 		return replicaName;
257 	}
258 
259 	/**
260 	 * Get description of this replica for error/debug logging purposes.
261 	 *
262 	 * @return description of this replica for error/debug logging purposes.
263 	 */
264 	protected String describeForLog() {
265 		return getName();
266 	}
267 
268 	/**
269 	 * Get how the replica participates in this Ketch system.
270 	 *
271 	 * @return how the replica participates in this Ketch system.
272 	 */
273 	public Participation getParticipation() {
274 		return participation;
275 	}
276 
277 	/**
278 	 * Get how Ketch will commit to the repository.
279 	 *
280 	 * @return how Ketch will commit to the repository.
281 	 */
282 	public CommitMethod getCommitMethod() {
283 		return commitMethod;
284 	}
285 
286 	/**
287 	 * Get when Ketch will commit to the repository.
288 	 *
289 	 * @return when Ketch will commit to the repository.
290 	 */
291 	public CommitSpeed getCommitSpeed() {
292 		return commitSpeed;
293 	}
294 
295 	/**
296 	 * Called by leader to perform graceful shutdown.
297 	 * <p>
298 	 * Default implementation cancels any scheduled retry. Subclasses may add
299 	 * additional logic before or after calling {@code super.shutdown()}.
300 	 * <p>
301 	 * Called with {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held
302 	 * by caller.
303 	 */
304 	protected void shutdown() {
305 		Future<?> f = retryFuture;
306 		if (f != null) {
307 			retryFuture = null;
308 			f.cancel(true);
309 		}
310 	}
311 
312 	ReplicaSnapshot snapshot() {
313 		ReplicaSnapshot s = new ReplicaSnapshot(this);
314 		s.accepted = txnAccepted;
315 		s.committed = txnCommitted;
316 		s.state = state;
317 		s.error = error;
318 		s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
319 		return s;
320 	}
321 
322 	/**
323 	 * Update the leader's view of the replica after a poll.
324 	 * <p>
325 	 * Called with {@link KetchLeader#lock} held by caller.
326 	 *
327 	 * @param refs
328 	 *            map of refs from the replica.
329 	 */
330 	void initialize(Map<String, Ref> refs) {
331 		if (txnAccepted == null) {
332 			txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
333 		}
334 		if (txnCommitted == null) {
335 			txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
336 		}
337 	}
338 
339 	ObjectId getTxnAccepted() {
340 		return txnAccepted;
341 	}
342 
343 	boolean hasAccepted(LogIndex id) {
344 		return equals(txnAccepted, id);
345 	}
346 
347 	private static boolean equals(@Nullable ObjectId a, LogIndex b) {
348 		return a != null && b != null && AnyObjectId.isEqual(a, b);
349 	}
350 
351 	/**
352 	 * Schedule a proposal round with the replica.
353 	 * <p>
354 	 * Called with {@link KetchLeader#lock} held by caller.
355 	 *
356 	 * @param round
357 	 *            current round being run by the leader.
358 	 */
359 	void pushTxnAcceptedAsync(Round round) {
360 		List<ReceiveCommand> cmds = new ArrayList<>();
361 		if (commitSpeed == BATCHED) {
362 			LogIndex committedIndex = leader.getCommitted();
363 			if (equals(txnAccepted, committedIndex)
364 					&& !equals(txnCommitted, committedIndex)) {
365 				prepareTxnCommitted(cmds, committedIndex);
366 			}
367 		}
368 
369 		// TODO(sop) Lagging replicas should build accept on the fly.
370 		if (round.stageCommands != null) {
371 			for (ReceiveCommand cmd : round.stageCommands) {
372 				// TODO(sop): Do not send certain object graphs to replica.
373 				cmds.add(copy(cmd));
374 			}
375 		}
376 		cmds.add(new ReceiveCommand(
377 				round.acceptedOldIndex, round.acceptedNewIndex,
378 				getSystem().getTxnAccepted()));
379 		pushAsync(new ReplicaPushRequest(this, cmds));
380 	}
381 
382 	private static ReceiveCommand../../../../org/eclipse/jgit/transport/ReceiveCommand.html#ReceiveCommand">ReceiveCommand copy(ReceiveCommand c) {
383 		return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
384 	}
385 
386 	boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
387 		return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
388 	}
389 
390 	void pushCommitAsync(LogIndex committed) {
391 		List<ReceiveCommand> cmds = new ArrayList<>();
392 		prepareTxnCommitted(cmds, committed);
393 		pushAsync(new ReplicaPushRequest(this, cmds));
394 	}
395 
396 	private void prepareTxnCommitted(List<ReceiveCommand> cmds,
397 			ObjectId committed) {
398 		removeStaged(cmds, committed);
399 		cmds.add(new ReceiveCommand(
400 				txnCommitted, committed,
401 				getSystem().getTxnCommitted()));
402 	}
403 
404 	private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
405 		List<ReceiveCommand> a = staged.remove(committed);
406 		if (a != null) {
407 			delete(cmds, a);
408 		}
409 		if (staged.isEmpty() || !(committed instanceof LogIndex)) {
410 			return;
411 		}
412 
413 		LogIndex committedIndex = (LogIndex) committed;
414 		Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
415 				.entrySet().iterator();
416 		while (itr.hasNext()) {
417 			Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
418 			if (e.getKey() instanceof LogIndex) {
419 				LogIndex stagedIndex = (LogIndex) e.getKey();
420 				if (stagedIndex.isBefore(committedIndex)) {
421 					delete(cmds, e.getValue());
422 					itr.remove();
423 				}
424 			}
425 		}
426 	}
427 
428 	private static void delete(List<ReceiveCommand> cmds,
429 			List<ReceiveCommand> createCmds) {
430 		for (ReceiveCommand cmd : createCmds) {
431 			ObjectId id = cmd.getNewId();
432 			String name = cmd.getRefName();
433 			cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
434 		}
435 	}
436 
437 	/**
438 	 * Determine the next push for this replica (if any) and start it.
439 	 * <p>
440 	 * If the replica has successfully accepted the committed state of the
441 	 * leader, this method will push all references to the replica using the
442 	 * configured {@link CommitMethod}.
443 	 * <p>
444 	 * If the replica is {@link State#LAGGING} this method will begin catch up
445 	 * by sending a more recent {@code refs/txn/accepted}.
446 	 * <p>
447 	 * Must be invoked with {@link KetchLeader#lock} held by caller.
448 	 */
449 	private void runNextPushRequest() {
450 		LogIndex committed = leader.getCommitted();
451 		if (!equals(txnCommitted, committed)
452 				&& shouldPushUnbatchedCommit(committed, leader.isIdle())) {
453 			pushCommitAsync(committed);
454 		}
455 
456 		if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
457 			return;
458 		}
459 
460 		// Collapse all queued requests into a single request.
461 		Map<String, ReceiveCommand> cmdMap = new HashMap<>();
462 		for (ReplicaPushRequest req : queued) {
463 			for (ReceiveCommand cmd : req.getCommands()) {
464 				String name = cmd.getRefName();
465 				ReceiveCommand old = cmdMap.remove(name);
466 				if (old != null) {
467 					cmd = new ReceiveCommand(
468 							old.getOldId(), cmd.getNewId(),
469 							name);
470 				}
471 				cmdMap.put(name, cmd);
472 			}
473 		}
474 		queued.clear();
475 		waiting.clear();
476 
477 		List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
478 		for (ReceiveCommand cmd : next) {
479 			running.put(cmd.getRefName(), cmd);
480 		}
481 		startPush(new ReplicaPushRequest(this, next));
482 	}
483 
484 	private void pushAsync(ReplicaPushRequest req) {
485 		if (defer(req)) {
486 			// TODO(sop) Collapse during long retry outage.
487 			for (ReceiveCommand cmd : req.getCommands()) {
488 				waiting.put(cmd.getRefName(), cmd);
489 			}
490 			queued.add(req);
491 		} else {
492 			for (ReceiveCommand cmd : req.getCommands()) {
493 				running.put(cmd.getRefName(), cmd);
494 			}
495 			startPush(req);
496 		}
497 	}
498 
499 	private boolean defer(ReplicaPushRequest req) {
500 		if (waitingForRetry()) {
501 			// Prior communication failure; everything is deferred.
502 			return true;
503 		}
504 
505 		for (ReceiveCommand nextCmd : req.getCommands()) {
506 			ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
507 			if (priorCmd == null) {
508 				priorCmd = running.get(nextCmd.getRefName());
509 			}
510 			if (priorCmd != null) {
511 				// Another request pending on same ref; that must go first.
512 				// Verify priorCmd.newId == nextCmd.oldId?
513 				return true;
514 			}
515 		}
516 		return false;
517 	}
518 
519 	private boolean waitingForRetry() {
520 		Future<?> f = retryFuture;
521 		return f != null && !f.isDone();
522 	}
523 
524 	private void retryLater(ReplicaPushRequest req) {
525 		Collection<ReceiveCommand> cmds = req.getCommands();
526 		for (ReceiveCommand cmd : cmds) {
527 			cmd.setResult(NOT_ATTEMPTED, null);
528 			if (!waiting.containsKey(cmd.getRefName())) {
529 				waiting.put(cmd.getRefName(), cmd);
530 			}
531 		}
532 		queued.add(0, new ReplicaPushRequest(this, cmds));
533 
534 		if (!waitingForRetry()) {
535 			long delay = KetchSystem.delay(
536 					lastRetryMillis,
537 					minRetryMillis, maxRetryMillis);
538 			if (log.isDebugEnabled()) {
539 				log.debug("Retrying {} after {} ms", //$NON-NLS-1$
540 						describeForLog(), Long.valueOf(delay));
541 			}
542 			lastRetryMillis = delay;
543 			retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
544 			retryFuture = getSystem().getExecutor()
545 					.schedule(new WeakRetryPush(this), delay, MILLISECONDS);
546 		}
547 	}
548 
549 	/** Weakly holds a retrying replica, allowing it to garbage collect. */
550 	static class WeakRetryPush extends WeakReference<KetchReplica>
551 			implements Callable<Void> {
552 		WeakRetryPush(KetchReplica r) {
553 			super(r);
554 		}
555 
556 		@Override
557 		public Void call() throws Exception {
558 			KetchReplica r = get();
559 			if (r != null) {
560 				r.doRetryPush();
561 			}
562 			return null;
563 		}
564 	}
565 
566 	private void doRetryPush() {
567 		leader.lock.lock();
568 		try {
569 			retryFuture = null;
570 			runNextPushRequest();
571 		} finally {
572 			leader.lock.unlock();
573 		}
574 	}
575 
576 	/**
577 	 * Begin executing a single push.
578 	 * <p>
579 	 * This method must move processing onto another thread. Called with
580 	 * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held by caller.
581 	 *
582 	 * @param req
583 	 *            the request to send to the replica.
584 	 */
585 	protected abstract void startPush(ReplicaPushRequest req);
586 
587 	/**
588 	 * Callback from {@link ReplicaPushRequest} upon success or failure.
589 	 * <p>
590 	 * Acquires the {@link KetchLeader#lock} and updates the leader's internal
591 	 * knowledge about this replica to reflect what has been learned during a
592 	 * push to the replica. In some cases of divergence this method may take
593 	 * some time to determine how the replica has diverged; to reduce contention
594 	 * this is evaluated before acquiring the leader lock.
595 	 *
596 	 * @param repo
597 	 *            local repository instance used by the push thread.
598 	 * @param req
599 	 *            push request just attempted.
600 	 */
601 	void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
602 		ReceiveCommand acceptCmd = null;
603 		ReceiveCommand commitCmd = null;
604 		List<ReceiveCommand> stages = null;
605 
606 		for (ReceiveCommand cmd : req.getCommands()) {
607 			String name = cmd.getRefName();
608 			if (name.equals(getSystem().getTxnAccepted())) {
609 				acceptCmd = cmd;
610 			} else if (name.equals(getSystem().getTxnCommitted())) {
611 				commitCmd = cmd;
612 			} else if (cmd.getResult() == OK && cmd.getType() == CREATE
613 					&& name.startsWith(getSystem().getTxnStage())) {
614 				if (stages == null) {
615 					stages = new ArrayList<>();
616 				}
617 				stages.add(cmd);
618 			}
619 		}
620 
621 		State newState = null;
622 		ObjectId acceptId = readId(req, acceptCmd);
623 		if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
624 				&& req.getException() == null) {
625 			try (LagCheckl/ketch/LagCheck.html#LagCheck">LagCheck lag = new LagCheck(this, repo)) {
626 				newState = lag.check(acceptId, acceptCmd);
627 				acceptId = lag.getRemoteId();
628 			}
629 		}
630 
631 		leader.lock.lock();
632 		try {
633 			for (ReceiveCommand cmd : req.getCommands()) {
634 				running.remove(cmd.getRefName());
635 			}
636 
637 			Throwable err = req.getException();
638 			if (err != null) {
639 				state = OFFLINE;
640 				error = err.toString();
641 				retryLater(req);
642 				leader.onReplicaUpdate(this);
643 				return;
644 			}
645 
646 			lastRetryMillis = 0;
647 			error = null;
648 			updateView(req, acceptId, commitCmd);
649 
650 			if (acceptCmd != null && acceptCmd.getResult() == OK) {
651 				state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
652 				if (stages != null) {
653 					staged.put(acceptCmd.getNewId(), stages);
654 				}
655 			} else if (newState != null) {
656 				state = newState;
657 			}
658 
659 			leader.onReplicaUpdate(this);
660 			runNextPushRequest();
661 		} finally {
662 			leader.lock.unlock();
663 		}
664 	}
665 
666 	private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
667 			ReceiveCommand commitCmd) {
668 		if (acceptId != null) {
669 			txnAccepted = acceptId;
670 		}
671 
672 		ObjectId committed = readId(req, commitCmd);
673 		if (committed != null) {
674 			txnCommitted = committed;
675 		} else if (acceptId != null && txnCommitted == null) {
676 			// Initialize during first conversation.
677 			Map<String, Ref> adv = req.getRefs();
678 			if (adv != null) {
679 				Ref refs = adv.get(getSystem().getTxnCommitted());
680 				txnCommitted = getId(refs);
681 			}
682 		}
683 	}
684 
685 	@Nullable
686 	private static ObjectId readId(ReplicaPushRequest req,
687 			@Nullable ReceiveCommand cmd) {
688 		if (cmd == null) {
689 			// Ref was not in the command list, do not trust advertisement.
690 			return null;
691 
692 		} else if (cmd.getResult() == OK) {
693 			// Currently at newId.
694 			return cmd.getNewId();
695 		}
696 
697 		Map<String, Ref> refs = req.getRefs();
698 		return refs != null ? getId(refs.get(cmd.getRefName())) : null;
699 	}
700 
701 	/**
702 	 * Fetch objects from the remote using the calling thread.
703 	 * <p>
704 	 * Called without {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock}.
705 	 *
706 	 * @param repo
707 	 *            local repository to fetch objects into.
708 	 * @param req
709 	 *            the request to fetch from a replica.
710 	 * @throws java.io.IOException
711 	 *             communication with the replica was not possible.
712 	 */
713 	protected abstract void blockingFetch(Repository repo,
714 			ReplicaFetchRequest req) throws IOException;
715 
716 	/**
717 	 * Build a list of commands to commit
718 	 * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS}.
719 	 *
720 	 * @param git
721 	 *            local leader repository to read committed state from.
722 	 * @param current
723 	 *            all known references in the replica's repository. Typically
724 	 *            this comes from a push advertisement.
725 	 * @param committed
726 	 *            state being pushed to {@code refs/txn/committed}.
727 	 * @return commands to update during commit.
728 	 * @throws java.io.IOException
729 	 *             cannot read the committed state.
730 	 */
731 	protected Collection<ReceiveCommand> prepareCommit(Repository git,
732 			Map<String, Ref> current, ObjectId committed) throws IOException {
733 		List<ReceiveCommand> delta = new ArrayList<>();
734 		Map<String, Ref> remote = new HashMap<>(current);
735 		try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git);
736 				TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
737 			tw.setRecursive(true);
738 			tw.addTree(rw.parseCommit(committed).getTree());
739 			while (tw.next()) {
740 				if (tw.getRawMode(0) != TYPE_GITLINK
741 						|| tw.isPathSuffix(PEEL, 2)) {
742 					// Symbolic references cannot be pushed.
743 					// Caching peeled values is handled remotely.
744 					continue;
745 				}
746 
747 				// TODO(sop) Do not send certain ref names to replica.
748 				String name = RefTree.refName(tw.getPathString());
749 				Ref oldRef = remote.remove(name);
750 				ObjectId oldId = getId(oldRef);
751 				ObjectId newId = tw.getObjectId(0);
752 				if (!AnyObjectId.isEqual(oldId, newId)) {
753 					delta.add(new ReceiveCommand(oldId, newId, name));
754 				}
755 			}
756 		}
757 
758 		// Delete any extra references not in the committed state.
759 		for (Ref ref : remote.values()) {
760 			if (canDelete(ref)) {
761 				delta.add(new ReceiveCommand(
762 					ref.getObjectId(), ObjectId.zeroId(),
763 					ref.getName()));
764 			}
765 		}
766 		return delta;
767 	}
768 
769 	boolean canDelete(Ref ref) {
770 		String name = ref.getName();
771 		if (HEAD.equals(name)) {
772 			return false;
773 		}
774 		if (name.startsWith(getSystem().getTxnNamespace())) {
775 			return false;
776 		}
777 		// TODO(sop) Do not delete precious names from replica.
778 		return true;
779 	}
780 
781 	@NonNull
782 	static ObjectId getId(@Nullable Ref ref) {
783 		if (ref != null) {
784 			ObjectId id = ref.getObjectId();
785 			if (id != null) {
786 				return id;
787 			}
788 		}
789 		return ObjectId.zeroId();
790 	}
791 }