KetchLeader.java

  1. /*
  2.  * Copyright (C) 2016, Google Inc.
  3.  * and other copyright owners as documented in the project's IP log.
  4.  *
  5.  * This program and the accompanying materials are made available
  6.  * under the terms of the Eclipse Distribution License v1.0 which
  7.  * accompanies this distribution, is reproduced below, and is
  8.  * available at http://www.eclipse.org/org/documents/edl-v10.php
  9.  *
  10.  * All rights reserved.
  11.  *
  12.  * Redistribution and use in source and binary forms, with or
  13.  * without modification, are permitted provided that the following
  14.  * conditions are met:
  15.  *
  16.  * - Redistributions of source code must retain the above copyright
  17.  *   notice, this list of conditions and the following disclaimer.
  18.  *
  19.  * - Redistributions in binary form must reproduce the above
  20.  *   copyright notice, this list of conditions and the following
  21.  *   disclaimer in the documentation and/or other materials provided
  22.  *   with the distribution.
  23.  *
  24.  * - Neither the name of the Eclipse Foundation, Inc. nor the
  25.  *   names of its contributors may be used to endorse or promote
  26.  *   products derived from this software without specific prior
  27.  *   written permission.
  28.  *
  29.  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
  30.  * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
  31.  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  32.  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  33.  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  34.  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  35.  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  36.  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  37.  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  38.  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  39.  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  40.  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  41.  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  42.  */

  43. package org.eclipse.jgit.internal.ketch;

  44. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
  45. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
  46. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
  47. import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
  48. import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;

  49. import java.io.IOException;
  50. import java.text.MessageFormat;
  51. import java.util.ArrayList;
  52. import java.util.Arrays;
  53. import java.util.Collection;
  54. import java.util.List;
  55. import java.util.concurrent.locks.Lock;
  56. import java.util.concurrent.locks.ReentrantLock;

  57. import org.eclipse.jgit.internal.storage.reftree.RefTree;
  58. import org.eclipse.jgit.lib.ObjectId;
  59. import org.eclipse.jgit.lib.Repository;
  60. import org.eclipse.jgit.revwalk.RevCommit;
  61. import org.eclipse.jgit.revwalk.RevWalk;
  62. import org.slf4j.Logger;
  63. import org.slf4j.LoggerFactory;

  64. /**
  65.  * A leader managing consensus across remote followers.
  66.  * <p>
  67.  * A leader instance starts up in
  68.  * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
  69.  * to begin a new term by sending an
  70.  * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
  71.  * term starts if a majority of replicas have accepted this leader instance for
  72.  * the term.
  73.  * <p>
  74.  * Once elected by a majority the instance enters
  75.  * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
  76.  * proposals offered to {@link #queueProposal(Proposal)}. This continues until
  77.  * the leader is timed out for inactivity, or is deposed by a competing leader
  78.  * gaining its own majority.
  79.  * <p>
  80.  * Once timed out or deposed this {@code KetchLeader} instance should be
  81.  * discarded, and a new instance takes over.
  82.  * <p>
  83.  * Each leader instance coordinates a group of
  84.  * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
  85.  * owned by the leader instance and must be discarded when the leader is
  86.  * discarded.
  87.  * <p>
  88.  * In Ketch all push requests are issued through the leader. The steps are as
  89.  * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
  90.  * example):
  91.  * <ul>
  92.  * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
  93.  * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
  94.  * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
  95.  * <li>Wait for consensus with
  96.  * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
  97.  * <li>To examine the status of the push, check
  98.  * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
  99.  * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
  100.  * </ul>
  101.  * <p>
  102.  * The leader gains consensus by first pushing the needed objects and a
  103.  * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
  104.  * desired target repository state to the {@code refs/txn/accepted} branch on
  105.  * each of the replicas. Once a majority has succeeded, the leader commits the
  106.  * state by either pushing the {@code refs/txn/accepted} value to
  107.  * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
  108.  * to {@code refs/heads/master}, etc. for stock Git replicas.
  109.  * <p>
  110.  * Internally, the actual transport to replicas is performed on background
  111.  * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
  112.  * executor service. For performance, the
  113.  * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
  114.  * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
  115.  * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
  116.  * and may invoke each other's methods on different threads. This access is
  117.  * protected by the leader's {@link #lock} object. Care must be taken to prevent
  118.  * concurrent access by correctly obtaining the leader's lock.
  119.  */
  120. public abstract class KetchLeader {
  121.     private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);

  122.     /** Current state of the leader instance. */
  123.     public static enum State {
  124.         /** Newly created instance trying to elect itself leader. */
  125.         CANDIDATE,

  126.         /** Leader instance elected by a majority. */
  127.         LEADER,

  128.         /** Instance has been deposed by another with a more recent term. */
  129.         DEPOSED,

  130.         /** Leader has been gracefully shutdown, e.g. due to inactivity. */
  131.         SHUTDOWN;
  132.     }

    /** Ketch system supplied at construction; also provides the executor. */
    private final KetchSystem system;

    /**
     * Leader's knowledge of replicas for this repository.
     * <p>
     * Replicas that were passed to {@link #setReplicas(Collection)} with
     * {@code FULL} participation; only these are counted toward a quorum.
     */
    private KetchReplica[] voters;
    /** Replicas registered as {@code FOLLOWER_ONLY}; they never vote. */
    private KetchReplica[] followers;
    /** The {@link LocalReplica} found among {@link #voters}. */
    private LocalReplica self;

  138.     /**
  139.      * Lock protecting all data within this leader instance.
  140.      * <p>
  141.      * This lock extends into the {@link KetchReplica} instances used by the
  142.      * leader. They share the same lock instance to simplify concurrency.
  143.      */
  144.     final Lock lock;

  145.     private State state = CANDIDATE;

  146.     /** Term of this leader, once elected. */
  147.     private long term;

  148.     /**
  149.      * Pending proposals accepted into the queue in FIFO order.
  150.      * <p>
  151.      * These proposals were preflighted and do not contain any conflicts with
  152.      * each other and their expectations matched the leader's local view of the
  153.      * agreed upon {@code refs/txn/accepted} tree.
  154.      */
  155.     private final List<Proposal> queued;

  156.     /**
  157.      * State of the repository's RefTree after applying all entries in
  158.      * {@link #queued}. New proposals must be consistent with this tree to be
  159.      * appended to the end of {@link #queued}.
  160.      * <p>
  161.      * Must be deep-copied with {@link RefTree#copy()} if
  162.      * {@link #roundHoldsReferenceToRefTree} is {@code true}.
  163.      */
  164.     private RefTree refTree;

  165.     /**
  166.      * If {@code true} {@link #refTree} must be duplicated before queuing the
  167.      * next proposal. The {@link #refTree} was passed into the constructor of a
  168.      * {@link ProposalRound}, and that external reference to the {@link RefTree}
  169.      * object is held by the proposal until it materializes the tree object in
  170.      * the object store. This field is set {@code true} when the proposal begins
  171.      * execution and set {@code false} once tree objects are persisted in the
  172.      * local repository's object store or {@link #refTree} is replaced with a
  173.      * copy to isolate it from any running rounds.
  174.      * <p>
  175.      * If proposals arrive less frequently than the {@code RefTree} is written
  176.      * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
  177.      * avoids duplicating {@link #refTree}, reducing both time and memory used.
  178.      * However if proposals arrive more frequently {@link #refTree} must be
  179.      * duplicated to prevent newly queued proposals from corrupting the
  180.      * {@link #runningRound}.
  181.      */
  182.     volatile boolean roundHoldsReferenceToRefTree;

  183.     /** End of the leader's log. */
  184.     private LogIndex headIndex;

  185.     /** Leader knows this (and all prior) states are committed. */
  186.     private LogIndex committedIndex;

  187.     /**
  188.      * Is the leader idle with no work pending? If {@code true} there is no work
  189.      * for the leader (normal state). This field is {@code false} when the
  190.      * leader thread is scheduled for execution, or while {@link #runningRound}
  191.      * defines a round in progress.
  192.      */
  193.     private boolean idle;

  194.     /** Current round the leader is preparing and waiting for a vote on. */
  195.     private Round runningRound;

  196.     /**
  197.      * Construct a leader for a Ketch instance.
  198.      *
  199.      * @param system
  200.      *            Ketch system configuration the leader must adhere to.
  201.      */
  202.     protected KetchLeader(KetchSystem system) {
  203.         this.system = system;
  204.         this.lock = new ReentrantLock(true /* fair */);
  205.         this.queued = new ArrayList<>(4);
  206.         this.idle = true;
  207.     }

  208.     /** @return system configuration. */
  209.     KetchSystem getSystem() {
  210.         return system;
  211.     }

  212.     /**
  213.      * Configure the replicas used by this Ketch instance.
  214.      * <p>
  215.      * Replicas should be configured once at creation before any proposals are
  216.      * executed. Once elections happen, <b>reconfiguration is a complicated
  217.      * concept that is not currently supported</b>.
  218.      *
  219.      * @param replicas
  220.      *            members participating with the same repository.
  221.      */
  222.     public void setReplicas(Collection<KetchReplica> replicas) {
  223.         List<KetchReplica> v = new ArrayList<>(5);
  224.         List<KetchReplica> f = new ArrayList<>(5);
  225.         for (KetchReplica r : replicas) {
  226.             switch (r.getParticipation()) {
  227.             case FULL:
  228.                 v.add(r);
  229.                 break;

  230.             case FOLLOWER_ONLY:
  231.                 f.add(r);
  232.                 break;
  233.             }
  234.         }

  235.         Collection<Integer> validVoters = validVoterCounts();
  236.         if (!validVoters.contains(Integer.valueOf(v.size()))) {
  237.             throw new IllegalArgumentException(MessageFormat.format(
  238.                     KetchText.get().unsupportedVoterCount,
  239.                     Integer.valueOf(v.size()),
  240.                     validVoters));
  241.         }

  242.         LocalReplica me = findLocal(v);
  243.         if (me == null) {
  244.             throw new IllegalArgumentException(
  245.                     KetchText.get().localReplicaRequired);
  246.         }

  247.         lock.lock();
  248.         try {
  249.             voters = v.toArray(new KetchReplica[0]);
  250.             followers = f.toArray(new KetchReplica[0]);
  251.             self = me;
  252.         } finally {
  253.             lock.unlock();
  254.         }
  255.     }

  256.     private static Collection<Integer> validVoterCounts() {
  257.         @SuppressWarnings("boxing")
  258.         Integer[] valid = {
  259.                 // An odd number of voting replicas is required.
  260.                 1, 3, 5, 7, 9 };
  261.         return Arrays.asList(valid);
  262.     }

  263.     private static LocalReplica findLocal(Collection<KetchReplica> voters) {
  264.         for (KetchReplica r : voters) {
  265.             if (r instanceof LocalReplica) {
  266.                 return (LocalReplica) r;
  267.             }
  268.         }
  269.         return null;
  270.     }

    /**
     * Get an instance of the repository for use by a leader thread.
     * <p>
     * The caller will close the repository. Within this class the returned
     * instance is used while loading the leader's initial state, and is closed
     * as soon as that completes.
     *
     * @return opened repository for use by the leader thread.
     * @throws java.io.IOException
     *             cannot reopen the repository for the leader.
     */
    protected abstract Repository openRepository() throws IOException;

  281.     /**
  282.      * Queue a reference update proposal for consensus.
  283.      * <p>
  284.      * This method does not wait for consensus to be reached. The proposal is
  285.      * checked to look for risks of conflicts, and then submitted into the queue
  286.      * for distribution as soon as possible.
  287.      * <p>
  288.      * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
  289.      * to see if the proposal is done.
  290.      *
  291.      * @param proposal
  292.      *            the proposed reference updates to queue for consideration.
  293.      *            Once execution is complete the individual reference result
  294.      *            fields will be populated with the outcome.
  295.      * @throws java.lang.InterruptedException
  296.      *             current thread was interrupted. The proposal may have been
  297.      *             aborted if it was not yet queued for execution.
  298.      * @throws java.io.IOException
  299.      *             unrecoverable error preventing proposals from being attempted
  300.      *             by this leader.
  301.      */
  302.     public void queueProposal(Proposal proposal)
  303.             throws InterruptedException, IOException {
  304.         try {
  305.             lock.lockInterruptibly();
  306.         } catch (InterruptedException e) {
  307.             proposal.abort();
  308.             throw e;
  309.         }
  310.         try {
  311.             if (refTree == null) {
  312.                 initialize();
  313.                 for (Proposal p : queued) {
  314.                     refTree.apply(p.getCommands());
  315.                 }
  316.             } else if (roundHoldsReferenceToRefTree) {
  317.                 refTree = refTree.copy();
  318.                 roundHoldsReferenceToRefTree = false;
  319.             }

  320.             if (!refTree.apply(proposal.getCommands())) {
  321.                 // A conflict exists so abort the proposal.
  322.                 proposal.abort();
  323.                 return;
  324.             }

  325.             queued.add(proposal);
  326.             proposal.notifyState(QUEUED);

  327.             if (idle) {
  328.                 scheduleLeader();
  329.             }
  330.         } finally {
  331.             lock.unlock();
  332.         }
  333.     }

  334.     private void initialize() throws IOException {
  335.         try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
  336.             self.initialize(git);

  337.             ObjectId accepted = self.getTxnAccepted();
  338.             if (!ObjectId.zeroId().equals(accepted)) {
  339.                 RevCommit c = rw.parseCommit(accepted);
  340.                 headIndex = LogIndex.unknown(accepted);
  341.                 refTree = RefTree.read(rw.getObjectReader(), c.getTree());
  342.             } else {
  343.                 headIndex = LogIndex.unknown(ObjectId.zeroId());
  344.                 refTree = RefTree.newEmptyTree();
  345.             }
  346.         }
  347.     }

  348.     private void scheduleLeader() {
  349.         idle = false;
  350.         system.getExecutor().execute(new Runnable() {
  351.             @Override
  352.             public void run() {
  353.                 runLeader();
  354.             }
  355.         });
  356.     }

  357.     private void runLeader() {
  358.         Round round;
  359.         lock.lock();
  360.         try {
  361.             switch (state) {
  362.             case CANDIDATE:
  363.                 round = new ElectionRound(this, headIndex);
  364.                 break;

  365.             case LEADER:
  366.                 round = newProposalRound();
  367.                 break;

  368.             case DEPOSED:
  369.             case SHUTDOWN:
  370.             default:
  371.                 log.warn("Leader cannot run {}", state); //$NON-NLS-1$
  372.                 // TODO(sop): Redirect proposals.
  373.                 return;
  374.             }
  375.         } finally {
  376.             lock.unlock();
  377.         }

  378.         try {
  379.             round.start();
  380.         } catch (IOException e) {
  381.             // TODO(sop) Depose leader if it cannot use its repository.
  382.             log.error(KetchText.get().leaderFailedToStore, e);
  383.             lock.lock();
  384.             try {
  385.                 nextRound();
  386.             } finally {
  387.                 lock.unlock();
  388.             }
  389.         }
  390.     }

  391.     private ProposalRound newProposalRound() {
  392.         List<Proposal> todo = new ArrayList<>(queued);
  393.         queued.clear();
  394.         roundHoldsReferenceToRefTree = true;
  395.         return new ProposalRound(this, headIndex, todo, refTree);
  396.     }

  397.     /** @return term of this leader's reign. */
  398.     long getTerm() {
  399.         return term;
  400.     }

  401.     /** @return end of the leader's log. */
  402.     LogIndex getHead() {
  403.         return headIndex;
  404.     }

  405.     /**
  406.      * @return state leader knows it has committed across a quorum of replicas.
  407.      */
  408.     LogIndex getCommitted() {
  409.         return committedIndex;
  410.     }

  411.     boolean isIdle() {
  412.         return idle;
  413.     }

  414.     void runAsync(Round round) {
  415.         lock.lock();
  416.         try {
  417.             // End of the log is this round. Once transport begins it is
  418.             // reasonable to assume at least one replica will eventually get
  419.             // this, and there is reasonable probability it commits.
  420.             headIndex = round.acceptedNewIndex;
  421.             runningRound = round;

  422.             for (KetchReplica replica : voters) {
  423.                 replica.pushTxnAcceptedAsync(round);
  424.             }
  425.             for (KetchReplica replica : followers) {
  426.                 replica.pushTxnAcceptedAsync(round);
  427.             }
  428.         } finally {
  429.             lock.unlock();
  430.         }
  431.     }

  432.     /**
  433.      * Asynchronous signal from a replica after completion.
  434.      * <p>
  435.      * Must be called while {@link #lock} is held by the replica.
  436.      *
  437.      * @param replica
  438.      *            replica posting a completion event.
  439.      */
  440.     void onReplicaUpdate(KetchReplica replica) {
  441.         if (log.isDebugEnabled()) {
  442.             log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
  443.                     replica.describeForLog(), snapshot());
  444.         }

  445.         if (replica.getParticipation() == FOLLOWER_ONLY) {
  446.             // Followers cannot vote, so votes haven't changed.
  447.             return;
  448.         } else if (runningRound == null) {
  449.             // No round running, no need to tally votes.
  450.             return;
  451.         }

  452.         assert headIndex.equals(runningRound.acceptedNewIndex);
  453.         int matching = 0;
  454.         for (KetchReplica r : voters) {
  455.             if (r.hasAccepted(headIndex)) {
  456.                 matching++;
  457.             }
  458.         }

  459.         int quorum = voters.length / 2 + 1;
  460.         boolean success = matching >= quorum;
  461.         if (!success) {
  462.             return;
  463.         }

  464.         switch (state) {
  465.         case CANDIDATE:
  466.             term = ((ElectionRound) runningRound).getTerm();
  467.             state = LEADER;
  468.             if (log.isDebugEnabled()) {
  469.                 log.debug("Won election, running term " + term); //$NON-NLS-1$
  470.             }

  471.             //$FALL-THROUGH$
  472.         case LEADER:
  473.             committedIndex = headIndex;
  474.             if (log.isDebugEnabled()) {
  475.                 log.debug("Committed {} in term {}", //$NON-NLS-1$
  476.                         committedIndex.describeForLog(),
  477.                         Long.valueOf(term));
  478.             }
  479.             nextRound();
  480.             commitAsync(replica);
  481.             notifySuccess(runningRound);
  482.             if (log.isDebugEnabled()) {
  483.                 log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
  484.             }
  485.             break;

  486.         default:
  487.             log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
  488.             break;
  489.         }
  490.     }

  491.     private void notifySuccess(Round round) {
  492.         // Drop the leader lock while notifying Proposal listeners.
  493.         lock.unlock();
  494.         try {
  495.             round.success();
  496.         } finally {
  497.             lock.lock();
  498.         }
  499.     }

  500.     private void commitAsync(KetchReplica caller) {
  501.         for (KetchReplica r : voters) {
  502.             if (r == caller) {
  503.                 continue;
  504.             }
  505.             if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
  506.                 r.pushCommitAsync(committedIndex);
  507.             }
  508.         }
  509.         for (KetchReplica r : followers) {
  510.             if (r == caller) {
  511.                 continue;
  512.             }
  513.             if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
  514.                 r.pushCommitAsync(committedIndex);
  515.             }
  516.         }
  517.     }

  518.     /** Schedule the next round; invoked while {@link #lock} is held. */
  519.     void nextRound() {
  520.         runningRound = null;

  521.         if (queued.isEmpty()) {
  522.             idle = true;
  523.         } else {
  524.             // Caller holds lock. Reschedule leader on a new thread so
  525.             // the call stack can unwind and lock is not held unexpectedly
  526.             // during prepare for the next round.
  527.             scheduleLeader();
  528.         }
  529.     }

  530.     /**
  531.      * Snapshot this leader
  532.      *
  533.      * @return snapshot of this leader
  534.      */
  535.     public LeaderSnapshot snapshot() {
  536.         lock.lock();
  537.         try {
  538.             LeaderSnapshot s = new LeaderSnapshot();
  539.             s.state = state;
  540.             s.term = term;
  541.             s.headIndex = headIndex;
  542.             s.committedIndex = committedIndex;
  543.             s.idle = isIdle();
  544.             for (KetchReplica r : voters) {
  545.                 s.replicas.add(r.snapshot());
  546.             }
  547.             for (KetchReplica r : followers) {
  548.                 s.replicas.add(r.snapshot());
  549.             }
  550.             return s;
  551.         } finally {
  552.             lock.unlock();
  553.         }
  554.     }

  555.     /**
  556.      * Gracefully shutdown this leader and cancel outstanding operations.
  557.      */
  558.     public void shutdown() {
  559.         lock.lock();
  560.         try {
  561.             if (state != SHUTDOWN) {
  562.                 state = SHUTDOWN;
  563.                 for (KetchReplica r : voters) {
  564.                     r.shutdown();
  565.                 }
  566.                 for (KetchReplica r : followers) {
  567.                     r.shutdown();
  568.                 }
  569.             }
  570.         } finally {
  571.             lock.unlock();
  572.         }
  573.     }

  574.     /** {@inheritDoc} */
  575.     @Override
  576.     public String toString() {
  577.         return snapshot().toString();
  578.     }
  579. }