1 /*
2 * Copyright (C) 2016, Google Inc. and others
3 *
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Distribution License v. 1.0 which is available at
6 * https://www.eclipse.org/org/documents/edl-v10.php.
7 *
8 * SPDX-License-Identifier: BSD-3-Clause
9 */
10
11 package org.eclipse.jgit.internal.ketch;
12
13 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
14 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
15 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
16 import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
17 import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
18
19 import java.io.IOException;
20 import java.text.MessageFormat;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.locks.Lock;
26 import java.util.concurrent.locks.ReentrantLock;
27
28 import org.eclipse.jgit.internal.storage.reftree.RefTree;
29 import org.eclipse.jgit.lib.ObjectId;
30 import org.eclipse.jgit.lib.Repository;
31 import org.eclipse.jgit.revwalk.RevCommit;
32 import org.eclipse.jgit.revwalk.RevWalk;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37 * A leader managing consensus across remote followers.
38 * <p>
39 * A leader instance starts up in
40 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
41 * to begin a new term by sending an
42 * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
43 * term starts if a majority of replicas have accepted this leader instance for
44 * the term.
45 * <p>
46 * Once elected by a majority the instance enters
47 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
48 * proposals offered to {@link #queueProposal(Proposal)}. This continues until
49 * the leader is timed out for inactivity, or is deposed by a competing leader
50 * gaining its own majority.
51 * <p>
52 * Once timed out or deposed this {@code KetchLeader} instance should be
53 * discarded, and a new instance takes over.
54 * <p>
55 * Each leader instance coordinates a group of
56 * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
57 * owned by the leader instance and must be discarded when the leader is
58 * discarded.
59 * <p>
60 * In Ketch all push requests are issued through the leader. The steps are as
61 * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
62 * example):
63 * <ul>
64 * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
65 * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
66 * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
67 * <li>Wait for consensus with
68 * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
69 * <li>To examine the status of the push, check
70 * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
71 * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
72 * </ul>
73 * <p>
74 * The leader gains consensus by first pushing the needed objects and a
75 * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
76 * desired target repository state to the {@code refs/txn/accepted} branch on
77 * each of the replicas. Once a majority has succeeded, the leader commits the
78 * state by either pushing the {@code refs/txn/accepted} value to
79 * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
80 * to {@code refs/heads/master}, etc. for stock Git replicas.
81 * <p>
82 * Internally, the actual transport to replicas is performed on background
83 * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
84 * executor service. For performance, the
85 * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
86 * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
87 * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
88 * and may invoke each other's methods on different threads. This access is
89 * protected by the leader's {@link #lock} object. Care must be taken to prevent
90 * concurrent access by correctly obtaining the leader's lock.
91 */
92 public abstract class KetchLeader {
93 private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
94
95 /** Current state of the leader instance. */
96 public enum State {
97 /** Newly created instance trying to elect itself leader. */
98 CANDIDATE,
99
100 /** Leader instance elected by a majority. */
101 LEADER,
102
103 /** Instance has been deposed by another with a more recent term. */
104 DEPOSED,
105
106 /** Leader has been gracefully shutdown, e.g. due to inactivity. */
107 SHUTDOWN;
108 }
109
110 private final KetchSystem system;
111
112 /** Leader's knowledge of replicas for this repository. */
113 private KetchReplica[] voters;
114 private KetchReplica[] followers;
115 private LocalReplica self;
116
117 /**
118 * Lock protecting all data within this leader instance.
119 * <p>
120 * This lock extends into the {@link KetchReplica} instances used by the
121 * leader. They share the same lock instance to simplify concurrency.
122 */
123 final Lock lock;
124
125 private State state = CANDIDATE;
126
127 /** Term of this leader, once elected. */
128 private long term;
129
130 /**
131 * Pending proposals accepted into the queue in FIFO order.
132 * <p>
133 * These proposals were preflighted and do not contain any conflicts with
134 * each other and their expectations matched the leader's local view of the
135 * agreed upon {@code refs/txn/accepted} tree.
136 */
137 private final List<Proposal> queued;
138
139 /**
140 * State of the repository's RefTree after applying all entries in
141 * {@link #queued}. New proposals must be consistent with this tree to be
142 * appended to the end of {@link #queued}.
143 * <p>
144 * Must be deep-copied with {@link RefTree#copy()} if
145 * {@link #roundHoldsReferenceToRefTree} is {@code true}.
146 */
147 private RefTree refTree;
148
149 /**
150 * If {@code true} {@link #refTree} must be duplicated before queuing the
151 * next proposal. The {@link #refTree} was passed into the constructor of a
152 * {@link ProposalRound}, and that external reference to the {@link RefTree}
153 * object is held by the proposal until it materializes the tree object in
154 * the object store. This field is set {@code true} when the proposal begins
155 * execution and set {@code false} once tree objects are persisted in the
156 * local repository's object store or {@link #refTree} is replaced with a
157 * copy to isolate it from any running rounds.
158 * <p>
159 * If proposals arrive less frequently than the {@code RefTree} is written
160 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
161 * avoids duplicating {@link #refTree}, reducing both time and memory used.
162 * However if proposals arrive more frequently {@link #refTree} must be
163 * duplicated to prevent newly queued proposals from corrupting the
164 * {@link #runningRound}.
165 */
166 volatile boolean roundHoldsReferenceToRefTree;
167
168 /** End of the leader's log. */
169 private LogIndex headIndex;
170
171 /** Leader knows this (and all prior) states are committed. */
172 private LogIndex committedIndex;
173
174 /**
175 * Is the leader idle with no work pending? If {@code true} there is no work
176 * for the leader (normal state). This field is {@code false} when the
177 * leader thread is scheduled for execution, or while {@link #runningRound}
178 * defines a round in progress.
179 */
180 private boolean idle;
181
182 /** Current round the leader is preparing and waiting for a vote on. */
183 private Round runningRound;
184
185 /**
186 * Construct a leader for a Ketch instance.
187 *
188 * @param system
189 * Ketch system configuration the leader must adhere to.
190 */
191 protected KetchLeader(KetchSystem system) {
192 this.system = system;
193 this.lock = new ReentrantLock(true /* fair */);
194 this.queued = new ArrayList<>(4);
195 this.idle = true;
196 }
197
198 /** @return system configuration. */
199 KetchSystem getSystem() {
200 return system;
201 }
202
203 /**
204 * Configure the replicas used by this Ketch instance.
205 * <p>
206 * Replicas should be configured once at creation before any proposals are
207 * executed. Once elections happen, <b>reconfiguration is a complicated
208 * concept that is not currently supported</b>.
209 *
210 * @param replicas
211 * members participating with the same repository.
212 */
213 public void setReplicas(Collection<KetchReplica> replicas) {
214 List<KetchReplica> v = new ArrayList<>(5);
215 List<KetchReplica> f = new ArrayList<>(5);
216 for (KetchReplica r : replicas) {
217 switch (r.getParticipation()) {
218 case FULL:
219 v.add(r);
220 break;
221
222 case FOLLOWER_ONLY:
223 f.add(r);
224 break;
225 }
226 }
227
228 Collection<Integer> validVoters = validVoterCounts();
229 if (!validVoters.contains(Integer.valueOf(v.size()))) {
230 throw new IllegalArgumentException(MessageFormat.format(
231 KetchText.get().unsupportedVoterCount,
232 Integer.valueOf(v.size()),
233 validVoters));
234 }
235
236 LocalReplica me = findLocal(v);
237 if (me == null) {
238 throw new IllegalArgumentException(
239 KetchText.get().localReplicaRequired);
240 }
241
242 lock.lock();
243 try {
244 voters = v.toArray(new KetchReplica[0]);
245 followers = f.toArray(new KetchReplica[0]);
246 self = me;
247 } finally {
248 lock.unlock();
249 }
250 }
251
252 private static Collection<Integer> validVoterCounts() {
253 @SuppressWarnings("boxing")
254 Integer[] valid = {
255 // An odd number of voting replicas is required.
256 1, 3, 5, 7, 9 };
257 return Arrays.asList(valid);
258 }
259
260 private static LocalReplica findLocal(Collection<KetchReplica> voters) {
261 for (KetchReplica r : voters) {
262 if (r instanceof LocalReplica) {
263 return (LocalReplica) r;
264 }
265 }
266 return null;
267 }
268
269 /**
270 * Get an instance of the repository for use by a leader thread.
271 * <p>
272 * The caller will close the repository.
273 *
274 * @return opened repository for use by the leader thread.
275 * @throws java.io.IOException
276 * cannot reopen the repository for the leader.
277 */
278 protected abstract Repository openRepository() throws IOException;
279
280 /**
281 * Queue a reference update proposal for consensus.
282 * <p>
283 * This method does not wait for consensus to be reached. The proposal is
284 * checked to look for risks of conflicts, and then submitted into the queue
285 * for distribution as soon as possible.
286 * <p>
287 * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
288 * to see if the proposal is done.
289 *
290 * @param proposal
291 * the proposed reference updates to queue for consideration.
292 * Once execution is complete the individual reference result
293 * fields will be populated with the outcome.
294 * @throws java.lang.InterruptedException
295 * current thread was interrupted. The proposal may have been
296 * aborted if it was not yet queued for execution.
297 * @throws java.io.IOException
298 * unrecoverable error preventing proposals from being attempted
299 * by this leader.
300 */
301 public void queueProposal(Proposal proposal)
302 throws InterruptedException, IOException {
303 try {
304 lock.lockInterruptibly();
305 } catch (InterruptedException e) {
306 proposal.abort();
307 throw e;
308 }
309 try {
310 if (refTree == null) {
311 initialize();
312 for (Proposal p : queued) {
313 refTree.apply(p.getCommands());
314 }
315 } else if (roundHoldsReferenceToRefTree) {
316 refTree = refTree.copy();
317 roundHoldsReferenceToRefTree = false;
318 }
319
320 if (!refTree.apply(proposal.getCommands())) {
321 // A conflict exists so abort the proposal.
322 proposal.abort();
323 return;
324 }
325
326 queued.add(proposal);
327 proposal.notifyState(QUEUED);
328
329 if (idle) {
330 scheduleLeader();
331 }
332 } finally {
333 lock.unlock();
334 }
335 }
336
337 private void initialize() throws IOException {
338 try (Repository git = openRepository(); RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
339 self.initialize(git);
340
341 ObjectId accepted = self.getTxnAccepted();
342 if (!ObjectId.zeroId().equals(accepted)) {
343 RevCommit c = rw.parseCommit(accepted);
344 headIndex = LogIndex.unknown(accepted);
345 refTree = RefTree.read(rw.getObjectReader(), c.getTree());
346 } else {
347 headIndex = LogIndex.unknown(ObjectId.zeroId());
348 refTree = RefTree.newEmptyTree();
349 }
350 }
351 }
352
353 private void scheduleLeader() {
354 idle = false;
355 system.getExecutor().execute(this::runLeader);
356 }
357
358 private void runLeader() {
359 Round round;
360 lock.lock();
361 try {
362 switch (state) {
363 case CANDIDATE:
364 round = new ElectionRound(this, headIndex);
365 break;
366
367 case LEADER:
368 round = newProposalRound();
369 break;
370
371 case DEPOSED:
372 case SHUTDOWN:
373 default:
374 log.warn("Leader cannot run {}", state); //$NON-NLS-1$
375 // TODO(sop): Redirect proposals.
376 return;
377 }
378 } finally {
379 lock.unlock();
380 }
381
382 try {
383 round.start();
384 } catch (IOException e) {
385 // TODO(sop) Depose leader if it cannot use its repository.
386 log.error(KetchText.get().leaderFailedToStore, e);
387 lock.lock();
388 try {
389 nextRound();
390 } finally {
391 lock.unlock();
392 }
393 }
394 }
395
396 private ProposalRound newProposalRound() {
397 List<Proposal> todo = new ArrayList<>(queued);
398 queued.clear();
399 roundHoldsReferenceToRefTree = true;
400 return new ProposalRound(this, headIndex, todo, refTree);
401 }
402
403 /** @return term of this leader's reign. */
404 long getTerm() {
405 return term;
406 }
407
408 /** @return end of the leader's log. */
409 LogIndex getHead() {
410 return headIndex;
411 }
412
413 /**
414 * @return state leader knows it has committed across a quorum of replicas.
415 */
416 LogIndex getCommitted() {
417 return committedIndex;
418 }
419
420 boolean isIdle() {
421 return idle;
422 }
423
424 void runAsync(Round round) {
425 lock.lock();
426 try {
427 // End of the log is this round. Once transport begins it is
428 // reasonable to assume at least one replica will eventually get
429 // this, and there is reasonable probability it commits.
430 headIndex = round.acceptedNewIndex;
431 runningRound = round;
432
433 for (KetchReplica replica : voters) {
434 replica.pushTxnAcceptedAsync(round);
435 }
436 for (KetchReplica replica : followers) {
437 replica.pushTxnAcceptedAsync(round);
438 }
439 } finally {
440 lock.unlock();
441 }
442 }
443
444 /**
445 * Asynchronous signal from a replica after completion.
446 * <p>
447 * Must be called while {@link #lock} is held by the replica.
448 *
449 * @param replica
450 * replica posting a completion event.
451 */
452 void onReplicaUpdate(KetchReplica replica) {
453 if (log.isDebugEnabled()) {
454 log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
455 replica.describeForLog(), snapshot());
456 }
457
458 if (replica.getParticipation() == FOLLOWER_ONLY) {
459 // Followers cannot vote, so votes haven't changed.
460 return;
461 } else if (runningRound == null) {
462 // No round running, no need to tally votes.
463 return;
464 }
465
466 assert headIndex.equals(runningRound.acceptedNewIndex);
467 int matching = 0;
468 for (KetchReplica r : voters) {
469 if (r.hasAccepted(headIndex)) {
470 matching++;
471 }
472 }
473
474 int quorum = voters.length / 2 + 1;
475 boolean success = matching >= quorum;
476 if (!success) {
477 return;
478 }
479
480 switch (state) {
481 case CANDIDATE:
482 term = ((ElectionRound) runningRound).getTerm();
483 state = LEADER;
484 if (log.isDebugEnabled()) {
485 log.debug("Won election, running term " + term); //$NON-NLS-1$
486 }
487
488 //$FALL-THROUGH$
489 case LEADER:
490 committedIndex = headIndex;
491 if (log.isDebugEnabled()) {
492 log.debug("Committed {} in term {}", //$NON-NLS-1$
493 committedIndex.describeForLog(),
494 Long.valueOf(term));
495 }
496 nextRound();
497 commitAsync(replica);
498 notifySuccess(runningRound);
499 if (log.isDebugEnabled()) {
500 log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
501 }
502 break;
503
504 default:
505 log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
506 break;
507 }
508 }
509
510 private void notifySuccess(Round round) {
511 // Drop the leader lock while notifying Proposal listeners.
512 lock.unlock();
513 try {
514 round.success();
515 } finally {
516 lock.lock();
517 }
518 }
519
520 private void commitAsync(KetchReplica caller) {
521 for (KetchReplica r : voters) {
522 if (r == caller) {
523 continue;
524 }
525 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
526 r.pushCommitAsync(committedIndex);
527 }
528 }
529 for (KetchReplica r : followers) {
530 if (r == caller) {
531 continue;
532 }
533 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
534 r.pushCommitAsync(committedIndex);
535 }
536 }
537 }
538
539 /** Schedule the next round; invoked while {@link #lock} is held. */
540 void nextRound() {
541 runningRound = null;
542
543 if (queued.isEmpty()) {
544 idle = true;
545 } else {
546 // Caller holds lock. Reschedule leader on a new thread so
547 // the call stack can unwind and lock is not held unexpectedly
548 // during prepare for the next round.
549 scheduleLeader();
550 }
551 }
552
553 /**
554 * Snapshot this leader
555 *
556 * @return snapshot of this leader
557 */
558 public LeaderSnapshot snapshot() {
559 lock.lock();
560 try {
561 LeaderSnapshot s = new LeaderSnapshot();
562 s.state = state;
563 s.term = term;
564 s.headIndex = headIndex;
565 s.committedIndex = committedIndex;
566 s.idle = isIdle();
567 for (KetchReplica r : voters) {
568 s.replicas.add(r.snapshot());
569 }
570 for (KetchReplica r : followers) {
571 s.replicas.add(r.snapshot());
572 }
573 return s;
574 } finally {
575 lock.unlock();
576 }
577 }
578
579 /**
580 * Gracefully shutdown this leader and cancel outstanding operations.
581 */
582 public void shutdown() {
583 lock.lock();
584 try {
585 if (state != SHUTDOWN) {
586 state = SHUTDOWN;
587 for (KetchReplica r : voters) {
588 r.shutdown();
589 }
590 for (KetchReplica r : followers) {
591 r.shutdown();
592 }
593 }
594 } finally {
595 lock.unlock();
596 }
597 }
598
599 /** {@inheritDoc} */
600 @Override
601 public String toString() {
602 return snapshot().toString();
603 }
604 }