View Javadoc
1   /*
2    * Copyright (C) 2008-2010, Google Inc.
3    * Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
4    * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
5    * and other copyright owners as documented in the project's IP log.
6    *
7    * This program and the accompanying materials are made available
8    * under the terms of the Eclipse Distribution License v1.0 which
9    * accompanies this distribution, is reproduced below, and is
10   * available at http://www.eclipse.org/org/documents/edl-v10.php
11   *
12   * All rights reserved.
13   *
14   * Redistribution and use in source and binary forms, with or
15   * without modification, are permitted provided that the following
16   * conditions are met:
17   *
18   * - Redistributions of source code must retain the above copyright
19   *   notice, this list of conditions and the following disclaimer.
20   *
21   * - Redistributions in binary form must reproduce the above
22   *   copyright notice, this list of conditions and the following
23   *   disclaimer in the documentation and/or other materials provided
24   *   with the distribution.
25   *
26   * - Neither the name of the Eclipse Foundation, Inc. nor the
27   *   names of its contributors may be used to endorse or promote
28   *   products derived from this software without specific prior
29   *   written permission.
30   *
31   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
32   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
33   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
35   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
36   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
38   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
39   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
40   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
41   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
42   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
43   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
44   */
45  
46  package org.eclipse.jgit.transport;
47  
48  import static org.eclipse.jgit.lib.RefDatabase.ALL;
49  
50  import java.io.IOException;
51  import java.io.InputStream;
52  import java.io.OutputStream;
53  import java.text.MessageFormat;
54  import java.util.Collection;
55  import java.util.Collections;
56  import java.util.Date;
57  import java.util.Map;
58  import java.util.Set;
59  
60  import org.eclipse.jgit.errors.PackProtocolException;
61  import org.eclipse.jgit.errors.TransportException;
62  import org.eclipse.jgit.internal.JGitText;
63  import org.eclipse.jgit.internal.storage.file.PackLock;
64  import org.eclipse.jgit.lib.AnyObjectId;
65  import org.eclipse.jgit.lib.Config;
66  import org.eclipse.jgit.lib.Config.SectionParser;
67  import org.eclipse.jgit.lib.Constants;
68  import org.eclipse.jgit.lib.MutableObjectId;
69  import org.eclipse.jgit.lib.NullProgressMonitor;
70  import org.eclipse.jgit.lib.ObjectId;
71  import org.eclipse.jgit.lib.ObjectInserter;
72  import org.eclipse.jgit.lib.ProgressMonitor;
73  import org.eclipse.jgit.lib.Ref;
74  import org.eclipse.jgit.revwalk.RevCommit;
75  import org.eclipse.jgit.revwalk.RevCommitList;
76  import org.eclipse.jgit.revwalk.RevFlag;
77  import org.eclipse.jgit.revwalk.RevObject;
78  import org.eclipse.jgit.revwalk.RevSort;
79  import org.eclipse.jgit.revwalk.RevWalk;
80  import org.eclipse.jgit.revwalk.filter.CommitTimeRevFilter;
81  import org.eclipse.jgit.revwalk.filter.RevFilter;
82  import org.eclipse.jgit.transport.GitProtocolConstants.MultiAck;
83  import org.eclipse.jgit.transport.PacketLineIn.AckNackResult;
84  import org.eclipse.jgit.util.TemporaryBuffer;
85  
86  /**
87   * Fetch implementation using the native Git pack transfer service.
88   * <p>
89   * This is the canonical implementation for transferring objects from the remote
90   * repository to the local repository by talking to the 'git-upload-pack'
91   * service. Objects are packed on the remote side into a pack file and then sent
92   * down the pipe to us.
93   * <p>
94   * This connection requires only a bi-directional pipe or socket, and thus is
95   * easily wrapped up into a local process pipe, anonymous TCP socket, or a
96   * command executed through an SSH tunnel.
97   * <p>
98   * If {@link BasePackConnection#statelessRPC} is {@code true}, this connection
99   * can be tunneled over a request-response style RPC system like HTTP.  The RPC
100  * call boundary is determined by this class switching from writing to the
101  * OutputStream to reading from the InputStream.
102  * <p>
103  * Concrete implementations should just call
104  * {@link #init(java.io.InputStream, java.io.OutputStream)} and
105  * {@link #readAdvertisedRefs()} methods in constructor or before any use. They
106  * should also handle resources releasing in {@link #close()} method if needed.
107  */
108 public abstract class BasePackFetchConnection extends BasePackConnection
109 		implements FetchConnection {
110 	/**
111 	 * Maximum number of 'have' lines to send before giving up.
112 	 * <p>
113 	 * During {@link #negotiate(ProgressMonitor)} we send at most this many
114 	 * commits to the remote peer as 'have' lines without an ACK response before
115 	 * we give up.
116 	 */
117 	private static final int MAX_HAVES = 256;
118 
119 	/**
120 	 * Amount of data the client sends before starting to read.
121 	 * <p>
122 	 * Any output stream given to the client must be able to buffer this many
123 	 * bytes before the client will stop writing and start reading from the
124 	 * input stream. If the output stream blocks before this many bytes are in
125 	 * the send queue, the system will deadlock.
126 	 */
127 	protected static final int MIN_CLIENT_BUFFER = 2 * 32 * 46 + 8;
128 
129 	/**
130 	 * Include tags if we are also including the referenced objects.
131 	 * @since 2.0
132 	 */
133 	public static final String OPTION_INCLUDE_TAG = GitProtocolConstants.OPTION_INCLUDE_TAG;
134 
135 	/**
136 	 * Mutli-ACK support for improved negotiation.
137 	 * @since 2.0
138 	 */
139 	public static final String OPTION_MULTI_ACK = GitProtocolConstants.OPTION_MULTI_ACK;
140 
141 	/**
142 	 * Mutli-ACK detailed support for improved negotiation.
143 	 * @since 2.0
144 	 */
145 	public static final String OPTION_MULTI_ACK_DETAILED = GitProtocolConstants.OPTION_MULTI_ACK_DETAILED;
146 
147 	/**
148 	 * The client supports packs with deltas but not their bases.
149 	 * @since 2.0
150 	 */
151 	public static final String OPTION_THIN_PACK = GitProtocolConstants.OPTION_THIN_PACK;
152 
153 	/**
154 	 * The client supports using the side-band for progress messages.
155 	 * @since 2.0
156 	 */
157 	public static final String OPTION_SIDE_BAND = GitProtocolConstants.OPTION_SIDE_BAND;
158 
159 	/**
160 	 * The client supports using the 64K side-band for progress messages.
161 	 * @since 2.0
162 	 */
163 	public static final String OPTION_SIDE_BAND_64K = GitProtocolConstants.OPTION_SIDE_BAND_64K;
164 
165 	/**
166 	 * The client supports packs with OFS deltas.
167 	 * @since 2.0
168 	 */
169 	public static final String OPTION_OFS_DELTA = GitProtocolConstants.OPTION_OFS_DELTA;
170 
171 	/**
172 	 * The client supports shallow fetches.
173 	 * @since 2.0
174 	 */
175 	public static final String OPTION_SHALLOW = GitProtocolConstants.OPTION_SHALLOW;
176 
177 	/**
178 	 * The client does not want progress messages and will ignore them.
179 	 * @since 2.0
180 	 */
181 	public static final String OPTION_NO_PROGRESS = GitProtocolConstants.OPTION_NO_PROGRESS;
182 
183 	/**
184 	 * The client supports receiving a pack before it has sent "done".
185 	 * @since 2.0
186 	 */
187 	public static final String OPTION_NO_DONE = GitProtocolConstants.OPTION_NO_DONE;
188 
189 	/**
190 	 * The client supports fetching objects at the tip of any ref, even if not
191 	 * advertised.
192 	 * @since 3.1
193 	 */
194 	public static final String OPTION_ALLOW_TIP_SHA1_IN_WANT = GitProtocolConstants.OPTION_ALLOW_TIP_SHA1_IN_WANT;
195 
196 	/**
197 	 * The client supports fetching objects that are reachable from a tip of a
198 	 * ref that is allowed to fetch.
199 	 * @since 4.1
200 	 */
201 	public static final String OPTION_ALLOW_REACHABLE_SHA1_IN_WANT = GitProtocolConstants.OPTION_ALLOW_REACHABLE_SHA1_IN_WANT;
202 
203 	private final RevWalk walk;
204 
205 	/** All commits that are immediately reachable by a local ref. */
206 	private RevCommitList<RevCommit> reachableCommits;
207 
208 	/** Marks an object as having all its dependencies. */
209 	final RevFlag REACHABLE;
210 
211 	/** Marks a commit known to both sides of the connection. */
212 	final RevFlag COMMON;
213 
214 	/** Like {@link #COMMON} but means its also in {@link #pckState}. */
215 	private final RevFlag STATE;
216 
217 	/** Marks a commit listed in the advertised refs. */
218 	final RevFlag ADVERTISED;
219 
220 	private MultiAck multiAck = MultiAck.OFF;
221 
222 	private boolean thinPack;
223 
224 	private boolean sideband;
225 
226 	private boolean includeTags;
227 
228 	private boolean allowOfsDelta;
229 
230 	private boolean noDone;
231 
232 	private boolean noProgress;
233 
234 	private String lockMessage;
235 
236 	private PackLock packLock;
237 
238 	/** RPC state, if {@link BasePackConnection#statelessRPC} is true. */
239 	private TemporaryBuffer.Heap state;
240 
241 	private PacketLineOut pckState;
242 
243 	/**
244 	 * Create a new connection to fetch using the native git transport.
245 	 *
246 	 * @param packTransport
247 	 *            the transport.
248 	 */
249 	public BasePackFetchConnection(final PackTransport packTransport) {
250 		super(packTransport);
251 
252 		if (local != null) {
253 			final FetchConfig cfg = local.getConfig().get(FetchConfig.KEY);
254 			allowOfsDelta = cfg.allowOfsDelta;
255 		} else {
256 			allowOfsDelta = true;
257 		}
258 		includeTags = transport.getTagOpt() != TagOpt.NO_TAGS;
259 		thinPack = transport.isFetchThin();
260 
261 		if (local != null) {
262 			walk = new RevWalk(local);
263 			reachableCommits = new RevCommitList<>();
264 			REACHABLE = walk.newFlag("REACHABLE"); //$NON-NLS-1$
265 			COMMON = walk.newFlag("COMMON"); //$NON-NLS-1$
266 			STATE = walk.newFlag("STATE"); //$NON-NLS-1$
267 			ADVERTISED = walk.newFlag("ADVERTISED"); //$NON-NLS-1$
268 
269 			walk.carry(COMMON);
270 			walk.carry(REACHABLE);
271 			walk.carry(ADVERTISED);
272 		} else {
273 			walk = null;
274 			REACHABLE = null;
275 			COMMON = null;
276 			STATE = null;
277 			ADVERTISED = null;
278 		}
279 	}
280 
281 	private static class FetchConfig {
282 		static final SectionParser<FetchConfig> KEY = new SectionParser<FetchConfig>() {
283 			@Override
284 			public FetchConfig parse(final Config cfg) {
285 				return new FetchConfig(cfg);
286 			}
287 		};
288 
289 		final boolean allowOfsDelta;
290 
291 		FetchConfig(final Config c) {
292 			allowOfsDelta = c.getBoolean("repack", "usedeltabaseoffset", true); //$NON-NLS-1$ //$NON-NLS-2$
293 		}
294 	}
295 
296 	@Override
297 	public final void fetch(final ProgressMonitor monitor,
298 			final Collection<Ref> want, final Set<ObjectId> have)
299 			throws TransportException {
300 		fetch(monitor, want, have, null);
301 	}
302 
303 	/**
304 	 * @since 3.0
305 	 */
306 	@Override
307 	public final void fetch(final ProgressMonitor monitor,
308 			final Collection<Ref> want, final Set<ObjectId> have,
309 			OutputStream outputStream) throws TransportException {
310 		markStartedOperation();
311 		doFetch(monitor, want, have, outputStream);
312 	}
313 
314 	@Override
315 	public boolean didFetchIncludeTags() {
316 		return false;
317 	}
318 
319 	@Override
320 	public boolean didFetchTestConnectivity() {
321 		return false;
322 	}
323 
324 	@Override
325 	public void setPackLockMessage(final String message) {
326 		lockMessage = message;
327 	}
328 
329 	@Override
330 	public Collection<PackLock> getPackLocks() {
331 		if (packLock != null)
332 			return Collections.singleton(packLock);
333 		return Collections.<PackLock> emptyList();
334 	}
335 
336 	/**
337 	 * Execute common ancestor negotiation and fetch the objects.
338 	 *
339 	 * @param monitor
340 	 *            progress monitor to receive status updates. If the monitor is
341 	 *            the {@link NullProgressMonitor#INSTANCE}, then the no-progress
342 	 *            option enabled.
343 	 * @param want
344 	 *            the advertised remote references the caller wants to fetch.
345 	 * @param have
346 	 *            additional objects to assume that already exist locally. This
347 	 *            will be added to the set of objects reachable from the
348 	 *            destination repository's references.
349 	 * @param outputStream
350 	 *            ouputStream to write sideband messages to
351 	 * @throws TransportException
352 	 *             if any exception occurs.
353 	 * @since 3.0
354 	 */
355 	protected void doFetch(final ProgressMonitor monitor,
356 			final Collection<Ref> want, final Set<ObjectId> have,
357 			OutputStream outputStream) throws TransportException {
358 		try {
359 			noProgress = monitor == NullProgressMonitor.INSTANCE;
360 
361 			markRefsAdvertised();
362 			markReachable(have, maxTimeWanted(want));
363 
364 			if (statelessRPC) {
365 				state = new TemporaryBuffer.Heap(Integer.MAX_VALUE);
366 				pckState = new PacketLineOut(state);
367 			}
368 
369 			if (sendWants(want)) {
370 				negotiate(monitor);
371 
372 				walk.dispose();
373 				reachableCommits = null;
374 				state = null;
375 				pckState = null;
376 
377 				receivePack(monitor, outputStream);
378 			}
379 		} catch (CancelledException ce) {
380 			close();
381 			return; // Caller should test (or just know) this themselves.
382 		} catch (IOException err) {
383 			close();
384 			throw new TransportException(err.getMessage(), err);
385 		} catch (RuntimeException err) {
386 			close();
387 			throw new TransportException(err.getMessage(), err);
388 		}
389 	}
390 
391 	@Override
392 	public void close() {
393 		if (walk != null)
394 			walk.close();
395 		super.close();
396 	}
397 
398 	private int maxTimeWanted(final Collection<Ref> wants) {
399 		int maxTime = 0;
400 		for (final Ref r : wants) {
401 			try {
402 				final RevObject obj = walk.parseAny(r.getObjectId());
403 				if (obj instanceof RevCommit) {
404 					final int cTime = ((RevCommit) obj).getCommitTime();
405 					if (maxTime < cTime)
406 						maxTime = cTime;
407 				}
408 			} catch (IOException error) {
409 				// We don't have it, but we want to fetch (thus fixing error).
410 			}
411 		}
412 		return maxTime;
413 	}
414 
415 	private void markReachable(final Set<ObjectId> have, final int maxTime)
416 			throws IOException {
417 		Map<String, Ref> refs = local.getRefDatabase().getRefs(ALL);
418 		for (final Ref r : refs.values()) {
419 			ObjectId id = r.getPeeledObjectId();
420 			if (id == null)
421 				id = r.getObjectId();
422 			if (id == null)
423 				continue;
424 			parseReachable(id);
425 		}
426 
427 		for (ObjectId id : local.getAdditionalHaves())
428 			parseReachable(id);
429 
430 		for (ObjectId id : have)
431 			parseReachable(id);
432 
433 		if (maxTime > 0) {
434 			// Mark reachable commits until we reach maxTime. These may
435 			// wind up later matching up against things we want and we
436 			// can avoid asking for something we already happen to have.
437 			//
438 			final Date maxWhen = new Date(maxTime * 1000L);
439 			walk.sort(RevSort.COMMIT_TIME_DESC);
440 			walk.markStart(reachableCommits);
441 			walk.setRevFilter(CommitTimeRevFilter.after(maxWhen));
442 			for (;;) {
443 				final RevCommit c = walk.next();
444 				if (c == null)
445 					break;
446 				if (c.has(ADVERTISED) && !c.has(COMMON)) {
447 					// This is actually going to be a common commit, but
448 					// our peer doesn't know that fact yet.
449 					//
450 					c.add(COMMON);
451 					c.carry(COMMON);
452 					reachableCommits.add(c);
453 				}
454 			}
455 		}
456 	}
457 
458 	private void parseReachable(ObjectId id) {
459 		try {
460 			RevCommit o = walk.parseCommit(id);
461 			if (!o.has(REACHABLE)) {
462 				o.add(REACHABLE);
463 				reachableCommits.add(o);
464 			}
465 		} catch (IOException readError) {
466 			// If we cannot read the value of the ref skip it.
467 		}
468 	}
469 
470 	private boolean sendWants(final Collection<Ref> want) throws IOException {
471 		final PacketLineOut p = statelessRPC ? pckState : pckOut;
472 		boolean first = true;
473 		for (final Ref r : want) {
474 			ObjectId objectId = r.getObjectId();
475 			if (objectId == null) {
476 				continue;
477 			}
478 			try {
479 				if (walk.parseAny(objectId).has(REACHABLE)) {
480 					// We already have this object. Asking for it is
481 					// not a very good idea.
482 					//
483 					continue;
484 				}
485 			} catch (IOException err) {
486 				// Its OK, we don't have it, but we want to fix that
487 				// by fetching the object from the other side.
488 			}
489 
490 			final StringBuilder line = new StringBuilder(46);
491 			line.append("want "); //$NON-NLS-1$
492 			line.append(objectId.name());
493 			if (first) {
494 				line.append(enableCapabilities());
495 				first = false;
496 			}
497 			line.append('\n');
498 			p.writeString(line.toString());
499 		}
500 		if (first)
501 			return false;
502 		p.end();
503 		outNeedsEnd = false;
504 		return true;
505 	}
506 
507 	private String enableCapabilities() throws TransportException {
508 		final StringBuilder line = new StringBuilder();
509 		if (noProgress)
510 			wantCapability(line, OPTION_NO_PROGRESS);
511 		if (includeTags)
512 			includeTags = wantCapability(line, OPTION_INCLUDE_TAG);
513 		if (allowOfsDelta)
514 			wantCapability(line, OPTION_OFS_DELTA);
515 
516 		if (wantCapability(line, OPTION_MULTI_ACK_DETAILED)) {
517 			multiAck = MultiAck.DETAILED;
518 			if (statelessRPC)
519 				noDone = wantCapability(line, OPTION_NO_DONE);
520 		} else if (wantCapability(line, OPTION_MULTI_ACK))
521 			multiAck = MultiAck.CONTINUE;
522 		else
523 			multiAck = MultiAck.OFF;
524 
525 		if (thinPack)
526 			thinPack = wantCapability(line, OPTION_THIN_PACK);
527 		if (wantCapability(line, OPTION_SIDE_BAND_64K))
528 			sideband = true;
529 		else if (wantCapability(line, OPTION_SIDE_BAND))
530 			sideband = true;
531 
532 		if (statelessRPC && multiAck != MultiAck.DETAILED) {
533 			// Our stateless RPC implementation relies upon the detailed
534 			// ACK status to tell us common objects for reuse in future
535 			// requests.  If its not enabled, we can't talk to the peer.
536 			//
537 			throw new PackProtocolException(uri, MessageFormat.format(
538 					JGitText.get().statelessRPCRequiresOptionToBeEnabled,
539 					OPTION_MULTI_ACK_DETAILED));
540 		}
541 
542 		addUserAgentCapability(line);
543 		return line.toString();
544 	}
545 
546 	private void negotiate(final ProgressMonitor monitor) throws IOException,
547 			CancelledException {
548 		final MutableObjectId ackId = new MutableObjectId();
549 		int resultsPending = 0;
550 		int havesSent = 0;
551 		int havesSinceLastContinue = 0;
552 		boolean receivedContinue = false;
553 		boolean receivedAck = false;
554 		boolean receivedReady = false;
555 
556 		if (statelessRPC)
557 			state.writeTo(out, null);
558 
559 		negotiateBegin();
560 		SEND_HAVES: for (;;) {
561 			final RevCommit c = walk.next();
562 			if (c == null)
563 				break SEND_HAVES;
564 
565 			pckOut.writeString("have " + c.getId().name() + "\n"); //$NON-NLS-1$ //$NON-NLS-2$
566 			havesSent++;
567 			havesSinceLastContinue++;
568 
569 			if ((31 & havesSent) != 0) {
570 				// We group the have lines into blocks of 32, each marked
571 				// with a flush (aka end). This one is within a block so
572 				// continue with another have line.
573 				//
574 				continue;
575 			}
576 
577 			if (monitor.isCancelled())
578 				throw new CancelledException();
579 
580 			pckOut.end();
581 			resultsPending++; // Each end will cause a result to come back.
582 
583 			if (havesSent == 32 && !statelessRPC) {
584 				// On the first block we race ahead and try to send
585 				// more of the second block while waiting for the
586 				// remote to respond to our first block request.
587 				// This keeps us one block ahead of the peer.
588 				//
589 				continue;
590 			}
591 
592 			READ_RESULT: for (;;) {
593 				final AckNackResult anr = pckIn.readACK(ackId);
594 				switch (anr) {
595 				case NAK:
596 					// More have lines are necessary to compute the
597 					// pack on the remote side. Keep doing that.
598 					//
599 					resultsPending--;
600 					break READ_RESULT;
601 
602 				case ACK:
603 					// The remote side is happy and knows exactly what
604 					// to send us. There is no further negotiation and
605 					// we can break out immediately.
606 					//
607 					multiAck = MultiAck.OFF;
608 					resultsPending = 0;
609 					receivedAck = true;
610 					if (statelessRPC)
611 						state.writeTo(out, null);
612 					break SEND_HAVES;
613 
614 				case ACK_CONTINUE:
615 				case ACK_COMMON:
616 				case ACK_READY:
617 					// The server knows this commit (ackId). We don't
618 					// need to send any further along its ancestry, but
619 					// we need to continue to talk about other parts of
620 					// our local history.
621 					//
622 					markCommon(walk.parseAny(ackId), anr);
623 					receivedAck = true;
624 					receivedContinue = true;
625 					havesSinceLastContinue = 0;
626 					if (anr == AckNackResult.ACK_READY)
627 						receivedReady = true;
628 					break;
629 				}
630 
631 				if (monitor.isCancelled())
632 					throw new CancelledException();
633 			}
634 
635 			if (noDone & receivedReady)
636 				break SEND_HAVES;
637 			if (statelessRPC)
638 				state.writeTo(out, null);
639 
640 			if (receivedContinue && havesSinceLastContinue > MAX_HAVES) {
641 				// Our history must be really different from the remote's.
642 				// We just sent a whole slew of have lines, and it did not
643 				// recognize any of them. Avoid sending our entire history
644 				// to them by giving up early.
645 				//
646 				break SEND_HAVES;
647 			}
648 		}
649 
650 		// Tell the remote side we have run out of things to talk about.
651 		//
652 		if (monitor.isCancelled())
653 			throw new CancelledException();
654 
655 		if (!receivedReady || !noDone) {
656 			// When statelessRPC is true we should always leave SEND_HAVES
657 			// loop above while in the middle of a request. This allows us
658 			// to just write done immediately.
659 			//
660 			pckOut.writeString("done\n"); //$NON-NLS-1$
661 			pckOut.flush();
662 		}
663 
664 		if (!receivedAck) {
665 			// Apparently if we have never received an ACK earlier
666 			// there is one more result expected from the done we
667 			// just sent to the remote.
668 			//
669 			multiAck = MultiAck.OFF;
670 			resultsPending++;
671 		}
672 
673 		READ_RESULT: while (resultsPending > 0 || multiAck != MultiAck.OFF) {
674 			final AckNackResult anr = pckIn.readACK(ackId);
675 			resultsPending--;
676 			switch (anr) {
677 			case NAK:
678 				// A NAK is a response to an end we queued earlier
679 				// we eat it and look for another ACK/NAK message.
680 				//
681 				break;
682 
683 			case ACK:
684 				// A solitary ACK at this point means the remote won't
685 				// speak anymore, but is going to send us a pack now.
686 				//
687 				break READ_RESULT;
688 
689 			case ACK_CONTINUE:
690 			case ACK_COMMON:
691 			case ACK_READY:
692 				// We will expect a normal ACK to break out of the loop.
693 				//
694 				multiAck = MultiAck.CONTINUE;
695 				break;
696 			}
697 
698 			if (monitor.isCancelled())
699 				throw new CancelledException();
700 		}
701 	}
702 
703 	private void negotiateBegin() throws IOException {
704 		walk.resetRetain(REACHABLE, ADVERTISED);
705 		walk.markStart(reachableCommits);
706 		walk.sort(RevSort.COMMIT_TIME_DESC);
707 		walk.setRevFilter(new RevFilter() {
708 			@Override
709 			public RevFilter clone() {
710 				return this;
711 			}
712 
713 			@Override
714 			public boolean include(final RevWalk walker, final RevCommit c) {
715 				final boolean remoteKnowsIsCommon = c.has(COMMON);
716 				if (c.has(ADVERTISED)) {
717 					// Remote advertised this, and we have it, hence common.
718 					// Whether or not the remote knows that fact is tested
719 					// before we added the flag. If the remote doesn't know
720 					// we have to still send them this object.
721 					//
722 					c.add(COMMON);
723 				}
724 				return !remoteKnowsIsCommon;
725 			}
726 
727 			@Override
728 			public boolean requiresCommitBody() {
729 				return false;
730 			}
731 		});
732 	}
733 
734 	private void markRefsAdvertised() {
735 		for (final Ref r : getRefs()) {
736 			markAdvertised(r.getObjectId());
737 			if (r.getPeeledObjectId() != null)
738 				markAdvertised(r.getPeeledObjectId());
739 		}
740 	}
741 
742 	private void markAdvertised(final AnyObjectId id) {
743 		try {
744 			walk.parseAny(id).add(ADVERTISED);
745 		} catch (IOException readError) {
746 			// We probably just do not have this object locally.
747 		}
748 	}
749 
750 	private void markCommon(final RevObject obj, final AckNackResult anr)
751 			throws IOException {
752 		if (statelessRPC && anr == AckNackResult.ACK_COMMON && !obj.has(STATE)) {
753 			StringBuilder s;
754 
755 			s = new StringBuilder(6 + Constants.OBJECT_ID_STRING_LENGTH);
756 			s.append("have "); //$NON-NLS-1$
757 			s.append(obj.name());
758 			s.append('\n');
759 			pckState.writeString(s.toString());
760 			obj.add(STATE);
761 		}
762 		obj.add(COMMON);
763 		if (obj instanceof RevCommit)
764 			((RevCommit) obj).carry(COMMON);
765 	}
766 
767 	private void receivePack(final ProgressMonitor monitor,
768 			OutputStream outputStream) throws IOException {
769 		onReceivePack();
770 		InputStream input = in;
771 		if (sideband)
772 			input = new SideBandInputStream(input, monitor, getMessageWriter(),
773 					outputStream);
774 
775 		try (ObjectInserter ins = local.newObjectInserter()) {
776 			PackParser parser = ins.newPackParser(input);
777 			parser.setAllowThin(thinPack);
778 			parser.setObjectChecker(transport.getObjectChecker());
779 			parser.setLockMessage(lockMessage);
780 			packLock = parser.parse(monitor);
781 			ins.flush();
782 		}
783 	}
784 
785 	/**
786 	 * Notification event delivered just before the pack is received from the
787 	 * network. This event can be used by RPC such as {@link TransportHttp} to
788 	 * disable its request magic and ensure the pack stream is read correctly.
789 	 *
790 	 * @since 2.0
791 	 */
792 	protected void onReceivePack() {
793 		// By default do nothing for TCP based protocols.
794 	}
795 
796 	private static class CancelledException extends Exception {
797 		private static final long serialVersionUID = 1L;
798 	}
799 }