DfsBlockCache.java

/*
 * Copyright (C) 2008-2011, Google Inc.
 * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org> and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

package org.eclipse.jgit.internal.storage.dfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.LongStream;

import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackExt;

/**
 * Caches slices of a
 * {@link org.eclipse.jgit.internal.storage.dfs.BlockBasedFile} in memory for
 * faster read access.
 * <p>
 * The DfsBlockCache serves as a Java based "buffer cache", loading segments of
 * a BlockBasedFile into the JVM heap prior to use. As JGit often wants to do
 * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out
 * these tiny reads into larger block-sized IO operations.
 * <p>
 * Whenever a cache miss occurs, loading is invoked by exactly one thread for
 * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by
 * an array of locks, with the tuple hashed to a lock instance.
 * <p>
 * It's too expensive during object access to be accurate with a least recently
 * used (LRU) algorithm. Strictly ordering every read is a lot of overhead that
 * typically doesn't yield a corresponding benefit to the application. This
 * cache implements a clock replacement algorithm, giving each block one chance
 * to have been accessed during a sweep of the cache to save itself from
 * eviction.
 * <p>
 * Entities created by the cache are held under hard references, preventing the
 * Java VM from clearing anything. Blocks are discarded by the replacement
 * algorithm when adding a new block would cause the cache to exceed its
 * configured maximum size.
 * <p>
 * The key tuple is passed through to methods as a pair of parameters rather
 * than as a single Object, thus reducing the transient memory allocations of
 * callers. It is more efficient to avoid the allocation, as we can't be 100%
 * sure that a JIT would be able to stack-allocate a key tuple.
 * <p>
 * The internal hash table does not expand at runtime, instead it is fixed in
 * size at cache creation time. The internal lock table used to gate load
 * invocations is also fixed in size.
 */
public final class DfsBlockCache {
    private static volatile DfsBlockCache cache;

    static {
        reconfigure(new DfsBlockCacheConfig());
    }

    /**
     * Modify the configuration of the block cache.
     * <p>
     * The new configuration is applied immediately, and the existing cache is
     * cleared.
     *
     * @param cfg
     *            the new block cache configuration.
     * @throws java.lang.IllegalArgumentException
     *             the cache configuration contains one or more invalid
     *             settings, usually too low of a limit.
     */
    public static void reconfigure(DfsBlockCacheConfig cfg) {
        cache = new DfsBlockCache(cfg);
    }

    /**
     * Get the currently active DfsBlockCache.
     *
     * @return the currently active DfsBlockCache.
     */
    public static DfsBlockCache getInstance() {
        return cache;
    }
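
    // Typical wiring, shown as a minimal sketch. It assumes the fluent
    // setters on DfsBlockCacheConfig; the values below are illustrative
    // only, not recommendations:
    //
    //   DfsBlockCacheConfig cfg = new DfsBlockCacheConfig()
    //       .setBlockSize(64 * 1024)            // 64 KiB blocks (power of 2)
    //       .setBlockLimit(256L * 1024 * 1024); // 256 MiB total budget
    //   DfsBlockCache.reconfigure(cfg);
    //   DfsBlockCache cache = DfsBlockCache.getInstance();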

    /** Number of entries in {@link #table}. */
    private final int tableSize;

    /** Hash bucket directory; entries are chained below. */
    private final AtomicReferenceArray<HashEntry> table;

    /**
     * Locks to prevent concurrent loads for the same (PackFile,position)
     * block. The number of locks is
     * {@link DfsBlockCacheConfig#getConcurrencyLevel()} to cap the overall
     * concurrent block loads.
     */
    private final ReentrantLock[] loadLocks;

    /**
     * A separate pool of locks to prevent concurrent loads of the same index
     * or bitmap from a PackFile.
     */
    private final ReentrantLock[] refLocks;

    /** Maximum number of bytes the cache should hold. */
    private final long maxBytes;

    /** Pack files smaller than this size can be copied through the cache. */
    private final long maxStreamThroughCache;

    /**
     * Suggested block size to read from pack files in.
     * <p>
     * If a pack file does not have a native block size, this size will be used.
     * <p>
     * If a pack file has a native size, a whole multiple of the native size
     * will be used until it matches this size.
     * <p>
     * The value for blockSize must be a power of 2.
     */
    private final int blockSize;

    /** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */
    private final int blockSizeShift;

    /**
     * Number of times a block was found in the cache, per pack file extension.
     */
    private final AtomicReference<AtomicLong[]> statHit;

    /**
     * Number of times a block was not found, and had to be loaded, per pack
     * file extension.
     */
    private final AtomicReference<AtomicLong[]> statMiss;

    /**
     * Number of blocks evicted due to cache being full, per pack file
     * extension.
     */
    private final AtomicReference<AtomicLong[]> statEvict;

    /**
     * Number of bytes currently loaded in the cache, per pack file extension.
     */
    private final AtomicReference<AtomicLong[]> liveBytes;

    /** Protects the clock and its related data. */
    private final ReentrantLock clockLock;

    /**
     * A consumer of the time, in milliseconds, spent waiting on an object
     * reference lock. May be used to build a metric.
     */
    private final Consumer<Long> refLockWaitTime;

    /** Current position of the clock. */
    private Ref clockHand;

    @SuppressWarnings("unchecked")
    private DfsBlockCache(DfsBlockCacheConfig cfg) {
        tableSize = tableSize(cfg);
        if (tableSize < 1) {
            throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
        }

        table = new AtomicReferenceArray<>(tableSize);
        loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
        for (int i = 0; i < loadLocks.length; i++) {
            loadLocks[i] = new ReentrantLock(true /* fair */);
        }
        refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
        for (int i = 0; i < refLocks.length; i++) {
            refLocks[i] = new ReentrantLock(true /* fair */);
        }

        maxBytes = cfg.getBlockLimit();
        maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
        blockSize = cfg.getBlockSize();
        blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

        clockLock = new ReentrantLock(true /* fair */);
        String none = ""; //$NON-NLS-1$
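        // Seed the clock with a sentinel Ref that links to itself, forming
        // the circular list the eviction sweep walks. It holds no value and
        // is never evicted.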
        clockHand = new Ref<>(
                DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
                -1, 0, null);
        clockHand.next = clockHand;

        statHit = new AtomicReference<>(newCounters());
        statMiss = new AtomicReference<>(newCounters());
        statEvict = new AtomicReference<>(newCounters());
        liveBytes = new AtomicReference<>(newCounters());

        refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
    }

    boolean shouldCopyThroughCache(long length) {
        return length <= maxStreamThroughCache;
    }

    /**
     * Get total number of bytes in the cache, per pack file extension.
     *
     * @return total number of bytes in the cache, per pack file extension.
     */
    public long[] getCurrentSize() {
        return getStatVals(liveBytes);
    }

    /**
     * Get 0..100, defining how full the cache is.
     *
     * @return 0..100, defining how full the cache is.
     */
    public long getFillPercentage() {
        return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
    }

    /**
     * Get number of requests for items in the cache, per pack file extension.
     *
     * @return number of requests for items in the cache, per pack file
     *         extension.
     */
    public long[] getHitCount() {
        return getStatVals(statHit);
    }

    /**
     * Get number of requests for items not in the cache, per pack file
     * extension.
     *
     * @return number of requests for items not in the cache, per pack file
     *         extension.
     */
    public long[] getMissCount() {
        return getStatVals(statMiss);
    }

    /**
     * Get total number of requests (hit + miss), per pack file extension.
     *
     * @return total number of requests (hit + miss), per pack file extension.
     */
    public long[] getTotalRequestCount() {
        AtomicLong[] hit = statHit.get();
        AtomicLong[] miss = statMiss.get();
        long[] cnt = new long[Math.max(hit.length, miss.length)];
        for (int i = 0; i < hit.length; i++) {
            cnt[i] += hit[i].get();
        }
        for (int i = 0; i < miss.length; i++) {
            cnt[i] += miss[i].get();
        }
        return cnt;
    }

    /**
     * Get the cache hit ratio, in percent, per pack file extension.
     *
     * @return the hit ratio (0..100), per pack file extension.
     */
    public long[] getHitRatio() {
        AtomicLong[] hit = statHit.get();
        AtomicLong[] miss = statMiss.get();
        long[] ratio = new long[Math.max(hit.length, miss.length)];
        for (int i = 0; i < ratio.length; i++) {
            if (i >= hit.length) {
                ratio[i] = 0;
            } else if (i >= miss.length) {
                ratio[i] = 100;
            } else {
                long hitVal = hit[i].get();
                long missVal = miss[i].get();
                long total = hitVal + missVal;
                ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
            }
        }
        return ratio;
    }

    /**
     * Get number of evictions performed due to cache being full, per pack file
     * extension.
     *
     * @return number of evictions performed due to cache being full, per pack
     *         file extension.
     */
    public long[] getEvictions() {
        return getStatVals(statEvict);
    }

    /**
     * Quickly check if the cache contains block 0 of the given stream.
     * <p>
     * This can be useful for sophisticated pre-read algorithms to quickly
     * determine if a file is likely already in cache, especially small
     * reftables which may be smaller than a typical DFS block size.
     *
     * @param key
     *            the file to check.
     * @return true if block 0 (the first block) is in the cache.
     */
    public boolean hasBlock0(DfsStreamKey key) {
        HashEntry e1 = table.get(slot(key, 0));
        DfsBlock v = scan(e1, key, 0);
        return v != null && v.contains(key, 0);
    }

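    /** Fold the block index (offset / blockSize) into the pack's hash code. */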
    private int hash(int packHash, long off) {
        return packHash + (int) (off >>> blockSizeShift);
    }

    int getBlockSize() {
        return blockSize;
    }

    private static int tableSize(DfsBlockCacheConfig cfg) {
        final int wsz = cfg.getBlockSize();
        final long limit = cfg.getBlockLimit();
        if (wsz <= 0) {
            throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
        }
        if (limit < wsz) {
            throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
        }
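        // Allocate roughly 2.5 hash buckets per block the cache can hold,
        // keeping chains short without growing the table at runtime.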
        return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
    }

    /**
     * Look up a cached object, creating and loading it if it doesn't exist.
     *
     * @param file
     *            the pack that "contains" the cached object.
     * @param position
     *            offset within <code>file</code> of the object.
     * @param ctx
     *            current thread's reader.
     * @param fileChannel
     *            supplier for channel to read {@code file}.
     * @return the object reference.
     * @throws IOException
     *             the reference was not in the cache and could not be loaded.
     */
    DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
            ReadableChannelSupplier fileChannel) throws IOException {
        final long requestedPosition = position;
        position = file.alignToBlock(position);

        DfsStreamKey key = file.key;
        int slot = slot(key, position);
        HashEntry e1 = table.get(slot);
        DfsBlock v = scan(e1, key, position);
        if (v != null && v.contains(key, requestedPosition)) {
            ctx.stats.blockCacheHit++;
            getStat(statHit, key).incrementAndGet();
            return v;
        }

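        // Miss: reserve space up front (evicting cold blocks if needed), then
        // load under a striped lock. If another thread raced us and already
        // loaded this block, the reservation is credited back.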
        reserveSpace(blockSize, key);
        ReentrantLock regionLock = lockFor(key, position);
        regionLock.lock();
        try {
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                v = scan(e2, key, position);
                if (v != null) {
                    ctx.stats.blockCacheHit++;
                    getStat(statHit, key).incrementAndGet();
                    creditSpace(blockSize, key);
                    return v;
                }
            }

            getStat(statMiss, key).incrementAndGet();
            boolean credit = true;
            try {
                v = file.readOneBlock(position, ctx, fileChannel.get());
                credit = false;
            } finally {
                if (credit) {
                    creditSpace(blockSize, key);
                }
            }
            if (position != v.start) {
                // The file discovered its blockSize and adjusted.
                position = v.start;
                slot = slot(key, position);
                e2 = table.get(slot);
            }

            Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
            ref.hot = true;
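            // Publish the new entry by prepending it to the bucket chain with
            // a CAS, retrying if another thread modified the bucket first.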
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            addToClock(ref, blockSize - v.size());
        } finally {
            regionLock.unlock();
        }

        // If the block size changed from the default, it is possible the block
        // that was loaded is the wrong block for the requested position.
        if (v.contains(file.key, requestedPosition)) {
            return v;
        }
        return getOrLoad(file, requestedPosition, ctx, fileChannel);
    }

    @SuppressWarnings("unchecked")
    private void reserveSpace(long reserve, DfsStreamKey key) {
        clockLock.lock();
        try {
            long live = LongStream.of(getCurrentSize()).sum() + reserve;
            if (maxBytes < live) {
                Ref prev = clockHand;
                Ref hand = clockHand.next;
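                // Sweep the clock, evicting entries not accessed since the
                // last pass, until the cache fits within maxBytes again.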
                do {
                    if (hand.hot) {
                        // Value was recently touched. Clear
                        // hot and give it another chance.
                        hand.hot = false;
                        prev = hand;
                        hand = hand.next;
                        continue;
                    } else if (prev == hand)
                        break;

                    // No recent access since last scan, kill
                    // value and remove from clock.
                    Ref dead = hand;
                    hand = hand.next;
                    prev.next = hand;
                    dead.next = null;
                    dead.value = null;
                    live -= dead.size;
                    getStat(liveBytes, dead.key).addAndGet(-dead.size);
                    getStat(statEvict, dead.key).incrementAndGet();
                } while (maxBytes < live);
                clockHand = prev;
            }
            getStat(liveBytes, key).addAndGet(reserve);
        } finally {
            clockLock.unlock();
        }
    }

    private void creditSpace(long credit, DfsStreamKey key) {
        clockLock.lock();
        try {
            getStat(liveBytes, key).addAndGet(-credit);
        } finally {
            clockLock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    private void addToClock(Ref ref, long credit) {
        clockLock.lock();
        try {
            if (credit != 0) {
                getStat(liveBytes, ref.key).addAndGet(-credit);
            }
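            // Link the new entry into the circular list just after the hand,
            // then advance the hand onto it.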
            Ref ptr = clockHand;
            ref.next = ptr.next;
            ptr.next = ref;
            clockHand = ref;
        } finally {
            clockLock.unlock();
        }
    }

    void put(DfsBlock v) {
        put(v.stream, v.start, v.size(), v);
    }

    /**
     * Look up a cached object, creating and loading it if it doesn't exist.
     *
     * @param key
     *            the stream key of the pack.
     * @param position
     *            the position in the key. The default should be 0.
     * @param loader
     *            the function to load the reference.
     * @return the object reference.
     * @throws IOException
     *             the reference was not in the cache and could not be loaded.
     */
    <T> Ref<T> getOrLoadRef(
            DfsStreamKey key, long position, RefLoader<T> loader)
            throws IOException {
        int slot = slot(key, position);
        HashEntry e1 = table.get(slot);
        Ref<T> ref = scanRef(e1, key, position);
        if (ref != null) {
            getStat(statHit, key).incrementAndGet();
            return ref;
        }

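        // Only one thread loads a given reference; the others block on the
        // striped ref lock and then re-check the table once they acquire it.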
        ReentrantLock regionLock = lockForRef(key);
        long lockStart = System.currentTimeMillis();
        regionLock.lock();
        try {
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                ref = scanRef(e2, key, position);
                if (ref != null) {
                    getStat(statHit, key).incrementAndGet();
                    return ref;
                }
            }

            if (refLockWaitTime != null) {
                refLockWaitTime.accept(
                        Long.valueOf(System.currentTimeMillis() - lockStart));
            }
            getStat(statMiss, key).incrementAndGet();
            ref = loader.load();
            ref.hot = true;
            // Reserve after loading to get the size of the object
            reserveSpace(ref.size, key);
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            addToClock(ref, 0);
        } finally {
            regionLock.unlock();
        }
        return ref;
    }

    <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
        return put(key, 0, size, v);
    }

    <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
        int slot = slot(key, pos);
        HashEntry e1 = table.get(slot);
        Ref<T> ref = scanRef(e1, key, pos);
        if (ref != null) {
            return ref;
        }

        reserveSpace(size, key);
        ReentrantLock regionLock = lockFor(key, pos);
        regionLock.lock();
        try {
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                ref = scanRef(e2, key, pos);
                if (ref != null) {
                    creditSpace(size, key);
                    return ref;
                }
            }

            ref = new Ref<>(key, pos, size, v);
            ref.hot = true;
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            addToClock(ref, 0);
        } finally {
            regionLock.unlock();
        }
        return ref;
    }

    boolean contains(DfsStreamKey key, long position) {
        return scan(table.get(slot(key, position)), key, position) != null;
    }

    @SuppressWarnings("unchecked")
    <T> T get(DfsStreamKey key, long position) {
        T val = (T) scan(table.get(slot(key, position)), key, position);
        if (val == null) {
            getStat(statMiss, key).incrementAndGet();
        } else {
            getStat(statHit, key).incrementAndGet();
        }
        return val;
    }

    private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
        Ref<T> r = scanRef(n, key, position);
        return r != null ? r.get() : null;
    }

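    // Walk the bucket chain; a Ref whose value has already been cleared by
    // the eviction sweep is treated as absent.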
    @SuppressWarnings("unchecked")
    private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
        for (; n != null; n = n.next) {
            Ref<T> r = n.ref;
            if (r.position == position && r.key.equals(key)) {
                return r.get() != null ? r : null;
            }
        }
        return null;
    }

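    // The unsigned shift discards the sign bit, keeping the hash non-negative
    // before the modulo selects a bucket or lock stripe.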
    private int slot(DfsStreamKey key, long position) {
        return (hash(key.hash, position) >>> 1) % tableSize;
    }

    private ReentrantLock lockFor(DfsStreamKey key, long position) {
        return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
    }

    private ReentrantLock lockForRef(DfsStreamKey key) {
        return refLocks[(key.hash >>> 1) % refLocks.length];
    }

    private static AtomicLong[] newCounters() {
        AtomicLong[] ret = new AtomicLong[PackExt.values().length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new AtomicLong();
        }
        return ret;
    }

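    // Counter arrays are sized to the known pack extensions at creation. If a
    // key refers to a later extension, the array is grown copy-on-write and
    // swapped in with a CAS, retrying on contention.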
    private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
            DfsStreamKey key) {
        int pos = key.packExtPos;
        while (true) {
            AtomicLong[] vals = stats.get();
            if (pos < vals.length) {
                return vals[pos];
            }
            AtomicLong[] expect = vals;
            vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
            System.arraycopy(expect, 0, vals, 0, expect.length);
            for (int i = expect.length; i < vals.length; i++) {
                vals[i] = new AtomicLong();
            }
            if (stats.compareAndSet(expect, vals)) {
                return vals[pos];
            }
        }
    }

    private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
        AtomicLong[] stats = stat.get();
        long[] cnt = new long[stats.length];
        for (int i = 0; i < stats.length; i++) {
            cnt[i] = stats[i].get();
        }
        return cnt;
    }

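    // Rebuild the bucket chain without entries whose Ref has been unlinked
    // from the clock (next == null), i.e. already evicted. Shared suffixes of
    // the chain are reused rather than copied.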
    private static HashEntry clean(HashEntry top) {
        while (top != null && top.ref.next == null)
            top = top.next;
        if (top == null) {
            return null;
        }
        HashEntry n = clean(top.next);
        return n == top.next ? top : new HashEntry(n, top.ref);
    }

    private static final class HashEntry {
        /** Next entry in the hash table's chain list. */
        final HashEntry next;

        /** The referenced object. */
        final Ref ref;

        HashEntry(HashEntry n, Ref r) {
            next = n;
            ref = r;
        }
    }

    static final class Ref<T> {
        final DfsStreamKey key;
        final long position;
        final long size;
        volatile T value;
        Ref next;
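        /**
         * Set on access; cleared by the clock sweep so each entry gets one
         * more round before it becomes eligible for eviction.
         */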
        volatile boolean hot;

        Ref(DfsStreamKey key, long position, long size, T v) {
            this.key = key;
            this.position = position;
            this.size = size;
            this.value = v;
        }

        T get() {
            T v = value;
            if (v != null) {
                hot = true;
            }
            return v;
        }

        boolean has() {
            return value != null;
        }
    }

    @FunctionalInterface
    interface RefLoader<T> {
        Ref<T> load() throws IOException;
    }

    /**
     * Supplier for a readable channel.
     */
    @FunctionalInterface
    interface ReadableChannelSupplier {
        /**
         * @return a channel to read the file
         * @throws IOException
         *             if the channel could not be opened
         */
        ReadableChannel get() throws IOException;
    }
}