1 /*
2 * Copyright (C) 2016, Google Inc.
3 * and other copyright owners as documented in the project's IP log.
4 *
5 * This program and the accompanying materials are made available
6 * under the terms of the Eclipse Distribution License v1.0 which
7 * accompanies this distribution, is reproduced below, and is
8 * available at http://www.eclipse.org/org/documents/edl-v10.php
9 *
10 * All rights reserved.
11 *
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
14 * conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials provided
22 * with the distribution.
23 *
24 * - Neither the name of the Eclipse Foundation, Inc. nor the
25 * names of its contributors may be used to endorse or promote
26 * products derived from this software without specific prior
27 * written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42 */
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
47 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
48 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
49 import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
50 import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
51
52 import java.io.IOException;
53 import java.text.MessageFormat;
54 import java.util.ArrayList;
55 import java.util.Arrays;
56 import java.util.Collection;
57 import java.util.List;
58 import java.util.concurrent.locks.Lock;
59 import java.util.concurrent.locks.ReentrantLock;
60
61 import org.eclipse.jgit.internal.storage.reftree.RefTree;
62 import org.eclipse.jgit.lib.ObjectId;
63 import org.eclipse.jgit.lib.Repository;
64 import org.eclipse.jgit.revwalk.RevCommit;
65 import org.eclipse.jgit.revwalk.RevWalk;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 /**
70 * A leader managing consensus across remote followers.
71 * <p>
72 * A leader instance starts up in
73 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
74 * to begin a new term by sending an
75 * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
76 * term starts if a majority of replicas have accepted this leader instance for
77 * the term.
78 * <p>
79 * Once elected by a majority the instance enters
80 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
81 * proposals offered to {@link #queueProposal(Proposal)}. This continues until
82 * the leader is timed out for inactivity, or is deposed by a competing leader
83 * gaining its own majority.
84 * <p>
85 * Once timed out or deposed this {@code KetchLeader} instance should be
86 * discarded, and a new instance takes over.
87 * <p>
88 * Each leader instance coordinates a group of
89 * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
90 * owned by the leader instance and must be discarded when the leader is
91 * discarded.
92 * <p>
93 * In Ketch all push requests are issued through the leader. The steps are as
94 * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
95 * example):
96 * <ul>
97 * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
98 * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
99 * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
100 * <li>Wait for consensus with
101 * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
102 * <li>To examine the status of the push, check
103 * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
104 * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
105 * </ul>
106 * <p>
107 * The leader gains consensus by first pushing the needed objects and a
108 * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
109 * desired target repository state to the {@code refs/txn/accepted} branch on
110 * each of the replicas. Once a majority has succeeded, the leader commits the
111 * state by either pushing the {@code refs/txn/accepted} value to
112 * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
113 * to {@code refs/heads/master}, etc. for stock Git replicas.
114 * <p>
115 * Internally, the actual transport to replicas is performed on background
116 * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
117 * executor service. For performance, the
118 * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
119 * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
120 * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
121 * and may invoke each other's methods on different threads. This access is
122 * protected by the leader's {@link #lock} object. Care must be taken to prevent
123 * concurrent access by correctly obtaining the leader's lock.
124 */
125 public abstract class KetchLeader {
126 private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
127
/** Lifecycle state of a leader instance. */
public enum State {
	/** Freshly created instance that is still trying to get itself elected. */
	CANDIDATE,

	/** Instance that has been elected leader by a majority. */
	LEADER,

	/** Instance that was deposed by another one with a more recent term. */
	DEPOSED,

	/** Leader that was shut down gracefully, e.g. because of inactivity. */
	SHUTDOWN
}
142
143 private final KetchSystem system;
144
145 /** Leader's knowledge of replicas for this repository. */
146 private KetchReplica[] voters;
147 private KetchReplica[] followers;
148 private LocalReplica self;
149
150 /**
151 * Lock protecting all data within this leader instance.
152 * <p>
153 * This lock extends into the {@link KetchReplica} instances used by the
154 * leader. They share the same lock instance to simplify concurrency.
155 */
156 final Lock lock;
157
158 private State state = CANDIDATE;
159
160 /** Term of this leader, once elected. */
161 private long term;
162
163 /**
164 * Pending proposals accepted into the queue in FIFO order.
165 * <p>
166 * These proposals were preflighted and do not contain any conflicts with
167 * each other and their expectations matched the leader's local view of the
168 * agreed upon {@code refs/txn/accepted} tree.
169 */
170 private final List<Proposal> queued;
171
172 /**
173 * State of the repository's RefTree after applying all entries in
174 * {@link #queued}. New proposals must be consistent with this tree to be
175 * appended to the end of {@link #queued}.
176 * <p>
177 * Must be deep-copied with {@link RefTree#copy()} if
178 * {@link #roundHoldsReferenceToRefTree} is {@code true}.
179 */
180 private RefTree refTree;
181
182 /**
183 * If {@code true} {@link #refTree} must be duplicated before queuing the
184 * next proposal. The {@link #refTree} was passed into the constructor of a
185 * {@link ProposalRound}, and that external reference to the {@link RefTree}
186 * object is held by the proposal until it materializes the tree object in
187 * the object store. This field is set {@code true} when the proposal begins
188 * execution and set {@code false} once tree objects are persisted in the
189 * local repository's object store or {@link #refTree} is replaced with a
190 * copy to isolate it from any running rounds.
191 * <p>
192 * If proposals arrive less frequently than the {@code RefTree} is written
193 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
194 * avoids duplicating {@link #refTree}, reducing both time and memory used.
195 * However if proposals arrive more frequently {@link #refTree} must be
196 * duplicated to prevent newly queued proposals from corrupting the
197 * {@link #runningRound}.
198 */
199 volatile boolean roundHoldsReferenceToRefTree;
200
201 /** End of the leader's log. */
202 private LogIndex headIndex;
203
204 /** Leader knows this (and all prior) states are committed. */
205 private LogIndex committedIndex;
206
207 /**
208 * Is the leader idle with no work pending? If {@code true} there is no work
209 * for the leader (normal state). This field is {@code false} when the
210 * leader thread is scheduled for execution, or while {@link #runningRound}
211 * defines a round in progress.
212 */
213 private boolean idle;
214
215 /** Current round the leader is preparing and waiting for a vote on. */
216 private Round runningRound;
217
/**
 * Create a leader bound to a Ketch system.
 * <p>
 * The new instance gets its own fair {@link ReentrantLock}, an empty
 * proposal queue, and starts out idle.
 *
 * @param system
 *            Ketch system configuration the leader must adhere to.
 */
protected KetchLeader(KetchSystem system) {
	this.idle = true;
	this.queued = new ArrayList<>(4);
	this.lock = new ReentrantLock(true /* fair */);
	this.system = system;
}
230
231 /** @return system configuration. */
232 KetchSystem getSystem() {
233 return system;
234 }
235
236 /**
237 * Configure the replicas used by this Ketch instance.
238 * <p>
239 * Replicas should be configured once at creation before any proposals are
240 * executed. Once elections happen, <b>reconfiguration is a complicated
241 * concept that is not currently supported</b>.
242 *
243 * @param replicas
244 * members participating with the same repository.
245 */
246 public void setReplicas(Collection<KetchReplica> replicas) {
247 List<KetchReplica> v = new ArrayList<>(5);
248 List<KetchReplica> f = new ArrayList<>(5);
249 for (KetchReplica r : replicas) {
250 switch (r.getParticipation()) {
251 case FULL:
252 v.add(r);
253 break;
254
255 case FOLLOWER_ONLY:
256 f.add(r);
257 break;
258 }
259 }
260
261 Collection<Integer> validVoters = validVoterCounts();
262 if (!validVoters.contains(Integer.valueOf(v.size()))) {
263 throw new IllegalArgumentException(MessageFormat.format(
264 KetchText.get().unsupportedVoterCount,
265 Integer.valueOf(v.size()),
266 validVoters));
267 }
268
269 LocalReplica me = findLocal(v);
270 if (me == null) {
271 throw new IllegalArgumentException(
272 KetchText.get().localReplicaRequired);
273 }
274
275 lock.lock();
276 try {
277 voters = v.toArray(new KetchReplica[v.size()]);
278 followers = f.toArray(new KetchReplica[f.size()]);
279 self = me;
280 } finally {
281 lock.unlock();
282 }
283 }
284
285 private static Collection<Integer> validVoterCounts() {
286 @SuppressWarnings("boxing")
287 Integer[] valid = {
288 // An odd number of voting replicas is required.
289 1, 3, 5, 7, 9 };
290 return Arrays.asList(valid);
291 }
292
293 private static LocalReplica findLocal(Collection<KetchReplica> voters) {
294 for (KetchReplica r : voters) {
295 if (r instanceof LocalReplica) {
296 return (LocalReplica) r;
297 }
298 }
299 return null;
300 }
301
302 /**
303 * Get an instance of the repository for use by a leader thread.
304 * <p>
305 * The caller will close the repository.
306 *
307 * @return opened repository for use by the leader thread.
308 * @throws java.io.IOException
309 * cannot reopen the repository for the leader.
310 */
311 protected abstract Repository openRepository() throws IOException;
312
313 /**
314 * Queue a reference update proposal for consensus.
315 * <p>
316 * This method does not wait for consensus to be reached. The proposal is
317 * checked to look for risks of conflicts, and then submitted into the queue
318 * for distribution as soon as possible.
319 * <p>
320 * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
321 * to see if the proposal is done.
322 *
323 * @param proposal
324 * the proposed reference updates to queue for consideration.
325 * Once execution is complete the individual reference result
326 * fields will be populated with the outcome.
327 * @throws java.lang.InterruptedException
328 * current thread was interrupted. The proposal may have been
329 * aborted if it was not yet queued for execution.
330 * @throws java.io.IOException
331 * unrecoverable error preventing proposals from being attempted
332 * by this leader.
333 */
334 public void queueProposal(Proposal proposal)
335 throws InterruptedException, IOException {
336 try {
337 lock.lockInterruptibly();
338 } catch (InterruptedException e) {
339 proposal.abort();
340 throw e;
341 }
342 try {
343 if (refTree == null) {
344 initialize();
345 for (Proposal p : queued) {
346 refTree.apply(p.getCommands());
347 }
348 } else if (roundHoldsReferenceToRefTree) {
349 refTree = refTree.copy();
350 roundHoldsReferenceToRefTree = false;
351 }
352
353 if (!refTree.apply(proposal.getCommands())) {
354 // A conflict exists so abort the proposal.
355 proposal.abort();
356 return;
357 }
358
359 queued.add(proposal);
360 proposal.notifyState(QUEUED);
361
362 if (idle) {
363 scheduleLeader();
364 }
365 } finally {
366 lock.unlock();
367 }
368 }
369
370 private void initialize() throws IOException {
371 try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
372 self.initialize(git);
373
374 ObjectId accepted = self.getTxnAccepted();
375 if (!ObjectId.zeroId().equals(accepted)) {
376 RevCommit c = rw.parseCommit(accepted);
377 headIndex = LogIndex.unknown(accepted);
378 refTree = RefTree.read(rw.getObjectReader(), c.getTree());
379 } else {
380 headIndex = LogIndex.unknown(ObjectId.zeroId());
381 refTree = RefTree.newEmptyTree();
382 }
383 }
384 }
385
386 private void scheduleLeader() {
387 idle = false;
388 system.getExecutor().execute(new Runnable() {
389 @Override
390 public void run() {
391 runLeader();
392 }
393 });
394 }
395
396 private void runLeader() {
397 Round round;
398 lock.lock();
399 try {
400 switch (state) {
401 case CANDIDATE:
402 round = new ElectionRound(this, headIndex);
403 break;
404
405 case LEADER:
406 round = newProposalRound();
407 break;
408
409 case DEPOSED:
410 case SHUTDOWN:
411 default:
412 log.warn("Leader cannot run {}", state); //$NON-NLS-1$
413 // TODO(sop): Redirect proposals.
414 return;
415 }
416 } finally {
417 lock.unlock();
418 }
419
420 try {
421 round.start();
422 } catch (IOException e) {
423 // TODO(sop) Depose leader if it cannot use its repository.
424 log.error(KetchText.get().leaderFailedToStore, e);
425 lock.lock();
426 try {
427 nextRound();
428 } finally {
429 lock.unlock();
430 }
431 }
432 }
433
434 private ProposalRound newProposalRound() {
435 List<Proposal> todo = new ArrayList<>(queued);
436 queued.clear();
437 roundHoldsReferenceToRefTree = true;
438 return new ProposalRound(this, headIndex, todo, refTree);
439 }
440
441 /** @return term of this leader's reign. */
442 long getTerm() {
443 return term;
444 }
445
446 /** @return end of the leader's log. */
447 LogIndex getHead() {
448 return headIndex;
449 }
450
451 /**
452 * @return state leader knows it has committed across a quorum of replicas.
453 */
454 LogIndex getCommitted() {
455 return committedIndex;
456 }
457
458 boolean isIdle() {
459 return idle;
460 }
461
462 void runAsync(Round round) {
463 lock.lock();
464 try {
465 // End of the log is this round. Once transport begins it is
466 // reasonable to assume at least one replica will eventually get
467 // this, and there is reasonable probability it commits.
468 headIndex = round.acceptedNewIndex;
469 runningRound = round;
470
471 for (KetchReplica replica : voters) {
472 replica.pushTxnAcceptedAsync(round);
473 }
474 for (KetchReplica replica : followers) {
475 replica.pushTxnAcceptedAsync(round);
476 }
477 } finally {
478 lock.unlock();
479 }
480 }
481
482 /**
483 * Asynchronous signal from a replica after completion.
484 * <p>
485 * Must be called while {@link #lock} is held by the replica.
486 *
487 * @param replica
488 * replica posting a completion event.
489 */
490 void onReplicaUpdate(KetchReplica replica) {
491 if (log.isDebugEnabled()) {
492 log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
493 replica.describeForLog(), snapshot());
494 }
495
496 if (replica.getParticipation() == FOLLOWER_ONLY) {
497 // Followers cannot vote, so votes haven't changed.
498 return;
499 } else if (runningRound == null) {
500 // No round running, no need to tally votes.
501 return;
502 }
503
504 assert headIndex.equals(runningRound.acceptedNewIndex);
505 int matching = 0;
506 for (KetchReplica r : voters) {
507 if (r.hasAccepted(headIndex)) {
508 matching++;
509 }
510 }
511
512 int quorum = voters.length / 2 + 1;
513 boolean success = matching >= quorum;
514 if (!success) {
515 return;
516 }
517
518 switch (state) {
519 case CANDIDATE:
520 term = ((ElectionRound) runningRound).getTerm();
521 state = LEADER;
522 if (log.isDebugEnabled()) {
523 log.debug("Won election, running term " + term); //$NON-NLS-1$
524 }
525
526 //$FALL-THROUGH$
527 case LEADER:
528 committedIndex = headIndex;
529 if (log.isDebugEnabled()) {
530 log.debug("Committed {} in term {}", //$NON-NLS-1$
531 committedIndex.describeForLog(),
532 Long.valueOf(term));
533 }
534 nextRound();
535 commitAsync(replica);
536 notifySuccess(runningRound);
537 if (log.isDebugEnabled()) {
538 log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
539 }
540 break;
541
542 default:
543 log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
544 break;
545 }
546 }
547
548 private void notifySuccess(Round round) {
549 // Drop the leader lock while notifying Proposal listeners.
550 lock.unlock();
551 try {
552 round.success();
553 } finally {
554 lock.lock();
555 }
556 }
557
558 private void commitAsync(KetchReplica caller) {
559 for (KetchReplica r : voters) {
560 if (r == caller) {
561 continue;
562 }
563 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
564 r.pushCommitAsync(committedIndex);
565 }
566 }
567 for (KetchReplica r : followers) {
568 if (r == caller) {
569 continue;
570 }
571 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
572 r.pushCommitAsync(committedIndex);
573 }
574 }
575 }
576
577 /** Schedule the next round; invoked while {@link #lock} is held. */
578 void nextRound() {
579 runningRound = null;
580
581 if (queued.isEmpty()) {
582 idle = true;
583 } else {
584 // Caller holds lock. Reschedule leader on a new thread so
585 // the call stack can unwind and lock is not held unexpectedly
586 // during prepare for the next round.
587 scheduleLeader();
588 }
589 }
590
591 /**
592 * Snapshot this leader
593 *
594 * @return snapshot of this leader
595 */
596 public LeaderSnapshot snapshot() {
597 lock.lock();
598 try {
599 LeaderSnapshot s = new LeaderSnapshot();
600 s.state = state;
601 s.term = term;
602 s.headIndex = headIndex;
603 s.committedIndex = committedIndex;
604 s.idle = isIdle();
605 for (KetchReplica r : voters) {
606 s.replicas.add(r.snapshot());
607 }
608 for (KetchReplica r : followers) {
609 s.replicas.add(r.snapshot());
610 }
611 return s;
612 } finally {
613 lock.unlock();
614 }
615 }
616
617 /**
618 * Gracefully shutdown this leader and cancel outstanding operations.
619 */
620 public void shutdown() {
621 lock.lock();
622 try {
623 if (state != SHUTDOWN) {
624 state = SHUTDOWN;
625 for (KetchReplica r : voters) {
626 r.shutdown();
627 }
628 for (KetchReplica r : followers) {
629 r.shutdown();
630 }
631 }
632 } finally {
633 lock.unlock();
634 }
635 }
636
637 /** {@inheritDoc} */
638 @Override
639 public String toString() {
640 return snapshot().toString();
641 }
642 }