DfsBlockCache.java

/*
 * Copyright (C) 2008-2011, Google Inc.
 * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.dfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.LongStream;

import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackExt;

/**
 * Caches slices of a
 * {@link org.eclipse.jgit.internal.storage.dfs.BlockBasedFile} in memory for
 * faster read access.
 * <p>
 * The DfsBlockCache serves as a Java based "buffer cache", loading segments of
 * a BlockBasedFile into the JVM heap prior to use. As JGit often wants to do
 * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out
 * these tiny reads into larger block-sized IO operations.
 * <p>
 * Whenever a cache miss occurs, loading is invoked by exactly one thread for
 * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by
 * an array of locks, with the tuple hashed to a lock instance.
 * <p>
 * Its too expensive during object access to be accurate with a least recently
 * used (LRU) algorithm. Strictly ordering every read is a lot of overhead that
 * typically doesn't yield a corresponding benefit to the application. This
 * cache implements a clock replacement algorithm, giving each block one chance
 * to have been accessed during a sweep of the cache to save itself from
 * eviction.
 * <p>
 * Entities created by the cache are held under hard references, preventing the
 * Java VM from clearing anything. Blocks are discarded by the replacement
 * algorithm when adding a new block would cause the cache to exceed its
 * configured maximum size.
 * <p>
 * The key tuple is passed through to methods as a pair of parameters rather
 * than as a single Object, thus reducing the transient memory allocations of
 * callers. It is more efficient to avoid the allocation, as we can't be 100%
 * sure that a JIT would be able to stack-allocate a key tuple.
 * <p>
 * The internal hash table does not expand at runtime, instead it is fixed in
 * size at cache creation time. The internal lock table used to gate load
 * invocations is also fixed in size.
 */
public final class DfsBlockCache {
	private static volatile DfsBlockCache cache;

	static {
		reconfigure(new DfsBlockCacheConfig());
	}

	/**
	 * Modify the configuration of the window cache.
	 * <p>
	 * The new configuration is applied immediately, and the existing cache is
	 * cleared.
	 *
	 * @param cfg
	 *            the new window cache configuration.
	 * @throws java.lang.IllegalArgumentException
	 *             the cache configuration contains one or more invalid
	 *             settings, usually too low of a limit.
	 */
	public static void reconfigure(DfsBlockCacheConfig cfg) {
		cache = new DfsBlockCache(cfg);
	}

	/**
	 * Get the currently active DfsBlockCache.
	 *
	 * @return the currently active DfsBlockCache.
	 */
	public static DfsBlockCache getInstance() {
		return cache;
	}

	/** Number of entries in {@link #table}. */
	private final int tableSize;

	/** Hash bucket directory; entries are chained below. */
	private final AtomicReferenceArray<HashEntry> table;

	/**
	 * Locks to prevent concurrent loads for same (PackFile,position) block. The
	 * number of locks is {@link DfsBlockCacheConfig#getConcurrencyLevel()} to
	 * cap the overall concurrent block loads.
	 */
	private final ReentrantLock[] loadLocks;

	/**
	 * A separate pool of locks to prevent concurrent loads for same index or bitmap from PackFile.
	 */
	private final ReentrantLock[] refLocks;

	/** Maximum number of bytes the cache should hold. */
	private final long maxBytes;

	/** Pack files smaller than this size can be copied through the cache. */
	private final long maxStreamThroughCache;

	/**
	 * Suggested block size to read from pack files in.
	 * <p>
	 * If a pack file does not have a native block size, this size will be used.
	 * <p>
	 * If a pack file has a native size, a whole multiple of the native size
	 * will be used until it matches this size.
	 * <p>
	 * The value for blockSize must be a power of 2.
	 */
	private final int blockSize;

	/** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */
	private final int blockSizeShift;

	/**
	 * Number of times a block was found in the cache, per pack file extension.
	 */
	private final AtomicReference<AtomicLong[]> statHit;

	/**
	 * Number of times a block was not found, and had to be loaded, per pack
	 * file extension.
	 */
	private final AtomicReference<AtomicLong[]> statMiss;

	/**
	 * Number of blocks evicted due to cache being full, per pack file
	 * extension.
	 */
	private final AtomicReference<AtomicLong[]> statEvict;

	/**
	 * Number of bytes currently loaded in the cache, per pack file extension.
	 */
	private final AtomicReference<AtomicLong[]> liveBytes;

	/** Protects the clock and its related data. */
	private final ReentrantLock clockLock;

	/**
	 * A consumer of object reference lock wait time milliseconds.  May be used to build a metric.
	 */
	private final Consumer<Long> refLockWaitTime;

	/** Current position of the clock. */
	private Ref clockHand;

	@SuppressWarnings("unchecked")
	private DfsBlockCache(DfsBlockCacheConfig cfg) {
		tableSize = tableSize(cfg);
		if (tableSize < 1) {
			throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
		}

		table = new AtomicReferenceArray<>(tableSize);
		loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
		for (int i = 0; i < loadLocks.length; i++) {
			loadLocks[i] = new ReentrantLock(true /* fair */);
		}
		refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
		for (int i = 0; i < refLocks.length; i++) {
			refLocks[i] = new ReentrantLock(true /* fair */);
		}

		maxBytes = cfg.getBlockLimit();
		maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
		blockSize = cfg.getBlockSize();
		blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

		clockLock = new ReentrantLock(true /* fair */);
		String none = ""; //$NON-NLS-1$
		clockHand = new Ref<>(
				DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
				-1, 0, null);
		clockHand.next = clockHand;

		statHit = new AtomicReference<>(newCounters());
		statMiss = new AtomicReference<>(newCounters());
		statEvict = new AtomicReference<>(newCounters());
		liveBytes = new AtomicReference<>(newCounters());

		refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
	}

	boolean shouldCopyThroughCache(long length) {
		return length <= maxStreamThroughCache;
	}

	/**
	 * Get total number of bytes in the cache, per pack file extension.
	 *
	 * @return total number of bytes in the cache, per pack file extension.
	 */
	public long[] getCurrentSize() {
		return getStatVals(liveBytes);
	}

	/**
	 * Get 0..100, defining how full the cache is.
	 *
	 * @return 0..100, defining how full the cache is.
	 */
	public long getFillPercentage() {
		return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
	}

	/**
	 * Get number of requests for items in the cache, per pack file extension.
	 *
	 * @return number of requests for items in the cache, per pack file
	 *         extension.
	 */
	public long[] getHitCount() {
		return getStatVals(statHit);
	}

	/**
	 * Get number of requests for items not in the cache, per pack file
	 * extension.
	 *
	 * @return number of requests for items not in the cache, per pack file
	 *         extension.
	 */
	public long[] getMissCount() {
		return getStatVals(statMiss);
	}

	/**
	 * Get total number of requests (hit + miss), per pack file extension.
	 *
	 * @return total number of requests (hit + miss), per pack file extension.
	 */
	public long[] getTotalRequestCount() {
		AtomicLong[] hit = statHit.get();
		AtomicLong[] miss = statMiss.get();
		long[] cnt = new long[Math.max(hit.length, miss.length)];
		for (int i = 0; i < hit.length; i++) {
			cnt[i] += hit[i].get();
		}
		for (int i = 0; i < miss.length; i++) {
			cnt[i] += miss[i].get();
		}
		return cnt;
	}

	/**
	 * Get hit ratios
	 *
	 * @return hit ratios
	 */
	public long[] getHitRatio() {
		AtomicLong[] hit = statHit.get();
		AtomicLong[] miss = statMiss.get();
		long[] ratio = new long[Math.max(hit.length, miss.length)];
		for (int i = 0; i < ratio.length; i++) {
			if (i >= hit.length) {
				ratio[i] = 0;
			} else if (i >= miss.length) {
				ratio[i] = 100;
			} else {
				long hitVal = hit[i].get();
				long missVal = miss[i].get();
				long total = hitVal + missVal;
				ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
			}
		}
		return ratio;
	}

	/**
	 * Get number of evictions performed due to cache being full, per pack file
	 * extension.
	 *
	 * @return number of evictions performed due to cache being full, per pack
	 *         file extension.
	 */
	public long[] getEvictions() {
		return getStatVals(statEvict);
	}

	/**
	 * Quickly check if the cache contains block 0 of the given stream.
	 * <p>
	 * This can be useful for sophisticated pre-read algorithms to quickly
	 * determine if a file is likely already in cache, especially small
	 * reftables which may be smaller than a typical DFS block size.
	 *
	 * @param key
	 *            the file to check.
	 * @return true if block 0 (the first block) is in the cache.
	 */
	public boolean hasBlock0(DfsStreamKey key) {
		HashEntry e1 = table.get(slot(key, 0));
		DfsBlock v = scan(e1, key, 0);
		return v != null && v.contains(key, 0);
	}

	private int hash(int packHash, long off) {
		return packHash + (int) (off >>> blockSizeShift);
	}

	int getBlockSize() {
		return blockSize;
	}

	private static int tableSize(DfsBlockCacheConfig cfg) {
		final int wsz = cfg.getBlockSize();
		final long limit = cfg.getBlockLimit();
		if (wsz <= 0) {
			throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
		}
		if (limit < wsz) {
			throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
		}
		return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
	}

	/**
	 * Look up a cached object, creating and loading it if it doesn't exist.
	 *
	 * @param file
	 *            the pack that "contains" the cached object.
	 * @param position
	 *            offset within <code>pack</code> of the object.
	 * @param ctx
	 *            current thread's reader.
	 * @param fileChannel
	 *            supplier for channel to read {@code pack}.
	 * @return the object reference.
	 * @throws IOException
	 *             the reference was not in the cache and could not be loaded.
	 */
	DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
			ReadableChannelSupplier fileChannel) throws IOException {
		final long requestedPosition = position;
		position = file.alignToBlock(position);

		DfsStreamKey key = file.key;
		int slot = slot(key, position);
		HashEntry e1 = table.get(slot);
		DfsBlock v = scan(e1, key, position);
		if (v != null && v.contains(key, requestedPosition)) {
			ctx.stats.blockCacheHit++;
			getStat(statHit, key).incrementAndGet();
			return v;
		}

		reserveSpace(blockSize, key);
		ReentrantLock regionLock = lockFor(key, position);
		regionLock.lock();
		try {
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				v = scan(e2, key, position);
				if (v != null) {
					ctx.stats.blockCacheHit++;
					getStat(statHit, key).incrementAndGet();
					creditSpace(blockSize, key);
					return v;
				}
			}

			getStat(statMiss, key).incrementAndGet();
			boolean credit = true;
			try {
				v = file.readOneBlock(requestedPosition, ctx,
						fileChannel.get());
				credit = false;
			} finally {
				if (credit) {
					creditSpace(blockSize, key);
				}
			}
			if (position != v.start) {
				// The file discovered its blockSize and adjusted.
				position = v.start;
				slot = slot(key, position);
				e2 = table.get(slot);
			}

			Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
			ref.hot = true;
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n)) {
					break;
				}
				e2 = table.get(slot);
			}
			addToClock(ref, blockSize - v.size());
		} finally {
			regionLock.unlock();
		}

		// If the block size changed from the default, it is possible the block
		// that was loaded is the wrong block for the requested position.
		if (v.contains(file.key, requestedPosition)) {
			return v;
		}
		return getOrLoad(file, requestedPosition, ctx, fileChannel);
	}

	@SuppressWarnings("unchecked")
	private void reserveSpace(int reserve, DfsStreamKey key) {
		clockLock.lock();
		try {
			long live = LongStream.of(getCurrentSize()).sum() + reserve;
			if (maxBytes < live) {
				Ref prev = clockHand;
				Ref hand = clockHand.next;
				do {
					if (hand.hot) {
						// Value was recently touched. Clear
						// hot and give it another chance.
						hand.hot = false;
						prev = hand;
						hand = hand.next;
						continue;
					} else if (prev == hand)
						break;

					// No recent access since last scan, kill
					// value and remove from clock.
					Ref dead = hand;
					hand = hand.next;
					prev.next = hand;
					dead.next = null;
					dead.value = null;
					live -= dead.size;
					getStat(liveBytes, dead.key).addAndGet(-dead.size);
					getStat(statEvict, dead.key).incrementAndGet();
				} while (maxBytes < live);
				clockHand = prev;
			}
			getStat(liveBytes, key).addAndGet(reserve);
		} finally {
			clockLock.unlock();
		}
	}

	private void creditSpace(int credit, DfsStreamKey key) {
		clockLock.lock();
		try {
			getStat(liveBytes, key).addAndGet(-credit);
		} finally {
			clockLock.unlock();
		}
	}

	@SuppressWarnings("unchecked")
	private void addToClock(Ref ref, int credit) {
		clockLock.lock();
		try {
			if (credit != 0) {
				getStat(liveBytes, ref.key).addAndGet(-credit);
			}
			Ref ptr = clockHand;
			ref.next = ptr.next;
			ptr.next = ref;
			clockHand = ref;
		} finally {
			clockLock.unlock();
		}
	}

	void put(DfsBlock v) {
		put(v.stream, v.start, v.size(), v);
	}

	/**
	 * Look up a cached object, creating and loading it if it doesn't exist.
	 *
	 * @param key
	 *            the stream key of the pack.
	 * @param loader
	 *            the function to load the reference.
	 * @return the object reference.
	 * @throws IOException
	 *             the reference was not in the cache and could not be loaded.
	 */
	<T> Ref<T> getOrLoadRef(DfsStreamKey key, RefLoader<T> loader)
			throws IOException {
		int slot = slot(key, 0);
		HashEntry e1 = table.get(slot);
		Ref<T> ref = scanRef(e1, key, 0);
		if (ref != null) {
			getStat(statHit, key).incrementAndGet();
			return ref;
		}

		ReentrantLock regionLock = lockForRef(key);
		long lockStart = System.currentTimeMillis();
		regionLock.lock();
		try {
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				ref = scanRef(e2, key, 0);
				if (ref != null) {
					getStat(statHit, key).incrementAndGet();
					return ref;
				}
			}

			if (refLockWaitTime != null) {
				refLockWaitTime.accept(
						Long.valueOf(System.currentTimeMillis() - lockStart));
			}
			getStat(statMiss, key).incrementAndGet();
			ref = loader.load();
			ref.hot = true;
			// Reserve after loading to get the size of the object
			reserveSpace(ref.size, key);
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n)) {
					break;
				}
				e2 = table.get(slot);
			}
			addToClock(ref, 0);
		} finally {
			regionLock.unlock();
		}
		return ref;
	}

	<T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
		return put(key, 0, (int) Math.min(size, Integer.MAX_VALUE), v);
	}

	<T> Ref<T> put(DfsStreamKey key, long pos, int size, T v) {
		int slot = slot(key, pos);
		HashEntry e1 = table.get(slot);
		Ref<T> ref = scanRef(e1, key, pos);
		if (ref != null) {
			return ref;
		}

		reserveSpace(size, key);
		ReentrantLock regionLock = lockFor(key, pos);
		regionLock.lock();
		try {
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				ref = scanRef(e2, key, pos);
				if (ref != null) {
					creditSpace(size, key);
					return ref;
				}
			}

			ref = new Ref<>(key, pos, size, v);
			ref.hot = true;
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n)) {
					break;
				}
				e2 = table.get(slot);
			}
			addToClock(ref, 0);
		} finally {
			regionLock.unlock();
		}
		return ref;
	}

	boolean contains(DfsStreamKey key, long position) {
		return scan(table.get(slot(key, position)), key, position) != null;
	}

	@SuppressWarnings("unchecked")
	<T> T get(DfsStreamKey key, long position) {
		T val = (T) scan(table.get(slot(key, position)), key, position);
		if (val == null) {
			getStat(statMiss, key).incrementAndGet();
		} else {
			getStat(statHit, key).incrementAndGet();
		}
		return val;
	}

	private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
		Ref<T> r = scanRef(n, key, position);
		return r != null ? r.get() : null;
	}

	@SuppressWarnings("unchecked")
	private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
		for (; n != null; n = n.next) {
			Ref<T> r = n.ref;
			if (r.position == position && r.key.equals(key)) {
				return r.get() != null ? r : null;
			}
		}
		return null;
	}

	private int slot(DfsStreamKey key, long position) {
		return (hash(key.hash, position) >>> 1) % tableSize;
	}

	private ReentrantLock lockFor(DfsStreamKey key, long position) {
		return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
	}

	private ReentrantLock lockForRef(DfsStreamKey key) {
		return refLocks[(key.hash >>> 1) % refLocks.length];
	}

	private static AtomicLong[] newCounters() {
		AtomicLong[] ret = new AtomicLong[PackExt.values().length];
		for (int i = 0; i < ret.length; i++) {
			ret[i] = new AtomicLong();
		}
		return ret;
	}

	private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
			DfsStreamKey key) {
		int pos = key.packExtPos;
		while (true) {
			AtomicLong[] vals = stats.get();
			if (pos < vals.length) {
				return vals[pos];
			}
			AtomicLong[] expect = vals;
			vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
			System.arraycopy(expect, 0, vals, 0, expect.length);
			for (int i = expect.length; i < vals.length; i++) {
				vals[i] = new AtomicLong();
			}
			if (stats.compareAndSet(expect, vals)) {
				return vals[pos];
			}
		}
	}

	private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
		AtomicLong[] stats = stat.get();
		long[] cnt = new long[stats.length];
		for (int i = 0; i < stats.length; i++) {
			cnt[i] = stats[i].get();
		}
		return cnt;
	}

	private static HashEntry clean(HashEntry top) {
		while (top != null && top.ref.next == null)
			top = top.next;
		if (top == null) {
			return null;
		}
		HashEntry n = clean(top.next);
		return n == top.next ? top : new HashEntry(n, top.ref);
	}

	private static final class HashEntry {
		/** Next entry in the hash table's chain list. */
		final HashEntry next;

		/** The referenced object. */
		final Ref ref;

		HashEntry(HashEntry n, Ref r) {
			next = n;
			ref = r;
		}
	}

	static final class Ref<T> {
		final DfsStreamKey key;
		final long position;
		final int size;
		volatile T value;
		Ref next;
		volatile boolean hot;

		Ref(DfsStreamKey key, long position, int size, T v) {
			this.key = key;
			this.position = position;
			this.size = size;
			this.value = v;
		}

		T get() {
			T v = value;
			if (v != null) {
				hot = true;
			}
			return v;
		}

		boolean has() {
			return value != null;
		}
	}

	@FunctionalInterface
	interface RefLoader<T> {
		Ref<T> load() throws IOException;
	}

	/**
	 * Supplier for readable channel
	 */
	@FunctionalInterface
	interface ReadableChannelSupplier {
		/**
		 * @return ReadableChannel
		 * @throws IOException
		 */
		ReadableChannel get() throws IOException;
	}
}