DeltaTask.java

/*
 * Copyright (C) 2010, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.pack;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;

import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
import org.eclipse.jgit.storage.pack.PackConfig;

final class DeltaTask implements Callable<Object> {
	static final long MAX_METER = 9 << 20;

	static final class Block {
		private static final int MIN_TOP_PATH = 50 << 20;

		final List<DeltaTask> tasks;
		final int threads;
		final PackConfig config;
		final ObjectReader templateReader;
		final DeltaCache dc;
		final ThreadSafeProgressMonitor pm;
		final ObjectToPack[] list;
		final int beginIndex;
		final int endIndex;

		private long totalWeight;
		long bytesPerUnit;

		Block(int threads, PackConfig config, ObjectReader reader,
				DeltaCache dc, ThreadSafeProgressMonitor pm,
				ObjectToPack[] list, int begin, int end) {
			this.tasks = new ArrayList<>(threads);
			this.threads = threads;
			this.config = config;
			this.templateReader = reader;
			this.dc = dc;
			this.pm = pm;
			this.list = list;
			this.beginIndex = begin;
			this.endIndex = end;
		}

		int cost() {
			int d = (int) (totalWeight / bytesPerUnit);
			if (totalWeight % bytesPerUnit != 0)
				d++;
			return d;
		}

		synchronized DeltaWindow stealWork(DeltaTask forThread) {
			for (;;) {
				DeltaTask maxTask = null;
				Slice maxSlice = null;
				int maxWork = 0;

				for (DeltaTask task : tasks) {
					Slice s = task.remaining();
					if (s != null && maxWork < s.size()) {
						maxTask = task;
						maxSlice = s;
						maxWork = s.size();
					}
				}
				if (maxTask == null) {
					return null;
				}
				if (maxTask.tryStealWork(maxSlice)) {
					return forThread.initWindow(maxSlice);
				}
			}
		}

		void partitionTasks() {
			ArrayList<WeightedPath> topPaths = computeTopPaths();
			Iterator<WeightedPath> topPathItr = topPaths.iterator();
			int nextTop = 0;
			long weightPerThread = Math.max(totalWeight / threads, 1);
			for (int i = beginIndex; i < endIndex;) {
				DeltaTask task = new DeltaTask(this);
				long w = 0;

				// Assign the thread one top path.
				if (topPathItr.hasNext()) {
					WeightedPath p = topPathItr.next();
					w += p.weight;
					task.add(p.slice);
				}

				// Assign the task thread ~average weight.
				int s = i;
				for (; w < weightPerThread && i < endIndex;) {
					if (nextTop < topPaths.size()
							&& i == topPaths.get(nextTop).slice.beginIndex) {
						if (s < i) {
							task.add(new Slice(s, i));
						}
						s = i = topPaths.get(nextTop++).slice.endIndex;
					} else {
						w += getAdjustedWeight(list[i++]);
					}
				}

				// Round up the slice to the end of a path.
				if (s < i) {
					int h = list[i - 1].getPathHash();
					while (i < endIndex) {
						if (h == list[i].getPathHash()) {
							i++;
						} else {
							break;
						}
					}
					task.add(new Slice(s, i));
				}
				if (!task.slices.isEmpty()) {
					tasks.add(task);
				}
			}
			while (topPathItr.hasNext()) {
				WeightedPath p = topPathItr.next();
				DeltaTask task = new DeltaTask(this);
				task.add(p.slice);
				tasks.add(task);
			}

			topPaths = null;
		}

		private ArrayList<WeightedPath> computeTopPaths() {
			ArrayList<WeightedPath> topPaths = new ArrayList<>(
					threads);
			int cp = beginIndex;
			int ch = list[cp].getPathHash();
			long cw = getAdjustedWeight(list[cp]);
			totalWeight = cw;

			for (int i = cp + 1; i < endIndex; i++) {
				ObjectToPack o = list[i];
				if (ch != o.getPathHash()) {
					if (MIN_TOP_PATH < cw) {
						if (topPaths.size() < threads) {
							Slice s = new Slice(cp, i);
							topPaths.add(new WeightedPath(cw, s));
							if (topPaths.size() == threads) {
								Collections.sort(topPaths);
							}
						} else if (topPaths.get(0).weight < cw) {
							Slice s = new Slice(cp, i);
							WeightedPath p = new WeightedPath(cw, s);
							topPaths.set(0, p);
							if (p.compareTo(topPaths.get(1)) > 0) {
								Collections.sort(topPaths);
							}
						}
					}
					cp = i;
					ch = o.getPathHash();
					cw = 0;
				}
				int weight = getAdjustedWeight(o);
				cw += weight;
				totalWeight += weight;
			}

			// Sort by starting index to identify gaps later.
			Collections.sort(topPaths, (WeightedPath a,
					WeightedPath b) -> a.slice.beginIndex - b.slice.beginIndex);

			bytesPerUnit = 1;
			while (MAX_METER <= (totalWeight / bytesPerUnit)) {
				bytesPerUnit <<= 10;
			}
			return topPaths;
		}
	}

	static int getAdjustedWeight(ObjectToPack o) {
		// Edge objects and those with reused deltas do not need to be
		// compressed. For compression calculations, ignore their weights.
		if (o.isEdge() || o.doNotAttemptDelta()) {
			return 0;
		}
		return o.getWeight();
	}

	static final class WeightedPath implements Comparable<WeightedPath> {
		final long weight;
		final Slice slice;

		WeightedPath(long weight, Slice s) {
			this.weight = weight;
			this.slice = s;
		}

		@Override
		public int compareTo(WeightedPath o) {
			int cmp = Long.signum(weight - o.weight);
			if (cmp != 0) {
				return cmp;
			}
			return slice.beginIndex - o.slice.beginIndex;
		}
	}

	static final class Slice {
		final int beginIndex;
		final int endIndex;

		Slice(int b, int e) {
			beginIndex = b;
			endIndex = e;
		}

		final int size() {
			return endIndex - beginIndex;
		}
	}

	private final Block block;
	final LinkedList<Slice> slices;

	private ObjectReader or;
	private DeltaWindow dw;

	DeltaTask(Block b) {
		this.block = b;
		this.slices = new LinkedList<>();
	}

	void add(Slice s) {
		if (!slices.isEmpty()) {
			Slice last = slices.getLast();
			if (last.endIndex == s.beginIndex) {
				slices.removeLast();
				slices.add(new Slice(last.beginIndex, s.endIndex));
				return;
			}
		}
		slices.add(s);
	}

	/** {@inheritDoc} */
	@Override
	public Object call() throws Exception {
		or = block.templateReader.newReader();
		try {
			DeltaWindow w;
			for (;;) {
				synchronized (this) {
					if (slices.isEmpty()) {
						break;
					}
					w = initWindow(slices.removeFirst());
				}
				runWindow(w);
			}
			while ((w = block.stealWork(this)) != null) {
				runWindow(w);
			}
		} finally {
			block.pm.endWorker();
			or.close();
			or = null;
		}
		return null;
	}

	DeltaWindow initWindow(Slice s) {
		DeltaWindow w = new DeltaWindow(block.config, block.dc,
				or, block.pm, block.bytesPerUnit,
				block.list, s.beginIndex, s.endIndex);
		synchronized (this) {
			dw = w;
		}
		return w;
	}

	private void runWindow(DeltaWindow w) throws IOException {
		try {
			w.search();
		} finally {
			synchronized (this) {
				dw = null;
			}
		}
	}

	synchronized Slice remaining() {
		if (!slices.isEmpty()) {
			return slices.getLast();
		}
		DeltaWindow d = dw;
		return d != null ? d.remaining() : null;
	}

	synchronized boolean tryStealWork(Slice s) {
		if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
			slices.removeLast();
			return true;
		}
		DeltaWindow d = dw;
		return d != null ? d.tryStealWork(s) : false;
	}
}