BasePackFetchConnection.java

  1. /*
  2.  * Copyright (C) 2008, 2010 Google Inc.
  3.  * Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
  4.  * Copyright (C) 2008, 2020 Shawn O. Pearce <spearce@spearce.org> and others
  5.  *
  6.  * This program and the accompanying materials are made available under the
  7.  * terms of the Eclipse Distribution License v. 1.0 which is available at
  8.  * https://www.eclipse.org/org/documents/edl-v10.php.
  9.  *
  10.  * SPDX-License-Identifier: BSD-3-Clause
  11.  */

  12. package org.eclipse.jgit.transport;

  13. import java.io.IOException;
  14. import java.io.InputStream;
  15. import java.io.OutputStream;
  16. import java.text.MessageFormat;
  17. import java.util.Arrays;
  18. import java.util.Collection;
  19. import java.util.Collections;
  20. import java.util.Date;
  21. import java.util.HashSet;
  22. import java.util.LinkedHashSet;
  23. import java.util.Set;

  24. import org.eclipse.jgit.errors.PackProtocolException;
  25. import org.eclipse.jgit.errors.RemoteRepositoryException;
  26. import org.eclipse.jgit.errors.TransportException;
  27. import org.eclipse.jgit.internal.JGitText;
  28. import org.eclipse.jgit.internal.storage.file.PackLock;
  29. import org.eclipse.jgit.lib.AnyObjectId;
  30. import org.eclipse.jgit.lib.Config;
  31. import org.eclipse.jgit.lib.MutableObjectId;
  32. import org.eclipse.jgit.lib.NullProgressMonitor;
  33. import org.eclipse.jgit.lib.ObjectId;
  34. import org.eclipse.jgit.lib.ObjectInserter;
  35. import org.eclipse.jgit.lib.ProgressMonitor;
  36. import org.eclipse.jgit.lib.Ref;
  37. import org.eclipse.jgit.revwalk.RevCommit;
  38. import org.eclipse.jgit.revwalk.RevCommitList;
  39. import org.eclipse.jgit.revwalk.RevFlag;
  40. import org.eclipse.jgit.revwalk.RevObject;
  41. import org.eclipse.jgit.revwalk.RevSort;
  42. import org.eclipse.jgit.revwalk.RevWalk;
  43. import org.eclipse.jgit.revwalk.filter.CommitTimeRevFilter;
  44. import org.eclipse.jgit.revwalk.filter.RevFilter;
  45. import org.eclipse.jgit.transport.GitProtocolConstants.MultiAck;
  46. import org.eclipse.jgit.transport.PacketLineIn.AckNackResult;
  47. import org.eclipse.jgit.util.StringUtils;
  48. import org.eclipse.jgit.util.TemporaryBuffer;

  49. /**
  50.  * Fetch implementation using the native Git pack transfer service.
  51.  * <p>
  52.  * This is the canonical implementation for transferring objects from the remote
  53.  * repository to the local repository by talking to the 'git-upload-pack'
  54.  * service. Objects are packed on the remote side into a pack file and then sent
  55.  * down the pipe to us.
  56.  * <p>
  57.  * This connection requires only a bi-directional pipe or socket, and thus is
  58.  * easily wrapped up into a local process pipe, anonymous TCP socket, or a
  59.  * command executed through an SSH tunnel.
  60.  * <p>
  61.  * If {@link org.eclipse.jgit.transport.BasePackConnection#statelessRPC} is
  62.  * {@code true}, this connection can be tunneled over a request-response style
  63.  * RPC system like HTTP. The RPC call boundary is determined by this class
  64.  * switching from writing to the OutputStream to reading from the InputStream.
  65.  * <p>
  66.  * Concrete implementations should just call the
  67.  * {@link #init(java.io.InputStream, java.io.OutputStream)} and
  68.  * {@link #readAdvertisedRefs()} methods in their constructor or before any
  69.  * use. They should also release resources in {@link #close()} if needed.
  70.  */
  71. public abstract class BasePackFetchConnection extends BasePackConnection
  72.         implements FetchConnection {
  73.     /**
  74.      * Maximum number of 'have' lines to send before giving up.
  75.      * <p>
  76.      * During {@link #negotiate(ProgressMonitor)} we send at most this many
  77.      * commits to the remote peer as 'have' lines without an ACK response before
  78.      * we give up.
  79.      */
  80.     private static final int MAX_HAVES = 256;

  81.     /**
  82.      * Amount of data the client sends before starting to read.
  83.      * <p>
  84.      * Any output stream given to the client must be able to buffer this many
  85.      * bytes before the client will stop writing and start reading from the
  86.      * input stream. If the output stream blocks before this many bytes are in
  87.      * the send queue, the system will deadlock.
  88.      */
  89.     protected static final int MIN_CLIENT_BUFFER = 2 * 32 * 46 + 8;

  90.     /**
  91.      * Include tags if we are also including the referenced objects.
  92.      * @since 2.0
  93.      */
  94.     public static final String OPTION_INCLUDE_TAG = GitProtocolConstants.OPTION_INCLUDE_TAG;

  95.     /**
  96.      * Multi-ACK support for improved negotiation.
  97.      * @since 2.0
  98.      */
  99.     public static final String OPTION_MULTI_ACK = GitProtocolConstants.OPTION_MULTI_ACK;

  100.     /**
  101.      * Multi-ACK detailed support for improved negotiation.
  102.      * @since 2.0
  103.      */
  104.     public static final String OPTION_MULTI_ACK_DETAILED = GitProtocolConstants.OPTION_MULTI_ACK_DETAILED;

  105.     /**
  106.      * The client supports packs with deltas but not their bases.
  107.      * @since 2.0
  108.      */
  109.     public static final String OPTION_THIN_PACK = GitProtocolConstants.OPTION_THIN_PACK;

  110.     /**
  111.      * The client supports using the side-band for progress messages.
  112.      * @since 2.0
  113.      */
  114.     public static final String OPTION_SIDE_BAND = GitProtocolConstants.OPTION_SIDE_BAND;

  115.     /**
  116.      * The client supports using the 64K side-band for progress messages.
  117.      * @since 2.0
  118.      */
  119.     public static final String OPTION_SIDE_BAND_64K = GitProtocolConstants.OPTION_SIDE_BAND_64K;

  120.     /**
  121.      * The client supports packs with OFS deltas.
  122.      * @since 2.0
  123.      */
  124.     public static final String OPTION_OFS_DELTA = GitProtocolConstants.OPTION_OFS_DELTA;

  125.     /**
  126.      * The client supports shallow fetches.
  127.      * @since 2.0
  128.      */
  129.     public static final String OPTION_SHALLOW = GitProtocolConstants.OPTION_SHALLOW;

  130.     /**
  131.      * The client does not want progress messages and will ignore them.
  132.      * @since 2.0
  133.      */
  134.     public static final String OPTION_NO_PROGRESS = GitProtocolConstants.OPTION_NO_PROGRESS;

  135.     /**
  136.      * The client supports receiving a pack before it has sent "done".
  137.      * @since 2.0
  138.      */
  139.     public static final String OPTION_NO_DONE = GitProtocolConstants.OPTION_NO_DONE;

  140.     /**
  141.      * The client supports fetching objects at the tip of any ref, even if not
  142.      * advertised.
  143.      * @since 3.1
  144.      */
  145.     public static final String OPTION_ALLOW_TIP_SHA1_IN_WANT = GitProtocolConstants.OPTION_ALLOW_TIP_SHA1_IN_WANT;

  146.     /**
  147.      * The client supports fetching objects that are reachable from a tip of a
  148.      * ref that is allowed to fetch.
  149.      * @since 4.1
  150.      */
  151.     public static final String OPTION_ALLOW_REACHABLE_SHA1_IN_WANT = GitProtocolConstants.OPTION_ALLOW_REACHABLE_SHA1_IN_WANT;

  152.     /**
  153.      * The client specified a filter expression.
  154.      *
  155.      * @since 5.0
  156.      */
  157.     public static final String OPTION_FILTER = GitProtocolConstants.OPTION_FILTER;

  158.     private final RevWalk walk;

  159.     /** All commits that are immediately reachable by a local ref. */
  160.     private RevCommitList<RevCommit> reachableCommits;

  161.     /** Marks an object as having all its dependencies. */
  162.     final RevFlag REACHABLE;

  163.     /** Marks a commit known to both sides of the connection. */
  164.     final RevFlag COMMON;

  165.     /** Like {@link #COMMON} but means it is also in {@link #pckState}. */
  166.     private final RevFlag STATE;

  167.     /** Marks a commit listed in the advertised refs. */
  168.     final RevFlag ADVERTISED;

  169.     private MultiAck multiAck = MultiAck.OFF;

  170.     private boolean thinPack;

  171.     private boolean sideband;

  172.     private boolean includeTags;

  173.     private boolean allowOfsDelta;

  174.     private boolean noDone;

  175.     private boolean noProgress;

  176.     private String lockMessage;

  177.     private PackLock packLock;

  178.     private int maxHaves;

  179.     /**
  180.      * RPC state, if {@link BasePackConnection#statelessRPC} is true or protocol
  181.      * V2 is used.
  182.      */
  183.     private TemporaryBuffer.Heap state;

  184.     private PacketLineOut pckState;

  185.     /**
  186.      * Either FilterSpec.NO_FILTER for a filter that doesn't filter
  187.      * anything, or a filter that indicates what and what not to send to the
  188.      * server.
  189.      */
  190.     private final FilterSpec filterSpec;

  191.     /**
  192.      * Create a new connection to fetch using the native git transport.
  193.      *
  194.      * @param packTransport
  195.      *            the transport.
  196.      */
  197.     public BasePackFetchConnection(PackTransport packTransport) {
  198.         super(packTransport);

  199.         if (local != null) {
  200.             final FetchConfig cfg = getFetchConfig();
  201.             allowOfsDelta = cfg.allowOfsDelta;
  202.             maxHaves = cfg.maxHaves;
  203.         } else {
  204.             allowOfsDelta = true;
  205.             maxHaves = Integer.MAX_VALUE;
  206.         }

  207.         includeTags = transport.getTagOpt() != TagOpt.NO_TAGS;
  208.         thinPack = transport.isFetchThin();
  209.         filterSpec = transport.getFilterSpec();

  210.         if (local != null) {
  211.             walk = new RevWalk(local);
  212.             walk.setRetainBody(false);
  213.             reachableCommits = new RevCommitList<>();
  214.             REACHABLE = walk.newFlag("REACHABLE"); //$NON-NLS-1$
  215.             COMMON = walk.newFlag("COMMON"); //$NON-NLS-1$
  216.             STATE = walk.newFlag("STATE"); //$NON-NLS-1$
  217.             ADVERTISED = walk.newFlag("ADVERTISED"); //$NON-NLS-1$

  218.             walk.carry(COMMON);
  219.             walk.carry(REACHABLE);
  220.             walk.carry(ADVERTISED);
  221.         } else {
  222.             walk = null;
  223.             REACHABLE = null;
  224.             COMMON = null;
  225.             STATE = null;
  226.             ADVERTISED = null;
  227.         }
  228.     }

  229.     static class FetchConfig {
  230.         final boolean allowOfsDelta;

  231.         final int maxHaves;

  232.         FetchConfig(Config c) {
  233.             allowOfsDelta = c.getBoolean("repack", "usedeltabaseoffset", true); //$NON-NLS-1$ //$NON-NLS-2$
  234.             maxHaves = c.getInt("fetch", "maxhaves", Integer.MAX_VALUE); //$NON-NLS-1$ //$NON-NLS-2$
  235.         }

  236.         FetchConfig(boolean allowOfsDelta, int maxHaves) {
  237.             this.allowOfsDelta = allowOfsDelta;
  238.             this.maxHaves = maxHaves;
  239.         }
  240.     }

  241.     /** {@inheritDoc} */
  242.     @Override
  243.     public final void fetch(final ProgressMonitor monitor,
  244.             final Collection<Ref> want, final Set<ObjectId> have)
  245.             throws TransportException {
  246.         fetch(monitor, want, have, null);
  247.     }

  248.     /** {@inheritDoc} */
  249.     @Override
  250.     public final void fetch(final ProgressMonitor monitor,
  251.             final Collection<Ref> want, final Set<ObjectId> have,
  252.             OutputStream outputStream) throws TransportException {
  253.         markStartedOperation();
  254.         doFetch(monitor, want, have, outputStream);
  255.     }

  256.     /** {@inheritDoc} */
  257.     @Override
  258.     public boolean didFetchIncludeTags() {
  259.         return false;
  260.     }

  261.     /** {@inheritDoc} */
  262.     @Override
  263.     public boolean didFetchTestConnectivity() {
  264.         return false;
  265.     }

  266.     /** {@inheritDoc} */
  267.     @Override
  268.     public void setPackLockMessage(String message) {
  269.         lockMessage = message;
  270.     }

  271.     /** {@inheritDoc} */
  272.     @Override
  273.     public Collection<PackLock> getPackLocks() {
  274.         if (packLock != null)
  275.             return Collections.singleton(packLock);
  276.         return Collections.<PackLock> emptyList();
  277.     }

  278.     private void clearState() {
  279.         walk.dispose();
  280.         reachableCommits = null;
  281.         state = null;
  282.         pckState = null;
  283.     }

  284.     /**
  285.      * Execute common ancestor negotiation and fetch the objects.
  286.      *
  287.      * @param monitor
  288.      *            progress monitor to receive status updates. If the monitor is
  289.      *            the {@link org.eclipse.jgit.lib.NullProgressMonitor#INSTANCE}, then the no-progress
  290.      *            option enabled.
  291.      * @param want
  292.      *            the advertised remote references the caller wants to fetch.
  293.      * @param have
  294.      *            additional objects to assume that already exist locally. This
  295.      *            will be added to the set of objects reachable from the
  296.      *            destination repository's references.
  297.      * @param outputStream
  298.      *            ouputStream to write sideband messages to
  299.      * @throws org.eclipse.jgit.errors.TransportException
  300.      *             if any exception occurs.
  301.      * @since 3.0
  302.      */
  303.     protected void doFetch(final ProgressMonitor monitor,
  304.             final Collection<Ref> want, final Set<ObjectId> have,
  305.             OutputStream outputStream) throws TransportException {
  306.         try {
  307.             noProgress = monitor == NullProgressMonitor.INSTANCE;

  308.             markRefsAdvertised();
  309.             markReachable(have, maxTimeWanted(want));

  310.             if (TransferConfig.ProtocolVersion.V2
  311.                     .equals(getProtocolVersion())) {
  312.                 // Protocol V2 always is a "stateless" protocol, even over a
  313.                 // bidirectional pipe: the server serves one "fetch" request and
  314.                 // then forgets anything it has learned, so the next fetch
  315.                 // request has to re-send all wants and previously determined
  316.                 // common objects as "have"s again.
  317.                 state = new TemporaryBuffer.Heap(Integer.MAX_VALUE);
  318.                 pckState = new PacketLineOut(state);
  319.                 try {
  320.                     doFetchV2(monitor, want, outputStream);
  321.                 } finally {
  322.                     clearState();
  323.                 }
  324.                 return;
  325.             }
  326.             // Protocol V0/1
  327.             if (statelessRPC) {
  328.                 state = new TemporaryBuffer.Heap(Integer.MAX_VALUE);
  329.                 pckState = new PacketLineOut(state);
  330.             }
  331.             PacketLineOut output = statelessRPC ? pckState : pckOut;
  332.             if (sendWants(want, output)) {
  333.                 output.end();
  334.                 outNeedsEnd = false;
  335.                 negotiate(monitor);

  336.                 clearState();

  337.                 receivePack(monitor, outputStream);
  338.             }
  339.         } catch (CancelledException ce) {
  340.             close();
  341.             return; // Caller should test (or just know) this themselves.
  342.         } catch (IOException | RuntimeException err) {
  343.             close();
  344.             throw new TransportException(err.getMessage(), err);
  345.         }
  346.     }

  347.     private void doFetchV2(ProgressMonitor monitor, Collection<Ref> want,
  348.             OutputStream outputStream) throws IOException, CancelledException {
  349.         sideband = true;
  350.         negotiateBegin();

  351.         pckState.writeString("command=" + GitProtocolConstants.COMMAND_FETCH); //$NON-NLS-1$
  352.         // Capabilities are sent as command arguments in protocol V2
  353.         String agent = UserAgent.get();
  354.         if (agent != null && isCapableOf(GitProtocolConstants.OPTION_AGENT)) {
  355.             pckState.writeString(
  356.                     GitProtocolConstants.OPTION_AGENT + '=' + agent);
  357.         }
  358.         Set<String> capabilities = new HashSet<>();
  359.         String advertised = getCapability(GitProtocolConstants.COMMAND_FETCH);
  360.         if (!StringUtils.isEmptyOrNull(advertised)) {
  361.             capabilities.addAll(Arrays.asList(advertised.split("\\s+"))); //$NON-NLS-1$
  362.         }
  363.         // Arguments
  364.         pckState.writeDelim();
  365.         for (String capability : getCapabilitiesV2(capabilities)) {
  366.             pckState.writeString(capability);
  367.         }
  368.         if (!sendWants(want, pckState)) {
  369.             // We already have everything we wanted.
  370.             return;
  371.         }
  372.         // If we send something, we always close it properly ourselves.
  373.         outNeedsEnd = false;

  374.         FetchStateV2 fetchState = new FetchStateV2();
  375.         boolean sentDone = false;
  376.         for (;;) {
  377.             // The "state" buffer contains the full fetch request with all
  378.             // common objects found so far.
  379.             state.writeTo(out, monitor);
  380.             sentDone = sendNextHaveBatch(fetchState, pckOut, monitor);
  381.             if (sentDone) {
  382.                 break;
  383.             }
  384.             if (readAcknowledgments(fetchState, pckIn, monitor)) {
  385.                 // We got a "ready": next should be a patch file.
  386.                 break;
  387.             }
  388.             // Note: C git reads and requires here (and after a packfile) a
  389.             // "0002" packet in stateless RPC transports (https). This "response
  390.             // end" packet is even mentioned in the protocol V2 technical
  391.             // documentation. However, it is not actually part of the public
  392.             // protocol; it occurs only in an internal protocol wrapper in the C
  393.             // git implementation.
  394.         }
  395.         clearState();
  396.         String line = pckIn.readString();
  397.         // If we sent a done, we may have an error reply here.
  398.         if (sentDone && line.startsWith("ERR ")) { //$NON-NLS-1$
  399.             throw new RemoteRepositoryException(uri, line.substring(4));
  400.         }
  401.         // "shallow-info", "wanted-refs", and "packfile-uris" would have to be
  402.         // handled here in that order.
  403.         if (!GitProtocolConstants.SECTION_PACKFILE.equals(line)) {
  404.             throw new PackProtocolException(
  405.                     MessageFormat.format(JGitText.get().expectedGot,
  406.                             GitProtocolConstants.SECTION_PACKFILE, line));
  407.         }
  408.         receivePack(monitor, outputStream);
  409.     }

  410.     /**
  411.      * Sends the next batch of "have"s and terminates the {@code output}.
  412.      *
  413.      * @param fetchState
  414.      *            is updated with information about the number of items written,
  415.      *            and whether to expect a packfile next
  416.      * @param output
  417.      *            to write to
  418.      * @param monitor
  419.      *            for progress reporting and cancellation
  420.      * @return {@code true} if a "done" was written and we should thus expect a
  421.      *         packfile next
  422.      * @throws IOException
  423.      *             on errors
  424.      * @throws CancelledException
  425.      *             on cancellation
  426.      */
  427.     private boolean sendNextHaveBatch(FetchStateV2 fetchState,
  428.             PacketLineOut output, ProgressMonitor monitor)
  429.             throws IOException, CancelledException {
  430.         long n = 0;
  431.         while (n < fetchState.havesToSend) {
  432.             final RevCommit c = walk.next();
  433.             if (c == null) {
  434.                 break;
  435.             }
  436.             output.writeString("have " + c.getId().name() + '\n'); //$NON-NLS-1$
  437.             n++;
  438.             if (n % 10 == 0 && monitor.isCancelled()) {
  439.                 throw new CancelledException();
  440.             }
  441.         }
  442.         fetchState.havesTotal += n;
  443.         if (n == 0
  444.                 || (fetchState.hadAcks
  445.                         && fetchState.havesWithoutAck > MAX_HAVES)
  446.                 || fetchState.havesTotal > maxHaves) {
  447.             output.writeString("done\n"); //$NON-NLS-1$
  448.             output.end();
  449.             return true;
  450.         }
  451.         // Increment only after the test above. Of course we have no ACKs yet
  452.         // for the newly added "have"s, so it makes no sense to count them
  453.         // against the MAX_HAVES limit.
  454.         fetchState.havesWithoutAck += n;
  455.         output.end();
  456.         fetchState.incHavesToSend(statelessRPC);
  457.         return false;
  458.     }

  459.     /**
  460.      * Reads and processes acknowledgments, adding ACKed objects as "have"s to
  461.      * the global state {@link TemporaryBuffer}.
  462.      *
  463.      * @param fetchState
  464.      *            to update
  465.      * @param input
  466.      *            to read from
  467.      * @param monitor
  468.      *            for progress reporting and cancellation
  469.      * @return {@code true} if a "ready" was received and a packfile is expected
  470.      *         next
  471.      * @throws IOException
  472.      *             on errors
  473.      * @throws CancelledException
  474.      *             on cancellation
  475.      */
  476.     private boolean readAcknowledgments(FetchStateV2 fetchState,
  477.             PacketLineIn input, ProgressMonitor monitor)
  478.             throws IOException, CancelledException {
  479.         String line = input.readString();
  480.         if (!GitProtocolConstants.SECTION_ACKNOWLEDGMENTS.equals(line)) {
  481.             throw new PackProtocolException(MessageFormat.format(
  482.                     JGitText.get().expectedGot,
  483.                     GitProtocolConstants.SECTION_ACKNOWLEDGMENTS, line));
  484.         }
  485.         MutableObjectId returnedId = new MutableObjectId();
  486.         line = input.readString();
  487.         boolean gotReady = false;
  488.         long n = 0;
  489.         while (!PacketLineIn.isEnd(line) && !PacketLineIn.isDelimiter(line)) {
  490.             AckNackResult ack = PacketLineIn.parseACKv2(line, returnedId);
  491.             // If we got a "ready", we just skip the remaining lines after
  492.             // having checked them for being valid. (Normally, the "ready"
  493.             // should be the last line anyway.)
  494.             if (!gotReady) {
  495.                 if (ack == AckNackResult.ACK_COMMON) {
  496.                     // markCommon appends the object to the "state"
  497.                     markCommon(walk.parseAny(returnedId), ack, true);
  498.                     fetchState.havesWithoutAck = 0;
  499.                     fetchState.hadAcks = true;
  500.                 } else if (ack == AckNackResult.ACK_READY) {
  501.                     gotReady = true;
  502.                 }
  503.             }
  504.             n++;
  505.             if (n % 10 == 0 && monitor.isCancelled()) {
  506.                 throw new CancelledException();
  507.             }
  508.             line = input.readString();
  509.         }
  510.         if (gotReady) {
  511.             if (!PacketLineIn.isDelimiter(line)) {
  512.                 throw new PackProtocolException(MessageFormat
  513.                         .format(JGitText.get().expectedGot, "0001", line)); //$NON-NLS-1$
  514.             }
  515.         } else if (!PacketLineIn.isEnd(line)) {
  516.             throw new PackProtocolException(MessageFormat
  517.                     .format(JGitText.get().expectedGot, "0000", line)); //$NON-NLS-1$
  518.         }
  519.         return gotReady;
  520.     }

  521.     /** {@inheritDoc} */
  522.     @Override
  523.     public void close() {
  524.         if (walk != null)
  525.             walk.close();
  526.         super.close();
  527.     }

  528.     FetchConfig getFetchConfig() {
  529.         return local.getConfig().get(FetchConfig::new);
  530.     }

  531.     private int maxTimeWanted(Collection<Ref> wants) {
  532.         int maxTime = 0;
  533.         for (Ref r : wants) {
  534.             try {
  535.                 final RevObject obj = walk.parseAny(r.getObjectId());
  536.                 if (obj instanceof RevCommit) {
  537.                     final int cTime = ((RevCommit) obj).getCommitTime();
  538.                     if (maxTime < cTime)
  539.                         maxTime = cTime;
  540.                 }
  541.             } catch (IOException error) {
  542.                 // We don't have it, but we want to fetch (thus fixing error).
  543.             }
  544.         }
  545.         return maxTime;
  546.     }

  547.     private void markReachable(Set<ObjectId> have, int maxTime)
  548.             throws IOException {
  549.         for (Ref r : local.getRefDatabase().getRefs()) {
  550.             ObjectId id = r.getPeeledObjectId();
  551.             if (id == null)
  552.                 id = r.getObjectId();
  553.             if (id == null)
  554.                 continue;
  555.             parseReachable(id);
  556.         }

  557.         for (ObjectId id : local.getAdditionalHaves())
  558.             parseReachable(id);

  559.         for (ObjectId id : have)
  560.             parseReachable(id);

  561.         if (maxTime > 0) {
  562.             // Mark reachable commits until we reach maxTime. These may
  563.             // wind up later matching up against things we want and we
  564.             // can avoid asking for something we already happen to have.
  565.             //
  566.             final Date maxWhen = new Date(maxTime * 1000L);
  567.             walk.sort(RevSort.COMMIT_TIME_DESC);
  568.             walk.markStart(reachableCommits);
  569.             walk.setRevFilter(CommitTimeRevFilter.after(maxWhen));
  570.             for (;;) {
  571.                 final RevCommit c = walk.next();
  572.                 if (c == null)
  573.                     break;
  574.                 if (c.has(ADVERTISED) && !c.has(COMMON)) {
  575.                     // This is actually going to be a common commit, but
  576.                     // our peer doesn't know that fact yet.
  577.                     //
  578.                     c.add(COMMON);
  579.                     c.carry(COMMON);
  580.                     reachableCommits.add(c);
  581.                 }
  582.             }
  583.         }
  584.     }

  585.     private void parseReachable(ObjectId id) {
  586.         try {
  587.             RevCommit o = walk.parseCommit(id);
  588.             if (!o.has(REACHABLE)) {
  589.                 o.add(REACHABLE);
  590.                 reachableCommits.add(o);
  591.             }
  592.         } catch (IOException readError) {
  593.             // If we cannot read the value of the ref skip it.
  594.         }
  595.     }

  596.     private boolean sendWants(Collection<Ref> want, PacketLineOut p)
  597.             throws IOException {
  598.         boolean first = true;
  599.         for (Ref r : want) {
  600.             ObjectId objectId = r.getObjectId();
  601.             if (objectId == null) {
  602.                 continue;
  603.             }
  604.             try {
  605.                 if (walk.parseAny(objectId).has(REACHABLE)) {
  606.                     // We already have this object. Asking for it is
  607.                     // not a very good idea.
  608.                     //
  609.                     continue;
  610.                 }
  611.             } catch (IOException err) {
  612.                 // Its OK, we don't have it, but we want to fix that
  613.                 // by fetching the object from the other side.
  614.             }

  615.             final StringBuilder line = new StringBuilder(46);
  616.             line.append("want "); //$NON-NLS-1$
  617.             line.append(objectId.name());
  618.             if (first && TransferConfig.ProtocolVersion.V0
  619.                     .equals(getProtocolVersion())) {
  620.                 line.append(enableCapabilities());
  621.             }
  622.             first = false;
  623.             line.append('\n');
  624.             p.writeString(line.toString());
  625.         }
  626.         if (first) {
  627.             return false;
  628.         }
  629.         if (!filterSpec.isNoOp()) {
  630.             p.writeString(filterSpec.filterLine());
  631.         }
  632.         return true;
  633.     }

  634.     private Set<String> getCapabilitiesV2(Set<String> advertisedCapabilities)
  635.             throws TransportException {
  636.         Set<String> capabilities = new LinkedHashSet<>();
  637.         // Protocol V2 is implicitly capable of all these.
  638.         if (noProgress) {
  639.             capabilities.add(OPTION_NO_PROGRESS);
  640.         }
  641.         if (includeTags) {
  642.             capabilities.add(OPTION_INCLUDE_TAG);
  643.         }
  644.         if (allowOfsDelta) {
  645.             capabilities.add(OPTION_OFS_DELTA);
  646.         }
  647.         if (thinPack) {
  648.             capabilities.add(OPTION_THIN_PACK);
  649.         }
  650.         if (!filterSpec.isNoOp()
  651.                 && !advertisedCapabilities.contains(OPTION_FILTER)) {
  652.             throw new PackProtocolException(uri,
  653.                     JGitText.get().filterRequiresCapability);
  654.         }
  655.         // The FilterSpec will be added later in sendWants().
  656.         return capabilities;
  657.     }

  658.     private String enableCapabilities() throws TransportException {
  659.         final StringBuilder line = new StringBuilder();
  660.         if (noProgress)
  661.             wantCapability(line, OPTION_NO_PROGRESS);
  662.         if (includeTags)
  663.             includeTags = wantCapability(line, OPTION_INCLUDE_TAG);
  664.         if (allowOfsDelta)
  665.             wantCapability(line, OPTION_OFS_DELTA);

  666.         if (wantCapability(line, OPTION_MULTI_ACK_DETAILED)) {
  667.             multiAck = MultiAck.DETAILED;
  668.             if (statelessRPC)
  669.                 noDone = wantCapability(line, OPTION_NO_DONE);
  670.         } else if (wantCapability(line, OPTION_MULTI_ACK))
  671.             multiAck = MultiAck.CONTINUE;
  672.         else
  673.             multiAck = MultiAck.OFF;

  674.         if (thinPack)
  675.             thinPack = wantCapability(line, OPTION_THIN_PACK);
  676.         if (wantCapability(line, OPTION_SIDE_BAND_64K))
  677.             sideband = true;
  678.         else if (wantCapability(line, OPTION_SIDE_BAND))
  679.             sideband = true;

  680.         if (statelessRPC && multiAck != MultiAck.DETAILED) {
  681.             // Our stateless RPC implementation relies upon the detailed
  682.             // ACK status to tell us common objects for reuse in future
  683.             // requests.  If its not enabled, we can't talk to the peer.
  684.             //
  685.             throw new PackProtocolException(uri, MessageFormat.format(
  686.                     JGitText.get().statelessRPCRequiresOptionToBeEnabled,
  687.                     OPTION_MULTI_ACK_DETAILED));
  688.         }

  689.         if (!filterSpec.isNoOp() && !wantCapability(line, OPTION_FILTER)) {
  690.             throw new PackProtocolException(uri,
  691.                     JGitText.get().filterRequiresCapability);
  692.         }

  693.         addUserAgentCapability(line);
  694.         return line.toString();
  695.     }

    /**
     * Run the have/ACK negotiation with the remote side.
     * <p>
     * Commits produced by {@code walk} (configured in
     * {@link #negotiateBegin()}) are written as {@code have} lines in blocks
     * of 32, each block terminated by a flush. After each block (except the
     * first one when not using stateless RPC) the ACK/NAK results are read
     * and acknowledged objects are marked common. The loop ends when the
     * walk is exhausted, the remote sends a final ACK, the remote reports
     * ready while {@code noDone} is set, or the have limits are exceeded.
     * Afterwards {@code done} is sent (unless ready was received with
     * {@code noDone}) and any outstanding results are consumed.
     *
     * @param monitor
     *            polled for cancellation between protocol steps.
     * @throws IOException
     *             propagated from the underlying stream and walk operations.
     * @throws CancelledException
     *             if {@code monitor} reports that it was cancelled.
     */
    private void negotiate(ProgressMonitor monitor) throws IOException,
            CancelledException {
        final MutableObjectId ackId = new MutableObjectId();
        // Number of flushes sent for which no result has been read yet.
        int resultsPending = 0;
        int havesSent = 0;
        int havesSinceLastContinue = 0;
        boolean receivedContinue = false;
        boolean receivedAck = false;
        boolean receivedReady = false;

        if (statelessRPC) {
            // Write the accumulated state ahead of the haves of this request.
            state.writeTo(out, null);
        }

        negotiateBegin();
        SEND_HAVES: for (;;) {
            final RevCommit c = walk.next();
            if (c == null) {
                // Nothing left in our history to offer.
                break SEND_HAVES;
            }

            ObjectId o = c.getId();
            pckOut.writeString("have " + o.name() + "\n"); //$NON-NLS-1$ //$NON-NLS-2$
            havesSent++;
            havesSinceLastContinue++;

            if ((31 & havesSent) != 0) {
                // We group the have lines into blocks of 32, each marked
                // with a flush (aka end). This one is within a block so
                // continue with another have line.
                //
                continue;
            }

            if (monitor.isCancelled()) {
                throw new CancelledException();
            }

            pckOut.end();
            resultsPending++; // Each end will cause a result to come back.

            if (havesSent == 32 && !statelessRPC) {
                // On the first block we race ahead and try to send
                // more of the second block while waiting for the
                // remote to respond to our first block request.
                // This keeps us one block ahead of the peer.
                //
                continue;
            }

            // Consume results until a NAK closes the current block or a
            // final ACK ends the negotiation.
            READ_RESULT: for (;;) {
                final AckNackResult anr = pckIn.readACK(ackId);
                switch (anr) {
                case NAK:
                    // More have lines are necessary to compute the
                    // pack on the remote side. Keep doing that.
                    //
                    resultsPending--;
                    break READ_RESULT;

                case ACK:
                    // The remote side is happy and knows exactly what
                    // to send us. There is no further negotiation and
                    // we can break out immediately.
                    //
                    multiAck = MultiAck.OFF;
                    resultsPending = 0;
                    receivedAck = true;
                    if (statelessRPC) {
                        state.writeTo(out, null);
                    }
                    break SEND_HAVES;

                case ACK_CONTINUE:
                case ACK_COMMON:
                case ACK_READY:
                    // The server knows this commit (ackId). We don't
                    // need to send any further along its ancestry, but
                    // we need to continue to talk about other parts of
                    // our local history.
                    //
                    markCommon(walk.parseAny(ackId), anr, statelessRPC);
                    receivedAck = true;
                    receivedContinue = true;
                    havesSinceLastContinue = 0;
                    if (anr == AckNackResult.ACK_READY) {
                        receivedReady = true;
                    }
                    break;
                }

                if (monitor.isCancelled()) {
                    throw new CancelledException();
                }
            }

            if (noDone && receivedReady) {
                // With no-done enabled, a ready from the remote ends the
                // negotiation; no "done" line is written below.
                break SEND_HAVES;
            }
            if (statelessRPC) {
                // Write the state again ahead of the next block of haves.
                state.writeTo(out, null);
            }

            if ((receivedContinue && havesSinceLastContinue > MAX_HAVES)
                    || havesSent >= maxHaves) {
                // Our history must be really different from the remote's.
                // We just sent a whole slew of have lines, and it did not
                // recognize any of them. Avoid sending our entire history
                // to them by giving up early.
                //
                break SEND_HAVES;
            }
        }

        // Tell the remote side we have run out of things to talk about.
        //
        if (monitor.isCancelled()) {
            throw new CancelledException();
        }

        if (!receivedReady || !noDone) {
            // When statelessRPC is true we should always leave SEND_HAVES
            // loop above while in the middle of a request. This allows us
            // to just write done immediately.
            //
            pckOut.writeString("done\n"); //$NON-NLS-1$
            pckOut.flush();
        }

        if (!receivedAck) {
            // Apparently if we have never received an ACK earlier
            // there is one more result expected from the done we
            // just sent to the remote.
            //
            multiAck = MultiAck.OFF;
            resultsPending++;
        }

        // Drain the results still outstanding. While multiAck is not OFF the
        // loop keeps reading until a plain ACK arrives.
        READ_RESULT: while (resultsPending > 0 || multiAck != MultiAck.OFF) {
            final AckNackResult anr = pckIn.readACK(ackId);
            resultsPending--;
            switch (anr) {
            case NAK:
                // A NAK is a response to an end we queued earlier
                // we eat it and look for another ACK/NAK message.
                //
                break;

            case ACK:
                // A solitary ACK at this point means the remote won't
                // speak anymore, but is going to send us a pack now.
                //
                break READ_RESULT;

            case ACK_CONTINUE:
            case ACK_COMMON:
            case ACK_READY:
                // We will expect a normal ACK to break out of the loop.
                //
                multiAck = MultiAck.CONTINUE;
                break;
            }

            if (monitor.isCancelled()) {
                throw new CancelledException();
            }
        }
    }

  844.     private void negotiateBegin() throws IOException {
  845.         walk.resetRetain(REACHABLE, ADVERTISED);
  846.         walk.markStart(reachableCommits);
  847.         walk.sort(RevSort.COMMIT_TIME_DESC);
  848.         walk.setRevFilter(new RevFilter() {
  849.             @Override
  850.             public RevFilter clone() {
  851.                 return this;
  852.             }

  853.             @Override
  854.             public boolean include(RevWalk walker, RevCommit c) {
  855.                 final boolean remoteKnowsIsCommon = c.has(COMMON);
  856.                 if (c.has(ADVERTISED)) {
  857.                     // Remote advertised this, and we have it, hence common.
  858.                     // Whether or not the remote knows that fact is tested
  859.                     // before we added the flag. If the remote doesn't know
  860.                     // we have to still send them this object.
  861.                     //
  862.                     c.add(COMMON);
  863.                 }
  864.                 return !remoteKnowsIsCommon;
  865.             }

  866.             @Override
  867.             public boolean requiresCommitBody() {
  868.                 return false;
  869.             }
  870.         });
  871.     }

  872.     private void markRefsAdvertised() {
  873.         for (Ref r : getRefs()) {
  874.             markAdvertised(r.getObjectId());
  875.             if (r.getPeeledObjectId() != null)
  876.                 markAdvertised(r.getPeeledObjectId());
  877.         }
  878.     }

  879.     private void markAdvertised(AnyObjectId id) {
  880.         try {
  881.             walk.parseAny(id).add(ADVERTISED);
  882.         } catch (IOException readError) {
  883.             // We probably just do not have this object locally.
  884.         }
  885.     }

  886.     private void markCommon(RevObject obj, AckNackResult anr, boolean useState)
  887.             throws IOException {
  888.         if (useState && anr == AckNackResult.ACK_COMMON && !obj.has(STATE)) {
  889.             pckState.writeString("have " + obj.name() + '\n'); //$NON-NLS-1$
  890.             obj.add(STATE);
  891.         }
  892.         obj.add(COMMON);
  893.         if (obj instanceof RevCommit)
  894.             ((RevCommit) obj).carry(COMMON);
  895.     }

  896.     private void receivePack(final ProgressMonitor monitor,
  897.             OutputStream outputStream) throws IOException {
  898.         onReceivePack();
  899.         InputStream input = in;
  900.         if (sideband)
  901.             input = new SideBandInputStream(input, monitor, getMessageWriter(),
  902.                     outputStream);

  903.         try (ObjectInserter ins = local.newObjectInserter()) {
  904.             PackParser parser = ins.newPackParser(input);
  905.             parser.setAllowThin(thinPack);
  906.             parser.setObjectChecker(transport.getObjectChecker());
  907.             parser.setLockMessage(lockMessage);
  908.             packLock = parser.parse(monitor);
  909.             ins.flush();
  910.         }
  911.     }

    /**
     * Notification event delivered just before the pack is received from the
     * network. This event can be used by RPC such as
     * {@link org.eclipse.jgit.transport.TransportHttp} to disable its request
     * magic and ensure the pack stream is read correctly.
     * <p>
     * The default implementation is empty; subclasses may override it.
     *
     * @since 2.0
     */
    protected void onReceivePack() {
        // By default do nothing for TCP based protocols.
    }

    /**
     * Thrown by {@code negotiate} when the progress monitor reports that it
     * was cancelled.
     */
    private static class CancelledException extends Exception {
        private static final long serialVersionUID = 1L;
    }

  925.     private static class FetchStateV2 {

  926.         long havesToSend = 32;

  927.         long havesTotal;

  928.         // Set to true if we got at least one ACK in protocol V2.
  929.         boolean hadAcks;

  930.         // Counts haves without ACK. Use as cutoff for negotiation only once
  931.         // hadAcks == true.
  932.         long havesWithoutAck;

  933.         void incHavesToSend(boolean statelessRPC) {
  934.             if (statelessRPC) {
  935.                 // Increase this quicker since connection setup costs accumulate
  936.                 if (havesToSend < 16384) {
  937.                     havesToSend *= 2;
  938.                 } else {
  939.                     havesToSend = havesToSend * 11 / 10;
  940.                 }
  941.             } else {
  942.                 havesToSend += 32;
  943.             }
  944.         }
  945.     }
  946. }