DfsPackParser.java

/*
 * Copyright (C) 2011, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.dfs;

import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

import org.eclipse.jgit.internal.storage.file.PackIndex;
import org.eclipse.jgit.internal.storage.file.PackLock;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.transport.PackParser;
import org.eclipse.jgit.transport.PackedObjectInfo;

/**
 * Parses a pack stream into the DFS, by creating a new pack and index.
 * <p>
 * Incoming pack bytes are accumulated into block-sized buffers. Each full
 * buffer is written to a {@link DfsOutputStream} and also inserted into the
 * {@link DfsBlockCache}, so that delta resolution can read the data back
 * without re-reading the stream. When a thin pack is completed, missing delta
 * bases are appended to the end of the pack and the trailing checksum is
 * recomputed. The object count in the pack header is left unchanged.
 */
public class DfsPackParser extends PackParser {
	private final DfsObjDatabase objdb;

	private final DfsInserter objins;

	/** CRC-32 computation for objects that are appended onto the pack. */
	private final CRC32 crc;

	/** Running SHA-1 of the entire pack stream. */
	private final MessageDigest packDigest;

	/** Block size to use when caching data for read back. */
	private int blockSize;

	/** Current end of the pack file (total bytes buffered so far). */
	private long packEnd;

	/** Checksum of the entire pack file. */
	private byte[] packHash;

	/** Compresses delta bases when completing a thin pack. */
	private Deflater def;

	/** True if the pack is an empty pack. */
	private boolean isEmptyPack;

	/** Name of the pack file, computed in {@link #onPackHeader(long)}. */
	private DfsPackDescription packDsc;

	/** Key used during delta resolution reading delta chains. */
	private DfsStreamKey packKey;

	/** If the index was small enough, the entire index after writing. */
	private PackIndex packIndex;

	/** Stream to the DFS storage, opened during {@link #onPackHeader(long)}. */
	private DfsOutputStream out;

	/** Data being written that has not yet been cached. */
	private byte[] currBuf;
	private long currPos; // Position of currBuf in the file.
	private int currEnd; // Position inside of currBuf to append to next.

	/** Cache the chunks were stored into or get read back from. */
	private DfsBlockCache blockCache;

	/** Pack offset of the next byte {@link #readDatabase} will return. */
	private long readPos;

	/** Most recently cached block; reused while it contains {@link #readPos}. */
	private DfsBlock readBlock;

	/**
	 * Initialize a new pack parser.
	 *
	 * @param db
	 *            database the objects will be imported into.
	 * @param ins
	 *            inserter the parser will use to help it inject the objects.
	 * @param in
	 *            the stream to parse.
	 */
	protected DfsPackParser(DfsObjDatabase db, DfsInserter ins, InputStream in) {
		super(db, in);
		this.objdb = db;
		this.objins = ins;
		this.crc = new CRC32();
		this.packDigest = Constants.newMessageDigest();
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * After the base class has consumed the stream, the pack checksum is
	 * appended, the remaining buffered data is flushed, the pack index is
	 * written, and the pack is committed and added to the object database. An
	 * empty pack (zero objects) is not stored.
	 * <p>
	 * If anything fails before the commit, the pack is rolled back. When the
	 * rollback itself fails while another error is already propagating, the
	 * rollback error is attached to that error as a suppressed exception
	 * instead of replacing it.
	 *
	 * @return always {@code null}; this parser does not create a pack lock.
	 */
	@Override
	public PackLock parse(ProgressMonitor receiving, ProgressMonitor resolving)
			throws IOException {
		boolean rollback = true;
		// Error escaping the main body, if any. Kept so that a failure in
		// rollbackPack below does not replace the root cause.
		Throwable failure = null;
		try {
			blockCache = DfsBlockCache.getInstance();
			super.parse(receiving, resolving);
			if (isEmptyPack)
				return null;
			buffer(packHash, 0, packHash.length);
			if (currEnd != 0)
				flushBlock();
			out.close();
			out = null;
			currBuf = null;
			readBlock = null;
			packDsc.addFileExt(PACK);
			packDsc.setFileSize(PACK, packEnd);
			packDsc.setBlockSize(PACK, blockSize);

			writePackIndex();
			objdb.commitPack(Collections.singletonList(packDsc), null);
			rollback = false;

			DfsPackFile p = new DfsPackFile(blockCache, packDsc);
			p.setBlockSize(blockSize);
			if (packIndex != null)
				p.setPackIndex(packIndex);

			objdb.addPack(p);

			return null;
		} catch (Throwable t) {
			// Precise rethrow: only IOException and unchecked types escape.
			failure = t;
			throw t;
		} finally {
			blockCache = null;
			currBuf = null;
			readBlock = null;

			if (def != null) {
				def.end();
				def = null;
			}

			if (out != null) {
				try {
					out.close();
				} catch (IOException err) {
					// Ignore a close error, rollbackPack is also important.
				}
				out = null;
			}

			if (rollback && packDsc != null) {
				try {
					objdb.rollbackPack(Collections.singletonList(packDsc));
				} catch (RuntimeException | Error rollbackError) {
					if (failure == null)
						throw rollbackError;
					failure.addSuppressed(rollbackError);
				} finally {
					packDsc = null;
				}
			}
		}
	}

	/**
	 * Get description of the imported pack, if one was made.
	 *
	 * @return description of the imported pack, if one was made. This is
	 *         {@code null} for an empty pack or after a rolled-back import.
	 */
	public DfsPackDescription getPackDescription() {
		return packDsc;
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * For a non-empty pack this allocates a new pack description, opens its
	 * output stream, and chooses the buffer block size:
	 * <ul>
	 * <li>the cache block size if the stream reports no preference
	 * ({@code <= 0});</li>
	 * <li>the largest multiple of the stream block size that does not exceed
	 * the cache block size, if the stream block size is smaller;</li>
	 * <li>the stream block size otherwise.</li>
	 * </ul>
	 * For an empty pack nothing is opened. Only a small buffer is allocated so
	 * the header bytes can still be consumed. {@link #flushBlock()} refuses to
	 * store anything in that case.
	 */
	@Override
	protected void onPackHeader(long objectCount) throws IOException {
		if (objectCount == 0) {
			isEmptyPack = true;
			currBuf = new byte[256];
			return;
		}

		packDsc = objdb.newPack(DfsObjDatabase.PackSource.RECEIVE);
		out = objdb.writeFile(packDsc, PACK);
		packKey = packDsc.getStreamKey(PACK);

		int size = out.blockSize();
		if (size <= 0)
			size = blockCache.getBlockSize();
		else if (size < blockCache.getBlockSize())
			size = (blockCache.getBlockSize() / size) * size;
		blockSize = size;
		currBuf = new byte[blockSize];
	}

	/** {@inheritDoc} */
	@Override
	protected void onBeginWholeObject(long streamPosition, int type,
			long inflatedSize) throws IOException {
		crc.reset();
	}

	/** {@inheritDoc} */
	@Override
	protected void onEndWholeObject(PackedObjectInfo info) throws IOException {
		info.setCRC((int) crc.getValue());
	}

	/** {@inheritDoc} */
	@Override
	protected void onBeginOfsDelta(long streamPosition,
			long baseStreamPosition, long inflatedSize) throws IOException {
		crc.reset();
	}

	/** {@inheritDoc} */
	@Override
	protected void onBeginRefDelta(long streamPosition, AnyObjectId baseId,
			long inflatedSize) throws IOException {
		crc.reset();
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The returned delta carries the CRC-32 accumulated since the matching
	 * {@code onBegin*Delta} call.
	 */
	@Override
	protected UnresolvedDelta onEndDelta() throws IOException {
		UnresolvedDelta delta = new UnresolvedDelta();
		delta.setCRC((int) crc.getValue());
		return delta;
	}

	/** {@inheritDoc} */
	@Override
	protected void onInflatedObjectData(PackedObjectInfo obj, int typeCode,
			byte[] data) throws IOException {
		// DfsPackParser ignores this event.
	}

	/** {@inheritDoc} */
	@Override
	protected void onObjectHeader(Source src, byte[] raw, int pos, int len)
			throws IOException {
		crc.update(raw, pos, len);
	}

	/** {@inheritDoc} */
	@Override
	protected void onObjectData(Source src, byte[] raw, int pos, int len)
			throws IOException {
		crc.update(raw, pos, len);
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Appends the bytes to the pack being written and folds them into the
	 * running pack digest.
	 */
	@Override
	protected void onStoreStream(byte[] raw, int pos, int len)
			throws IOException {
		buffer(raw, pos, len);
		packDigest.update(raw, pos, len);
	}

	/**
	 * Append bytes to the current block buffer.
	 * <p>
	 * Whenever the buffer is full, it is flushed to storage and the cache, and
	 * a fresh buffer is started. {@link #packEnd} advances by {@code len}.
	 *
	 * @param raw
	 *            source array.
	 * @param pos
	 *            first byte of {@code raw} to append.
	 * @param len
	 *            number of bytes to append.
	 * @throws IOException
	 *             the block could not be written.
	 */
	private void buffer(byte[] raw, int pos, int len) throws IOException {
		while (0 < len) {
			int n = Math.min(len, currBuf.length - currEnd);
			if (n == 0) {
				DfsBlock v = flushBlock();
				currBuf = new byte[blockSize];
				currEnd = 0;
				currPos += v.size();
				continue;
			}

			System.arraycopy(raw, pos, currBuf, currEnd, n);
			pos += n;
			len -= n;
			currEnd += n;
			packEnd += n;
		}
	}

	/**
	 * Write the filled portion of {@link #currBuf} to storage and cache it.
	 * <p>
	 * A partially filled buffer is copied into an exactly sized array before
	 * it is cached. The new block also becomes {@link #readBlock}.
	 *
	 * @return the block that was written and cached.
	 * @throws IOException
	 *             the pack is empty and must not be stored, or the write
	 *             failed.
	 */
	private DfsBlock flushBlock() throws IOException {
		if (isEmptyPack)
			throw new IOException(DfsText.get().willNotStoreEmptyPack);

		out.write(currBuf, 0, currEnd);

		byte[] buf;
		if (currEnd == currBuf.length) {
			buf = currBuf;
		} else {
			buf = new byte[currEnd];
			System.arraycopy(currBuf, 0, buf, 0, currEnd);
		}

		DfsBlock v = new DfsBlock(packKey, currPos, buf);
		readBlock = v;
		blockCache.put(v);
		return v;
	}

	/** {@inheritDoc} */
	@Override
	protected void onPackFooter(byte[] hash) throws IOException {
		// The base class will validate the original hash matches
		// what the stream has stored at the end. We are called
		// only if the hash was good. Save it in case there are no
		// missing bases to append.
		packHash = hash;
	}

	/** {@inheritDoc} */
	@Override
	protected ObjectTypeAndSize seekDatabase(PackedObjectInfo obj,
			ObjectTypeAndSize info) throws IOException {
		readPos = obj.getOffset();
		crc.reset();
		return readObjectHeader(info);
	}

	/** {@inheritDoc} */
	@Override
	protected ObjectTypeAndSize seekDatabase(UnresolvedDelta delta,
			ObjectTypeAndSize info) throws IOException {
		readPos = delta.getOffset();
		crc.reset();
		return readObjectHeader(info);
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Bytes at or after {@link #currPos} are served from the in-progress
	 * buffer. Earlier bytes come from the block cache, or are read back from
	 * the output stream and then cached.
	 */
	@Override
	protected int readDatabase(byte[] dst, int pos, int cnt) throws IOException {
		if (cnt == 0)
			return 0;

		if (currPos <= readPos) {
			// Requested read is still buffered. Copy direct from buffer.
			int p = (int) (readPos - currPos);
			int n = Math.min(cnt, currEnd - p);
			// n < 0 means readPos is beyond the buffered data. Report EOF
			// rather than letting arraycopy fail with an index error.
			if (n <= 0)
				throw new EOFException();
			System.arraycopy(currBuf, p, dst, pos, n);
			readPos += n;
			return n;
		}

		if (readBlock == null || !readBlock.contains(packKey, readPos)) {
			long start = toBlockStart(readPos);
			readBlock = blockCache.get(packKey, start);
			if (readBlock == null) {
				int size = (int) Math.min(blockSize, packEnd - start);
				byte[] buf = new byte[size];
				if (read(start, buf, 0, size) != size)
					throw new EOFException();
				readBlock = new DfsBlock(packKey, start, buf);
				blockCache.put(readBlock);
			}
		}

		int n = readBlock.copy(readPos, dst, pos, cnt);
		readPos += n;
		return n;
	}

	/**
	 * Read already-written pack bytes back from the output stream.
	 *
	 * @param pos
	 *            pack offset to start reading at.
	 * @param dst
	 *            destination array.
	 * @param off
	 *            first index of {@code dst} to fill.
	 * @param len
	 *            maximum number of bytes to read.
	 * @return number of bytes read; 0 if {@code len} is 0; -1 if no bytes
	 *         could be read. May be short if the stream stops early.
	 * @throws IOException
	 *             the stream could not be read.
	 */
	private int read(long pos, byte[] dst, int off, int len) throws IOException {
		if (len == 0)
			return 0;

		int cnt = 0;
		while (0 < len) {
			int r = out.read(pos, ByteBuffer.wrap(dst, off, len));
			if (r <= 0)
				break;
			pos += r;
			off += r;
			len -= r;
			cnt += r;
		}
		return cnt != 0 ? cnt : -1;
	}

	/** Round {@code pos} down to the start of its {@link #blockSize} block. */
	private long toBlockStart(long pos) {
		return (pos / blockSize) * blockSize;
	}

	/** {@inheritDoc} */
	@Override
	protected boolean checkCRC(int oldCRC) {
		return oldCRC == (int) crc.getValue();
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Appends {@code data} as a whole (non-delta) object at the current end of
	 * the pack. It is written as a type/size header followed by the deflated
	 * content. The pack digest is updated, and the object's offset and CRC-32
	 * are recorded in {@code info}.
	 *
	 * @return always {@code true}.
	 */
	@Override
	protected boolean onAppendBase(final int typeCode, final byte[] data,
			final PackedObjectInfo info) throws IOException {
		info.setOffset(packEnd);

		final byte[] buf = buffer();
		int sz = data.length;
		int len = 0;
		// First byte: type in bits 4-6, low 4 bits of size. Remaining size
		// bits follow 7 at a time; the high bit marks "more bytes follow".
		buf[len++] = (byte) ((typeCode << 4) | (sz & 15));
		sz >>>= 4;
		while (sz > 0) {
			buf[len - 1] |= 0x80;
			buf[len++] = (byte) (sz & 0x7f);
			sz >>>= 7;
		}

		packDigest.update(buf, 0, len);
		crc.reset();
		crc.update(buf, 0, len);
		buffer(buf, 0, len);

		if (def == null)
			def = new Deflater(Deflater.DEFAULT_COMPRESSION, false);
		else
			def.reset();
		def.setInput(data);
		def.finish();

		while (!def.finished()) {
			len = def.deflate(buf);
			packDigest.update(buf, 0, len);
			crc.update(buf, 0, len);
			buffer(buf, 0, len);
		}

		info.setCRC((int) crc.getValue());
		return true;
	}

	/** {@inheritDoc} */
	@Override
	protected void onEndThinPack() throws IOException {
		// Normally when a thin pack is closed the pack header gets
		// updated to reflect the actual object count. This is not going
		// to be possible on most DFS backends, so instead we allow
		// the header to have an incorrect count, but we do change the
		// trailing digest to be correct.
		packHash = packDigest.digest();
	}

	/**
	 * Write the pack index for all parsed objects.
	 * <p>
	 * The objects are sorted by ObjectId. The index returned by the inserter
	 * is kept in {@link #packIndex}, which may be {@code null}.
	 *
	 * @throws IOException
	 *             the index could not be written.
	 */
	private void writePackIndex() throws IOException {
		List<PackedObjectInfo> list = getSortedObjectList(null /* by ObjectId */);
		packIndex = objins.writePackIndex(packDsc, packHash, list);
	}
}