1 /* 2 * Copyright (C) 2016, Google Inc. 3 * and other copyright owners as documented in the project's IP log. 4 * 5 * This program and the accompanying materials are made available 6 * under the terms of the Eclipse Distribution License v1.0 which 7 * accompanies this distribution, is reproduced below, and is 8 * available at http://www.eclipse.org/org/documents/edl-v10.php 9 * 10 * All rights reserved. 11 * 12 * Redistribution and use in source and binary forms, with or 13 * without modification, are permitted provided that the following 14 * conditions are met: 15 * 16 * - Redistributions of source code must retain the above copyright 17 * notice, this list of conditions and the following disclaimer. 18 * 19 * - Redistributions in binary form must reproduce the above 20 * copyright notice, this list of conditions and the following 21 * disclaimer in the documentation and/or other materials provided 22 * with the distribution. 23 * 24 * - Neither the name of the Eclipse Foundation, Inc. nor the 25 * names of its contributors may be used to endorse or promote 26 * products derived from this software without specific prior 27 * written permission. 28 * 29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND 30 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, 31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 32 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 33 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 34 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 36 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 37 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 38 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 39 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 40 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 41 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 42 */ 43 44 package org.eclipse.jgit.internal.ketch; 45 46 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE; 47 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER; 48 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN; 49 import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY; 50 import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED; 51 52 import java.io.IOException; 53 import java.text.MessageFormat; 54 import java.util.ArrayList; 55 import java.util.Arrays; 56 import java.util.Collection; 57 import java.util.List; 58 import java.util.concurrent.locks.Lock; 59 import java.util.concurrent.locks.ReentrantLock; 60 61 import org.eclipse.jgit.internal.storage.reftree.RefTree; 62 import org.eclipse.jgit.lib.ObjectId; 63 import org.eclipse.jgit.lib.Repository; 64 import org.eclipse.jgit.revwalk.RevCommit; 65 import org.eclipse.jgit.revwalk.RevWalk; 66 import org.slf4j.Logger; 67 import org.slf4j.LoggerFactory; 68 69 /** 70 * A leader managing consensus across remote followers. 71 * <p> 72 * A leader instance starts up in 73 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries 74 * to begin a new term by sending an 75 * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its 76 * term starts if a majority of replicas have accepted this leader instance for 77 * the term. 78 * <p> 79 * Once elected by a majority the instance enters 80 * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs 81 * proposals offered to {@link #queueProposal(Proposal)}. This continues until 82 * the leader is timed out for inactivity, or is deposed by a competing leader 83 * gaining its own majority. 84 * <p> 85 * Once timed out or deposed this {@code KetchLeader} instance should be 86 * discarded, and a new instance takes over. 87 * <p> 88 * Each leader instance coordinates a group of 89 * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are 90 * owned by the leader instance and must be discarded when the leader is 91 * discarded. 92 * <p> 93 * In Ketch all push requests are issued through the leader. The steps are as 94 * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an 95 * example): 96 * <ul> 97 * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the 98 * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push. 99 * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance. 100 * <li>Wait for consensus with 101 * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}. 102 * <li>To examine the status of the push, check 103 * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at 104 * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}. 105 * </ul> 106 * <p> 107 * The leader gains consensus by first pushing the needed objects and a 108 * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the 109 * desired target repository state to the {@code refs/txn/accepted} branch on 110 * each of the replicas. Once a majority has succeeded, the leader commits the 111 * state by either pushing the {@code refs/txn/accepted} value to 112 * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates 113 * to {@code refs/heads/master}, etc. for stock Git replicas. 114 * <p> 115 * Internally, the actual transport to replicas is performed on background 116 * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s 117 * executor service. For performance, the 118 * {@link org.eclipse.jgit.internal.ketch.KetchLeader}, 119 * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and 120 * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state, 121 * and may invoke each other's methods on different threads. This access is 122 * protected by the leader's {@link #lock} object. Care must be taken to prevent 123 * concurrent access by correctly obtaining the leader's lock. 124 */ 125 public abstract class KetchLeader { 126 private static final Logger log = LoggerFactory.getLogger(KetchLeader.class); 127 128 /** Current state of the leader instance. */ 129 public static enum State { 130 /** Newly created instance trying to elect itself leader. */ 131 CANDIDATE, 132 133 /** Leader instance elected by a majority. */ 134 LEADER, 135 136 /** Instance has been deposed by another with a more recent term. */ 137 DEPOSED, 138 139 /** Leader has been gracefully shutdown, e.g. due to inactivity. */ 140 SHUTDOWN; 141 } 142 143 private final KetchSystem system; 144 145 /** Leader's knowledge of replicas for this repository. */ 146 private KetchReplica[] voters; 147 private KetchReplica[] followers; 148 private LocalReplica self; 149 150 /** 151 * Lock protecting all data within this leader instance. 152 * <p> 153 * This lock extends into the {@link KetchReplica} instances used by the 154 * leader. They share the same lock instance to simplify concurrency. 155 */ 156 final Lock lock; 157 158 private State state = CANDIDATE; 159 160 /** Term of this leader, once elected. */ 161 private long term; 162 163 /** 164 * Pending proposals accepted into the queue in FIFO order. 165 * <p> 166 * These proposals were preflighted and do not contain any conflicts with 167 * each other and their expectations matched the leader's local view of the 168 * agreed upon {@code refs/txn/accepted} tree. 169 */ 170 private final List<Proposal> queued; 171 172 /** 173 * State of the repository's RefTree after applying all entries in 174 * {@link #queued}. New proposals must be consistent with this tree to be 175 * appended to the end of {@link #queued}. 176 * <p> 177 * Must be deep-copied with {@link RefTree#copy()} if 178 * {@link #roundHoldsReferenceToRefTree} is {@code true}. 179 */ 180 private RefTree refTree; 181 182 /** 183 * If {@code true} {@link #refTree} must be duplicated before queuing the 184 * next proposal. The {@link #refTree} was passed into the constructor of a 185 * {@link ProposalRound}, and that external reference to the {@link RefTree} 186 * object is held by the proposal until it materializes the tree object in 187 * the object store. This field is set {@code true} when the proposal begins 188 * execution and set {@code false} once tree objects are persisted in the 189 * local repository's object store or {@link #refTree} is replaced with a 190 * copy to isolate it from any running rounds. 191 * <p> 192 * If proposals arrive less frequently than the {@code RefTree} is written 193 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior 194 * avoids duplicating {@link #refTree}, reducing both time and memory used. 195 * However if proposals arrive more frequently {@link #refTree} must be 196 * duplicated to prevent newly queued proposals from corrupting the 197 * {@link #runningRound}. 198 */ 199 volatile boolean roundHoldsReferenceToRefTree; 200 201 /** End of the leader's log. */ 202 private LogIndex headIndex; 203 204 /** Leader knows this (and all prior) states are committed. */ 205 private LogIndex committedIndex; 206 207 /** 208 * Is the leader idle with no work pending? If {@code true} there is no work 209 * for the leader (normal state). This field is {@code false} when the 210 * leader thread is scheduled for execution, or while {@link #runningRound} 211 * defines a round in progress. 212 */ 213 private boolean idle; 214 215 /** Current round the leader is preparing and waiting for a vote on. */ 216 private Round runningRound; 217 218 /** 219 * Construct a leader for a Ketch instance. 220 * 221 * @param system 222 * Ketch system configuration the leader must adhere to. 223 */ 224 protected KetchLeader(KetchSystem system) { 225 this.system = system; 226 this.lock = new ReentrantLock(true /* fair */); 227 this.queued = new ArrayList<>(4); 228 this.idle = true; 229 } 230 231 /** @return system configuration. */ 232 KetchSystem getSystem() { 233 return system; 234 } 235 236 /** 237 * Configure the replicas used by this Ketch instance. 238 * <p> 239 * Replicas should be configured once at creation before any proposals are 240 * executed. Once elections happen, <b>reconfiguration is a complicated 241 * concept that is not currently supported</b>. 242 * 243 * @param replicas 244 * members participating with the same repository. 245 */ 246 public void setReplicas(Collection<KetchReplica> replicas) { 247 List<KetchReplica> v = new ArrayList<>(5); 248 List<KetchReplica> f = new ArrayList<>(5); 249 for (KetchReplica r : replicas) { 250 switch (r.getParticipation()) { 251 case FULL: 252 v.add(r); 253 break; 254 255 case FOLLOWER_ONLY: 256 f.add(r); 257 break; 258 } 259 } 260 261 Collection<Integer> validVoters = validVoterCounts(); 262 if (!validVoters.contains(Integer.valueOf(v.size()))) { 263 throw new IllegalArgumentException(MessageFormat.format( 264 KetchText.get().unsupportedVoterCount, 265 Integer.valueOf(v.size()), 266 validVoters)); 267 } 268 269 LocalReplica me = findLocal(v); 270 if (me == null) { 271 throw new IllegalArgumentException( 272 KetchText.get().localReplicaRequired); 273 } 274 275 lock.lock(); 276 try { 277 voters = v.toArray(new KetchReplica[v.size()]); 278 followers = f.toArray(new KetchReplica[f.size()]); 279 self = me; 280 } finally { 281 lock.unlock(); 282 } 283 } 284 285 private static Collection<Integer> validVoterCounts() { 286 @SuppressWarnings("boxing") 287 Integer[] valid = { 288 // An odd number of voting replicas is required. 289 1, 3, 5, 7, 9 }; 290 return Arrays.asList(valid); 291 } 292 293 private static LocalReplica findLocal(Collection<KetchReplica> voters) { 294 for (KetchReplica r : voters) { 295 if (r instanceof LocalReplica) { 296 return (LocalReplica) r; 297 } 298 } 299 return null; 300 } 301 302 /** 303 * Get an instance of the repository for use by a leader thread. 304 * <p> 305 * The caller will close the repository. 306 * 307 * @return opened repository for use by the leader thread. 308 * @throws java.io.IOException 309 * cannot reopen the repository for the leader. 310 */ 311 protected abstract Repository openRepository() throws IOException; 312 313 /** 314 * Queue a reference update proposal for consensus. 315 * <p> 316 * This method does not wait for consensus to be reached. The proposal is 317 * checked to look for risks of conflicts, and then submitted into the queue 318 * for distribution as soon as possible. 319 * <p> 320 * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()} 321 * to see if the proposal is done. 322 * 323 * @param proposal 324 * the proposed reference updates to queue for consideration. 325 * Once execution is complete the individual reference result 326 * fields will be populated with the outcome. 327 * @throws java.lang.InterruptedException 328 * current thread was interrupted. The proposal may have been 329 * aborted if it was not yet queued for execution. 330 * @throws java.io.IOException 331 * unrecoverable error preventing proposals from being attempted 332 * by this leader. 333 */ 334 public void queueProposal(Proposal proposal) 335 throws InterruptedException, IOException { 336 try { 337 lock.lockInterruptibly(); 338 } catch (InterruptedException e) { 339 proposal.abort(); 340 throw e; 341 } 342 try { 343 if (refTree == null) { 344 initialize(); 345 for (Proposal p : queued) { 346 refTree.apply(p.getCommands()); 347 } 348 } else if (roundHoldsReferenceToRefTree) { 349 refTree = refTree.copy(); 350 roundHoldsReferenceToRefTree = false; 351 } 352 353 if (!refTree.apply(proposal.getCommands())) { 354 // A conflict exists so abort the proposal. 355 proposal.abort(); 356 return; 357 } 358 359 queued.add(proposal); 360 proposal.notifyState(QUEUED); 361 362 if (idle) { 363 scheduleLeader(); 364 } 365 } finally { 366 lock.unlock(); 367 } 368 } 369 370 private void initialize() throws IOException { 371 try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) { 372 self.initialize(git); 373 374 ObjectId accepted = self.getTxnAccepted(); 375 if (!ObjectId.zeroId().equals(accepted)) { 376 RevCommit c = rw.parseCommit(accepted); 377 headIndex = LogIndex.unknown(accepted); 378 refTree = RefTree.read(rw.getObjectReader(), c.getTree()); 379 } else { 380 headIndex = LogIndex.unknown(ObjectId.zeroId()); 381 refTree = RefTree.newEmptyTree(); 382 } 383 } 384 } 385 386 private void scheduleLeader() { 387 idle = false; 388 system.getExecutor().execute(new Runnable() { 389 @Override 390 public void run() { 391 runLeader(); 392 } 393 }); 394 } 395 396 private void runLeader() { 397 Round round; 398 lock.lock(); 399 try { 400 switch (state) { 401 case CANDIDATE: 402 round = new ElectionRound(this, headIndex); 403 break; 404 405 case LEADER: 406 round = newProposalRound(); 407 break; 408 409 case DEPOSED: 410 case SHUTDOWN: 411 default: 412 log.warn("Leader cannot run {}", state); //$NON-NLS-1$ 413 // TODO(sop): Redirect proposals. 414 return; 415 } 416 } finally { 417 lock.unlock(); 418 } 419 420 try { 421 round.start(); 422 } catch (IOException e) { 423 // TODO(sop) Depose leader if it cannot use its repository. 424 log.error(KetchText.get().leaderFailedToStore, e); 425 lock.lock(); 426 try { 427 nextRound(); 428 } finally { 429 lock.unlock(); 430 } 431 } 432 } 433 434 private ProposalRound newProposalRound() { 435 List<Proposal> todo = new ArrayList<>(queued); 436 queued.clear(); 437 roundHoldsReferenceToRefTree = true; 438 return new ProposalRound(this, headIndex, todo, refTree); 439 } 440 441 /** @return term of this leader's reign. */ 442 long getTerm() { 443 return term; 444 } 445 446 /** @return end of the leader's log. */ 447 LogIndex getHead() { 448 return headIndex; 449 } 450 451 /** 452 * @return state leader knows it has committed across a quorum of replicas. 453 */ 454 LogIndex getCommitted() { 455 return committedIndex; 456 } 457 458 boolean isIdle() { 459 return idle; 460 } 461 462 void runAsync(Round round) { 463 lock.lock(); 464 try { 465 // End of the log is this round. Once transport begins it is 466 // reasonable to assume at least one replica will eventually get 467 // this, and there is reasonable probability it commits. 468 headIndex = round.acceptedNewIndex; 469 runningRound = round; 470 471 for (KetchReplica replica : voters) { 472 replica.pushTxnAcceptedAsync(round); 473 } 474 for (KetchReplica replica : followers) { 475 replica.pushTxnAcceptedAsync(round); 476 } 477 } finally { 478 lock.unlock(); 479 } 480 } 481 482 /** 483 * Asynchronous signal from a replica after completion. 484 * <p> 485 * Must be called while {@link #lock} is held by the replica. 486 * 487 * @param replica 488 * replica posting a completion event. 489 */ 490 void onReplicaUpdate(KetchReplica replica) { 491 if (log.isDebugEnabled()) { 492 log.debug("Replica {} finished:\n{}", //$NON-NLS-1$ 493 replica.describeForLog(), snapshot()); 494 } 495 496 if (replica.getParticipation() == FOLLOWER_ONLY) { 497 // Followers cannot vote, so votes haven't changed. 498 return; 499 } else if (runningRound == null) { 500 // No round running, no need to tally votes. 501 return; 502 } 503 504 assert headIndex.equals(runningRound.acceptedNewIndex); 505 int matching = 0; 506 for (KetchReplica r : voters) { 507 if (r.hasAccepted(headIndex)) { 508 matching++; 509 } 510 } 511 512 int quorum = voters.length / 2 + 1; 513 boolean success = matching >= quorum; 514 if (!success) { 515 return; 516 } 517 518 switch (state) { 519 case CANDIDATE: 520 term = ((ElectionRound) runningRound).getTerm(); 521 state = LEADER; 522 if (log.isDebugEnabled()) { 523 log.debug("Won election, running term " + term); //$NON-NLS-1$ 524 } 525 526 //$FALL-THROUGH$ 527 case LEADER: 528 committedIndex = headIndex; 529 if (log.isDebugEnabled()) { 530 log.debug("Committed {} in term {}", //$NON-NLS-1$ 531 committedIndex.describeForLog(), 532 Long.valueOf(term)); 533 } 534 nextRound(); 535 commitAsync(replica); 536 notifySuccess(runningRound); 537 if (log.isDebugEnabled()) { 538 log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$ 539 } 540 break; 541 542 default: 543 log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$ 544 break; 545 } 546 } 547 548 private void notifySuccess(Round round) { 549 // Drop the leader lock while notifying Proposal listeners. 550 lock.unlock(); 551 try { 552 round.success(); 553 } finally { 554 lock.lock(); 555 } 556 } 557 558 private void commitAsync(KetchReplica caller) { 559 for (KetchReplica r : voters) { 560 if (r == caller) { 561 continue; 562 } 563 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) { 564 r.pushCommitAsync(committedIndex); 565 } 566 } 567 for (KetchReplica r : followers) { 568 if (r == caller) { 569 continue; 570 } 571 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) { 572 r.pushCommitAsync(committedIndex); 573 } 574 } 575 } 576 577 /** Schedule the next round; invoked while {@link #lock} is held. */ 578 void nextRound() { 579 runningRound = null; 580 581 if (queued.isEmpty()) { 582 idle = true; 583 } else { 584 // Caller holds lock. Reschedule leader on a new thread so 585 // the call stack can unwind and lock is not held unexpectedly 586 // during prepare for the next round. 587 scheduleLeader(); 588 } 589 } 590 591 /** 592 * Snapshot this leader 593 * 594 * @return snapshot of this leader 595 */ 596 public LeaderSnapshot snapshot() { 597 lock.lock(); 598 try { 599 LeaderSnapshot s = new LeaderSnapshot(); 600 s.state = state; 601 s.term = term; 602 s.headIndex = headIndex; 603 s.committedIndex = committedIndex; 604 s.idle = isIdle(); 605 for (KetchReplica r : voters) { 606 s.replicas.add(r.snapshot()); 607 } 608 for (KetchReplica r : followers) { 609 s.replicas.add(r.snapshot()); 610 } 611 return s; 612 } finally { 613 lock.unlock(); 614 } 615 } 616 617 /** 618 * Gracefully shutdown this leader and cancel outstanding operations. 619 */ 620 public void shutdown() { 621 lock.lock(); 622 try { 623 if (state != SHUTDOWN) { 624 state = SHUTDOWN; 625 for (KetchReplica r : voters) { 626 r.shutdown(); 627 } 628 for (KetchReplica r : followers) { 629 r.shutdown(); 630 } 631 } 632 } finally { 633 lock.unlock(); 634 } 635 } 636 637 /** {@inheritDoc} */ 638 @Override 639 public String toString() { 640 return snapshot().toString(); 641 } 642 }