DfsPackParser.java

  1. /*
  2.  * Copyright (C) 2011, Google Inc. and others
  3.  *
  4.  * This program and the accompanying materials are made available under the
  5.  * terms of the Eclipse Distribution License v. 1.0 which is available at
  6.  * https://www.eclipse.org/org/documents/edl-v10.php.
  7.  *
  8.  * SPDX-License-Identifier: BSD-3-Clause
  9.  */

  10. package org.eclipse.jgit.internal.storage.dfs;

  11. import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;

  12. import java.io.EOFException;
  13. import java.io.IOException;
  14. import java.io.InputStream;
  15. import java.nio.ByteBuffer;
  16. import java.security.MessageDigest;
  17. import java.util.Collections;
  18. import java.util.List;
  19. import java.util.zip.CRC32;
  20. import java.util.zip.Deflater;

  21. import org.eclipse.jgit.internal.storage.file.PackIndex;
  22. import org.eclipse.jgit.internal.storage.file.PackLock;
  23. import org.eclipse.jgit.lib.AnyObjectId;
  24. import org.eclipse.jgit.lib.Constants;
  25. import org.eclipse.jgit.lib.ProgressMonitor;
  26. import org.eclipse.jgit.transport.PackParser;
  27. import org.eclipse.jgit.transport.PackedObjectInfo;

  28. /**
  29.  * Parses a pack stream into the DFS, by creating a new pack and index.
  30.  */
  31. public class DfsPackParser extends PackParser {
  32.     private final DfsObjDatabase objdb;

  33.     private final DfsInserter objins;

  34.     /** CRC-32 computation for objects that are appended onto the pack. */
  35.     private final CRC32 crc;

  36.     /** Running SHA-1 of the entire pack stream. */
  37.     private final MessageDigest packDigest;

  38.     /** Block size to use when caching data for read back. */
  39.     private int blockSize;

  40.     /** Current end of the pack file. */
  41.     private long packEnd;

  42.     /** Checksum of the entire pack file. */
  43.     private byte[] packHash;

  44.     /** Compresses delta bases when completing a thin pack. */
  45.     private Deflater def;

  46.     /** True if the pack is an empty pack. */
  47.     private boolean isEmptyPack;

  48.     /** Name of the pack file, computed in {@link #onPackHeader(long)}. */
  49.     private DfsPackDescription packDsc;

  50.     /** Key used during delta resolution reading delta chains. */
  51.     private DfsStreamKey packKey;

  52.     /** If the index was small enough, the entire index after writing. */
  53.     private PackIndex packIndex;

  54.     /** Stream to the DFS storage, opened during {@link #onPackHeader(long)}. */
  55.     private DfsOutputStream out;

  56.     /** Data being written that has not yet been cached. */
  57.     private byte[] currBuf;
  58.     private long currPos; // Position of currBuf in the file.
  59.     private int currEnd; // Position inside of currBuf to append to next.

  60.     /** Cache the chunks were stored into or get read back from. */
  61.     private DfsBlockCache blockCache;

  62.     /** Cached block that is being read. */
  63.     private long readPos;
  64.     private DfsBlock readBlock;

  65.     /**
  66.      * Initialize a new pack parser.
  67.      *
  68.      * @param db
  69.      *            database the objects will be imported into.
  70.      * @param ins
  71.      *            inserter the parser will use to help it inject the objects.
  72.      * @param in
  73.      *            the stream to parse.
  74.      */
  75.     protected DfsPackParser(DfsObjDatabase db, DfsInserter ins, InputStream in) {
  76.         super(db, in);
  77.         this.objdb = db;
  78.         this.objins = ins;
  79.         this.crc = new CRC32();
  80.         this.packDigest = Constants.newMessageDigest();
  81.     }

  82.     /** {@inheritDoc} */
  83.     @Override
  84.     public PackLock parse(ProgressMonitor receiving, ProgressMonitor resolving)
  85.             throws IOException {
  86.         boolean rollback = true;
  87.         try {
  88.             blockCache = DfsBlockCache.getInstance();
  89.             super.parse(receiving, resolving);
  90.             if (isEmptyPack)
  91.                 return null;
  92.             buffer(packHash, 0, packHash.length);
  93.             if (currEnd != 0)
  94.                 flushBlock();
  95.             out.close();
  96.             out = null;
  97.             currBuf = null;
  98.             readBlock = null;
  99.             packDsc.addFileExt(PACK);
  100.             packDsc.setFileSize(PACK, packEnd);
  101.             packDsc.setBlockSize(PACK, blockSize);

  102.             writePackIndex();
  103.             objdb.commitPack(Collections.singletonList(packDsc), null);
  104.             rollback = false;

  105.             DfsPackFile p = new DfsPackFile(blockCache, packDsc);
  106.             p.setBlockSize(blockSize);
  107.             if (packIndex != null)
  108.                 p.setPackIndex(packIndex);

  109.             objdb.addPack(p);

  110.             return null;
  111.         } finally {
  112.             blockCache = null;
  113.             currBuf = null;
  114.             readBlock = null;

  115.             if (def != null) {
  116.                 def.end();
  117.                 def = null;
  118.             }

  119.             if (out != null) {
  120.                 try {
  121.                     out.close();
  122.                 } catch (IOException err) {
  123.                     // Ignore a close error, rollbackPack is also important.
  124.                 }
  125.                 out = null;
  126.             }

  127.             if (rollback && packDsc != null) {
  128.                 try {
  129.                     objdb.rollbackPack(Collections.singletonList(packDsc));
  130.                 } finally {
  131.                     packDsc = null;
  132.                 }
  133.             }
  134.         }
  135.     }

  136.     /**
  137.      * Get description of the imported pack, if one was made.
  138.      *
  139.      * @return description of the imported pack, if one was made.
  140.      */
  141.     public DfsPackDescription getPackDescription() {
  142.         return packDsc;
  143.     }

  144.     /** {@inheritDoc} */
  145.     @Override
  146.     protected void onPackHeader(long objectCount) throws IOException {
  147.         if (objectCount == 0) {
  148.             isEmptyPack = true;
  149.             currBuf = new byte[256];
  150.             return;
  151.         }

  152.         packDsc = objdb.newPack(DfsObjDatabase.PackSource.RECEIVE);
  153.         out = objdb.writeFile(packDsc, PACK);
  154.         packKey = packDsc.getStreamKey(PACK);

  155.         int size = out.blockSize();
  156.         if (size <= 0)
  157.             size = blockCache.getBlockSize();
  158.         else if (size < blockCache.getBlockSize())
  159.             size = (blockCache.getBlockSize() / size) * size;
  160.         blockSize = size;
  161.         currBuf = new byte[blockSize];
  162.     }

  163.     /** {@inheritDoc} */
  164.     @Override
  165.     protected void onBeginWholeObject(long streamPosition, int type,
  166.             long inflatedSize) throws IOException {
  167.         crc.reset();
  168.     }

  169.     /** {@inheritDoc} */
  170.     @Override
  171.     protected void onEndWholeObject(PackedObjectInfo info) throws IOException {
  172.         info.setCRC((int) crc.getValue());
  173.     }

  174.     /** {@inheritDoc} */
  175.     @Override
  176.     protected void onBeginOfsDelta(long streamPosition,
  177.             long baseStreamPosition, long inflatedSize) throws IOException {
  178.         crc.reset();
  179.     }

  180.     /** {@inheritDoc} */
  181.     @Override
  182.     protected void onBeginRefDelta(long streamPosition, AnyObjectId baseId,
  183.             long inflatedSize) throws IOException {
  184.         crc.reset();
  185.     }

  186.     /** {@inheritDoc} */
  187.     @Override
  188.     protected UnresolvedDelta onEndDelta() throws IOException {
  189.         UnresolvedDelta delta = new UnresolvedDelta();
  190.         delta.setCRC((int) crc.getValue());
  191.         return delta;
  192.     }

  193.     /** {@inheritDoc} */
  194.     @Override
  195.     protected void onInflatedObjectData(PackedObjectInfo obj, int typeCode,
  196.             byte[] data) throws IOException {
  197.         // DfsPackParser ignores this event.
  198.     }

  199.     /** {@inheritDoc} */
  200.     @Override
  201.     protected void onObjectHeader(Source src, byte[] raw, int pos, int len)
  202.             throws IOException {
  203.         crc.update(raw, pos, len);
  204.     }

  205.     /** {@inheritDoc} */
  206.     @Override
  207.     protected void onObjectData(Source src, byte[] raw, int pos, int len)
  208.             throws IOException {
  209.         crc.update(raw, pos, len);
  210.     }

  211.     /** {@inheritDoc} */
  212.     @Override
  213.     protected void onStoreStream(byte[] raw, int pos, int len)
  214.             throws IOException {
  215.         buffer(raw, pos, len);
  216.         packDigest.update(raw, pos, len);
  217.     }

  218.     private void buffer(byte[] raw, int pos, int len) throws IOException {
  219.         while (0 < len) {
  220.             int n = Math.min(len, currBuf.length - currEnd);
  221.             if (n == 0) {
  222.                 DfsBlock v = flushBlock();
  223.                 currBuf = new byte[blockSize];
  224.                 currEnd = 0;
  225.                 currPos += v.size();
  226.                 continue;
  227.             }

  228.             System.arraycopy(raw, pos, currBuf, currEnd, n);
  229.             pos += n;
  230.             len -= n;
  231.             currEnd += n;
  232.             packEnd += n;
  233.         }
  234.     }

  235.     private DfsBlock flushBlock() throws IOException {
  236.         if (isEmptyPack)
  237.             throw new IOException(DfsText.get().willNotStoreEmptyPack);

  238.         out.write(currBuf, 0, currEnd);

  239.         byte[] buf;
  240.         if (currEnd == currBuf.length) {
  241.             buf = currBuf;
  242.         } else {
  243.             buf = new byte[currEnd];
  244.             System.arraycopy(currBuf, 0, buf, 0, currEnd);
  245.         }

  246.         DfsBlock v = new DfsBlock(packKey, currPos, buf);
  247.         readBlock = v;
  248.         blockCache.put(v);
  249.         return v;
  250.     }

  251.     /** {@inheritDoc} */
  252.     @Override
  253.     protected void onPackFooter(byte[] hash) throws IOException {
  254.         // The base class will validate the original hash matches
  255.         // what the stream has stored at the end. We are called
  256.         // only if the hash was good. Save it in case there are no
  257.         // missing bases to append.
  258.         packHash = hash;
  259.     }

  260.     /** {@inheritDoc} */
  261.     @Override
  262.     protected ObjectTypeAndSize seekDatabase(PackedObjectInfo obj,
  263.             ObjectTypeAndSize info) throws IOException {
  264.         readPos = obj.getOffset();
  265.         crc.reset();
  266.         return readObjectHeader(info);
  267.     }

  268.     /** {@inheritDoc} */
  269.     @Override
  270.     protected ObjectTypeAndSize seekDatabase(UnresolvedDelta delta,
  271.             ObjectTypeAndSize info) throws IOException {
  272.         readPos = delta.getOffset();
  273.         crc.reset();
  274.         return readObjectHeader(info);
  275.     }

  276.     /** {@inheritDoc} */
  277.     @Override
  278.     protected int readDatabase(byte[] dst, int pos, int cnt) throws IOException {
  279.         if (cnt == 0)
  280.             return 0;

  281.         if (currPos <= readPos) {
  282.             // Requested read is still buffered. Copy direct from buffer.
  283.             int p = (int) (readPos - currPos);
  284.             int n = Math.min(cnt, currEnd - p);
  285.             if (n == 0)
  286.                 throw new EOFException();
  287.             System.arraycopy(currBuf, p, dst, pos, n);
  288.             readPos += n;
  289.             return n;
  290.         }

  291.         if (readBlock == null || !readBlock.contains(packKey, readPos)) {
  292.             long start = toBlockStart(readPos);
  293.             readBlock = blockCache.get(packKey, start);
  294.             if (readBlock == null) {
  295.                 int size = (int) Math.min(blockSize, packEnd - start);
  296.                 byte[] buf = new byte[size];
  297.                 if (read(start, buf, 0, size) != size)
  298.                     throw new EOFException();
  299.                 readBlock = new DfsBlock(packKey, start, buf);
  300.                 blockCache.put(readBlock);
  301.             }
  302.         }

  303.         int n = readBlock.copy(readPos, dst, pos, cnt);
  304.         readPos += n;
  305.         return n;
  306.     }

  307.     private int read(long pos, byte[] dst, int off, int len) throws IOException {
  308.         if (len == 0)
  309.             return 0;

  310.         int cnt = 0;
  311.         while (0 < len) {
  312.             int r = out.read(pos, ByteBuffer.wrap(dst, off, len));
  313.             if (r <= 0)
  314.                 break;
  315.             pos += r;
  316.             off += r;
  317.             len -= r;
  318.             cnt += r;
  319.         }
  320.         return cnt != 0 ? cnt : -1;
  321.     }

  322.     private long toBlockStart(long pos) {
  323.         return (pos / blockSize) * blockSize;
  324.     }

  325.     /** {@inheritDoc} */
  326.     @Override
  327.     protected boolean checkCRC(int oldCRC) {
  328.         return oldCRC == (int) crc.getValue();
  329.     }

  330.     /** {@inheritDoc} */
  331.     @Override
  332.     protected boolean onAppendBase(final int typeCode, final byte[] data,
  333.             final PackedObjectInfo info) throws IOException {
  334.         info.setOffset(packEnd);

  335.         final byte[] buf = buffer();
  336.         int sz = data.length;
  337.         int len = 0;
  338.         buf[len++] = (byte) ((typeCode << 4) | (sz & 15));
  339.         sz >>>= 4;
  340.         while (sz > 0) {
  341.             buf[len - 1] |= (byte) 0x80;
  342.             buf[len++] = (byte) (sz & 0x7f);
  343.             sz >>>= 7;
  344.         }

  345.         packDigest.update(buf, 0, len);
  346.         crc.reset();
  347.         crc.update(buf, 0, len);
  348.         buffer(buf, 0, len);

  349.         if (def == null)
  350.             def = new Deflater(Deflater.DEFAULT_COMPRESSION, false);
  351.         else
  352.             def.reset();
  353.         def.setInput(data);
  354.         def.finish();

  355.         while (!def.finished()) {
  356.             len = def.deflate(buf);
  357.             packDigest.update(buf, 0, len);
  358.             crc.update(buf, 0, len);
  359.             buffer(buf, 0, len);
  360.         }

  361.         info.setCRC((int) crc.getValue());
  362.         return true;
  363.     }

  364.     /** {@inheritDoc} */
  365.     @Override
  366.     protected void onEndThinPack() throws IOException {
  367.         // Normally when a thin pack is closed the pack header gets
  368.         // updated to reflect the actual object count. This is not going
  369.         // to be possible on most DFS backends, so instead we allow
  370.         // the header to have an incorrect count, but we do change the
  371.         // trailing digest to be correct.
  372.         packHash = packDigest.digest();
  373.     }

  374.     private void writePackIndex() throws IOException {
  375.         List<PackedObjectInfo> list = getSortedObjectList(null /* by ObjectId */);
  376.         packIndex = objins.writePackIndex(packDsc, packHash, list);
  377.     }
  378. }