DfsInserter.java

/*
 * Copyright (C) 2011, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.dfs;

import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.errors.CorruptObjectException;
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.file.PackIndex;
import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdOwnerMap;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.util.BlockList;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.NB;
import org.eclipse.jgit.util.TemporaryBuffer;
import org.eclipse.jgit.util.io.CountingOutputStream;
import org.eclipse.jgit.util.sha1.SHA1;

/**
 * Inserts objects into the DFS.
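 * <p>
 * Inserted objects are appended to a single pack file and become visible to
 * other readers only after {@link #flush()}; closing without a flush discards
 * the partially written pack. A minimal usage sketch, assuming {@code repo}
 * is an open DFS-backed {@link org.eclipse.jgit.lib.Repository} and
 * {@code data} is a {@code byte[]} holding the object's content:
 *
 * <pre>
 * try (ObjectInserter ins = repo.newObjectInserter()) {
 * 	ObjectId blobId = ins.insert(Constants.OBJ_BLOB, data);
 * 	ins.flush(); // publish the new pack so other readers can see blobId
 * }
 * </pre>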
 */
public class DfsInserter extends ObjectInserter {
	/** Always produce version 2 indexes, to get CRC data. */
	private static final int INDEX_VERSION = 2;

	final DfsObjDatabase db;
	int compression = Deflater.BEST_COMPRESSION;

	List<PackedObjectInfo> objectList;
	ObjectIdOwnerMap<PackedObjectInfo> objectMap;

	DfsBlockCache cache;
	DfsStreamKey packKey;
	DfsPackDescription packDsc;
	PackStream packOut;
	private boolean rollback;
	private boolean checkExisting = true;

	/**
	 * Initialize a new inserter.
	 *
	 * @param db
	 *            database the inserter writes to.
	 */
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}

	/**
 * Set whether to check for an object's existence before writing it.
	 *
	 * @param check
	 *            if {@code false}, will write out possibly-duplicate objects
	 *            without first checking whether they exist in the repo; default
	 *            is true.
	 */
	public void checkExisting(boolean check) {
		checkExisting = check;
	}

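	/**
	 * Set the {@link java.util.zip.Deflater} compression level used when
	 * writing object data into the pack stream; the default is
	 * {@code Deflater.BEST_COMPRESSION}.
	 *
	 * @param compression
	 *            compression level accepted by {@code Deflater}.
	 */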
	void setCompressionLevel(int compression) {
		this.compression = compression;
	}

	/** {@inheritDoc} */
	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		return new DfsPackParser(db, this, in);
	}

	/** {@inheritDoc} */
	@Override
	public ObjectReader newReader() {
		return new Reader();
	}

	/** {@inheritDoc} */
	@Override
	public ObjectId insert(int type, byte[] data, int off, int len)
			throws IOException {
		ObjectId id = idFor(type, data, off, len);
		if (objectMap != null && objectMap.contains(id))
			return id;
		// Ignore unreachable (garbage) objects here.
		if (checkExisting && db.has(id, true))
			return id;

		long offset = beginObject(type, len);
		packOut.compress.write(data, off, len);
		packOut.compress.finish();
		return endObject(id, offset);
	}

	/** {@inheritDoc} */
	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		long offset = beginObject(type, len);
		SHA1 md = digest();
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(md.toObjectId(), offset);
	}

	private byte[] insertBuffer(long len) {
		byte[] buf = buffer();
		if (len <= buf.length)
			return buf;
		if (len < db.getReaderOptions().getStreamFileThreshold()) {
			try {
				return new byte[(int) len];
			} catch (OutOfMemoryError noMem) {
				return buf;
			}
		}
		return buf;
	}

	/** {@inheritDoc} */
	@Override
	public void flush() throws IOException {
		if (packDsc == null)
			return;

		if (packOut == null)
			throw new IOException();

		byte[] packHash = packOut.writePackFooter();
		packDsc.addFileExt(PACK);
		packDsc.setFileSize(PACK, packOut.getCount());
		packOut.close();
		packOut = null;

		sortObjectsById();

		PackIndex index = writePackIndex(packDsc, packHash, objectList);
		db.commitPack(Collections.singletonList(packDsc), null);
		rollback = false;

		DfsPackFile p = new DfsPackFile(cache, packDsc);
		if (index != null)
			p.setPackIndex(index);
		db.addPack(p);
		clear();
	}

	/** {@inheritDoc} */
	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
				// Ignore a close failure; the pack should be removed.
			} finally {
				packOut = null;
			}
		}
		if (rollback && packDsc != null) {
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}

	private void clear() {
		objectList = null;
		objectMap = null;
		packKey = null;
		packDsc = null;
	}

	private long beginObject(int type, long len) throws IOException {
		if (packOut == null)
			beginPack();
		long offset = packOut.getCount();
		packOut.beginObject(type, len);
		return offset;
	}

	private ObjectId endObject(ObjectId id, long offset) {
		PackedObjectInfo obj = new PackedObjectInfo(id);
		obj.setOffset(offset);
		obj.setCRC((int) packOut.crc32.getValue());
		objectList.add(obj);
		objectMap.addIfAbsent(obj);
		return id;
	}

	private void beginPack() throws IOException {
		objectList = new BlockList<>();
		objectMap = new ObjectIdOwnerMap<>();
		cache = DfsBlockCache.getInstance();

		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		DfsOutputStream dfsOut = db.writeFile(packDsc, PACK);
		packDsc.setBlockSize(PACK, dfsOut.blockSize());
		packOut = new PackStream(dfsOut);
		packKey = packDsc.getStreamKey(PACK);

		// Write the header as though it were a single object pack.
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2); // Always use pack version 2.
		NB.encodeInt32(buf, 8, 1); // Always assume 1 object.
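		// The object count above is never rewritten; the true count is
		// recorded on the pack description and in the index when flush()
		// writes them.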
		packOut.write(buf, 0, 12);
	}

	private void sortObjectsById() {
		Collections.sort(objectList);
	}

	@Nullable
	private TemporaryBuffer.Heap maybeGetTemporaryBuffer(
			List<PackedObjectInfo> list) {
		if (list.size() <= 58000) {
			return new TemporaryBuffer.Heap(2 << 20);
		}
		return null;
	}

	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

		// If there are fewer than 58,000 objects, the entire index fits in
		// under 2 MiB. Callers will probably need the index immediately, so
		// buffer the index in process and load it from the buffer.
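		// (A version 2 index entry is 20 bytes of SHA-1 plus a 4-byte CRC-32
		// and a 4-byte offset, so 58,000 * 28 bytes plus the 1 KiB fan-out
		// table and trailing checksums stays under 2 MiB.)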
		PackIndex packIndex = null;
		try (TemporaryBuffer.Heap buf = maybeGetTemporaryBuffer(list);
				DfsOutputStream os = db.writeFile(pack, INDEX);
				CountingOutputStream cnt = new CountingOutputStream(os)) {
			if (buf != null) {
				index(buf, packHash, list);
				packIndex = PackIndex.read(buf.openInputStream());
				buf.writeTo(cnt, null);
			} else {
				index(cnt, packHash, list);
			}
			pack.addFileExt(INDEX);
			pack.setBlockSize(INDEX, os.blockSize());
			pack.setFileSize(INDEX, cnt.getCount());
		}
		return packIndex;
	}

	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}

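	/**
	 * Output stream for the in-progress pack. Data is written to the DFS in
	 * block-sized chunks, and each completed block is also placed in the
	 * {@link DfsBlockCache} so {@link Reader} can read objects back before
	 * the pack has been flushed.
	 */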
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;
		private final MessageDigest md;
		final byte[] hdrBuf;
		private final Deflater deflater;
		private final int blockSize;

		private long currPos; // Position of currBuf[0] in the output stream.
		private int currPtr; // Number of bytes in currBuf.
		private byte[] currBuf;

		final CRC32 crc32;
		final DeflaterOutputStream compress;

		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}

		long getCount() {
			return currPos + currPtr;
		}

		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}

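		/**
		 * Encode the standard pack object header into {@code hdrBuf}: the
		 * three-bit type and the low four size bits go in the first byte,
		 * followed by seven size bits per continuation byte while the high
		 * bit is set.
		 *
		 * @return number of header bytes placed in {@code hdrBuf}.
		 */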
		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}

		@Override
		public void write(int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}

		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}

		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf;
			else
				buf = copyOf(currBuf, 0, currPtr);
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null;
		}

		private byte[] copyOf(byte[] src, int ptr, int cnt) {
			byte[] dst = new byte[cnt];
			System.arraycopy(src, ptr, dst, 0, cnt);
			return dst;
		}

		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}

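		/**
		 * Copy up to {@code cnt} bytes starting at pack offset {@code pos}
		 * into {@code dst}: completed blocks are served from the cache (or
		 * read back from the DFS), and the unflushed tail is copied from
		 * {@code currBuf}.
		 *
		 * @return number of bytes copied.
		 */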
		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			if (currPos <= pos && r < cnt) {
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}

		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				return null; // Caller will switch to large object streaming.
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					throw new DataFormatException();
			}
		}

		private int setInput(long pos, Inflater inf)
				throws IOException, DataFormatException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(JGitText.get().unexpectedEofInPack);
		}

		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end();
			out.close();
		}
	}

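	/**
	 * Reader that can also see objects this inserter has written but not yet
	 * flushed, reading them back out of the in-progress pack; all other
	 * lookups are delegated to a {@link DfsReader} on the same database.
	 */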
	private class Reader extends ObjectReader {
		private final DfsReader ctx = db.newReader();

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}

		@Override
		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
				throws IOException {
			Collection<ObjectId> stored = ctx.resolve(id);
			if (objectList == null)
				return stored;

			Set<ObjectId> r = new HashSet<>(stored.size() + 2);
			r.addAll(stored);
			for (PackedObjectInfo obj : objectList) {
				if (id.prefixCompare(obj) == 0)
					r.add(obj.copy());
			}
			return r;
		}

		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				return ctx.open(objectId, typeHint);

			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
				throw new EOFException(JGitText.get().unexpectedEofInPack);

			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						JGitText.get().cannotReadBackDelta, Integer.toString(type)));
			if (typeHint != OBJ_ANY && type != typeHint) {
				throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
			}

			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}

		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				throw new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)),
						dfe);
			}
		}

		@Override
		public boolean has(AnyObjectId objectId) throws IOException {
			return (objectMap != null && objectMap.contains(objectId))
					|| ctx.has(objectId);
		}

		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		@Override
		public ObjectInserter getCreatedFromInserter() {
			return DfsInserter.this;
		}

		@Override
		public void close() {
			ctx.close();
		}
	}

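	/**
	 * Loader for an object too large to inflate into memory. The object is
	 * streamed back out of the in-progress pack; after the pack has been
	 * flushed, the normal {@link DfsReader} path is used instead.
	 */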
	private class StreamLoader extends ObjectLoader {
		private final ObjectId id;
		private final int type;
		private final long size;

		private final DfsStreamKey srcPack;
		private final long pos;

		StreamLoader(ObjectId id, int type, long sz,
				DfsStreamKey key, long pos) {
			this.id = id;
			this.type = type;
			this.size = sz;
			this.srcPack = key;
			this.pos = pos;
		}

		@Override
		public ObjectStream openStream() throws IOException {
			@SuppressWarnings("resource") // Explicitly closed below
			final DfsReader ctx = db.newReader();
			if (srcPack != packKey) {
				try {
					// After DfsInserter.flush(), use the normal code path.
					// The newly created pack is registered in the cache.
					return ctx.open(id, type).openStream();
				} finally {
					ctx.close();
				}
			}

			int bufsz = 8192;
			final Inflater inf = ctx.inflater();
			return new ObjectStream.Filter(type,
					size, new BufferedInputStream(new InflaterInputStream(
							new ReadBackStream(pos), inf, bufsz), bufsz)) {
				@Override
				public void close() throws IOException {
					ctx.close();
					super.close();
				}
			};
		}

		@Override
		public int getType() {
			return type;
		}

		@Override
		public long getSize() {
			return size;
		}

		@Override
		public boolean isLarge() {
			return true;
		}

		@Override
		public byte[] getCachedBytes() throws LargeObjectException {
			throw new LargeObjectException.ExceedsLimit(
					db.getReaderOptions().getStreamFileThreshold(), size);
		}
	}

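	/**
	 * InputStream over bytes previously written to {@link PackStream},
	 * starting at a given pack offset; used by {@link StreamLoader} to stream
	 * large objects back out of the in-progress pack.
	 */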
	private final class ReadBackStream extends InputStream {
		private long pos;

		ReadBackStream(long offset) {
			pos = offset;
		}

		@Override
		public int read() throws IOException {
			byte[] b = new byte[1];
			int n = read(b);
			return n == 1 ? b[0] & 0xff : -1;
		}

		@Override
		public int read(byte[] buf, int ptr, int len) throws IOException {
			int n = packOut.read(pos, buf, ptr, len);
			if (n > 0) {
				pos += n;
			}
			return n;
		}
	}
}