View Javadoc
1   /*
2    * Copyright (C) 2011, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.storage.dfs;
12  
13  import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
14  
15  import java.io.EOFException;
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.nio.ByteBuffer;
19  import java.security.MessageDigest;
20  import java.util.Collections;
21  import java.util.List;
22  import java.util.zip.CRC32;
23  import java.util.zip.Deflater;
24  
25  import org.eclipse.jgit.internal.storage.file.PackIndex;
26  import org.eclipse.jgit.lib.AnyObjectId;
27  import org.eclipse.jgit.lib.Constants;
28  import org.eclipse.jgit.lib.ProgressMonitor;
29  import org.eclipse.jgit.transport.PackLock;
30  import org.eclipse.jgit.transport.PackParser;
31  import org.eclipse.jgit.transport.PackedObjectInfo;
32  
33  /**
34   * Parses a pack stream into the DFS, by creating a new pack and index.
35   */
36  public class DfsPackParser extends PackParser {
37  	private final DfsObjDatabase objdb;
38  
39  	private final DfsInserter objins;
40  
41  	/** CRC-32 computation for objects that are appended onto the pack. */
42  	private final CRC32 crc;
43  
44  	/** Running SHA-1 of the entire pack stream. */
45  	private final MessageDigest packDigest;
46  
47  	/** Block size to use when caching data for read back. */
48  	private int blockSize;
49  
50  	/** Current end of the pack file. */
51  	private long packEnd;
52  
53  	/** Checksum of the entire pack file. */
54  	private byte[] packHash;
55  
56  	/** Compresses delta bases when completing a thin pack. */
57  	private Deflater def;
58  
59  	/** True if the pack is an empty pack. */
60  	private boolean isEmptyPack;
61  
62  	/** Name of the pack file, computed in {@link #onPackHeader(long)}. */
63  	private DfsPackDescription packDsc;
64  
65  	/** Key used during delta resolution reading delta chains. */
66  	private DfsStreamKey packKey;
67  
68  	/** If the index was small enough, the entire index after writing. */
69  	private PackIndex packIndex;
70  
71  	/** Stream to the DFS storage, opened during {@link #onPackHeader(long)}. */
72  	private DfsOutputStream out;
73  
74  	/** Data being written that has not yet been cached. */
75  	private byte[] currBuf;
76  	private long currPos; // Position of currBuf in the file.
77  	private int currEnd; // Position inside of currBuf to append to next.
78  
79  	/** Cache the chunks were stored into or get read back from. */
80  	private DfsBlockCache blockCache;
81  
82  	/** Cached block that is being read. */
83  	private long readPos;
84  	private DfsBlock readBlock;
85  
86  	/**
87  	 * Initialize a new pack parser.
88  	 *
89  	 * @param db
90  	 *            database the objects will be imported into.
91  	 * @param ins
92  	 *            inserter the parser will use to help it inject the objects.
93  	 * @param in
94  	 *            the stream to parse.
95  	 */
96  	protected DfsPackParser(DfsObjDatabase db, DfsInserter ins, InputStream in) {
97  		super(db, in);
98  		this.objdb = db;
99  		this.objins = ins;
100 		this.crc = new CRC32();
101 		this.packDigest = Constants.newMessageDigest();
102 	}
103 
104 	/** {@inheritDoc} */
105 	@Override
106 	public PackLock parse(ProgressMonitor receiving, ProgressMonitor resolving)
107 			throws IOException {
108 		boolean rollback = true;
109 		try {
110 			blockCache = DfsBlockCache.getInstance();
111 			super.parse(receiving, resolving);
112 			if (isEmptyPack)
113 				return null;
114 			buffer(packHash, 0, packHash.length);
115 			if (currEnd != 0)
116 				flushBlock();
117 			out.close();
118 			out = null;
119 			currBuf = null;
120 			readBlock = null;
121 			packDsc.addFileExt(PACK);
122 			packDsc.setFileSize(PACK, packEnd);
123 			packDsc.setBlockSize(PACK, blockSize);
124 
125 			writePackIndex();
126 			objdb.commitPack(Collections.singletonList(packDsc), null);
127 			rollback = false;
128 
129 			DfsPackFile p = new DfsPackFile(blockCache, packDsc);
130 			p.setBlockSize(blockSize);
131 			if (packIndex != null)
132 				p.setPackIndex(packIndex);
133 
134 			objdb.addPack(p);
135 
136 			return null;
137 		} finally {
138 			blockCache = null;
139 			currBuf = null;
140 			readBlock = null;
141 
142 			if (def != null) {
143 				def.end();
144 				def = null;
145 			}
146 
147 			if (out != null) {
148 				try {
149 					out.close();
150 				} catch (IOException err) {
151 					// Ignore a close error, rollbackPack is also important.
152 				}
153 				out = null;
154 			}
155 
156 			if (rollback && packDsc != null) {
157 				try {
158 					objdb.rollbackPack(Collections.singletonList(packDsc));
159 				} finally {
160 					packDsc = null;
161 				}
162 			}
163 		}
164 	}
165 
166 	/**
167 	 * Get description of the imported pack, if one was made.
168 	 *
169 	 * @return description of the imported pack, if one was made.
170 	 */
171 	public DfsPackDescription getPackDescription() {
172 		return packDsc;
173 	}
174 
175 	/** {@inheritDoc} */
176 	@Override
177 	protected void onPackHeader(long objectCount) throws IOException {
178 		if (objectCount == 0) {
179 			isEmptyPack = true;
180 			currBuf = new byte[256];
181 			return;
182 		}
183 
184 		packDsc = objdb.newPack(DfsObjDatabase.PackSource.RECEIVE);
185 		out = objdb.writeFile(packDsc, PACK);
186 		packKey = packDsc.getStreamKey(PACK);
187 
188 		int size = out.blockSize();
189 		if (size <= 0)
190 			size = blockCache.getBlockSize();
191 		else if (size < blockCache.getBlockSize())
192 			size = (blockCache.getBlockSize() / size) * size;
193 		blockSize = size;
194 		currBuf = new byte[blockSize];
195 	}
196 
197 	/** {@inheritDoc} */
198 	@Override
199 	protected void onBeginWholeObject(long streamPosition, int type,
200 			long inflatedSize) throws IOException {
201 		crc.reset();
202 	}
203 
204 	/** {@inheritDoc} */
205 	@Override
206 	protected void onEndWholeObject(PackedObjectInfo info) throws IOException {
207 		info.setCRC((int) crc.getValue());
208 	}
209 
210 	/** {@inheritDoc} */
211 	@Override
212 	protected void onBeginOfsDelta(long streamPosition,
213 			long baseStreamPosition, long inflatedSize) throws IOException {
214 		crc.reset();
215 	}
216 
217 	/** {@inheritDoc} */
218 	@Override
219 	protected void onBeginRefDelta(long streamPosition, AnyObjectId baseId,
220 			long inflatedSize) throws IOException {
221 		crc.reset();
222 	}
223 
224 	/** {@inheritDoc} */
225 	@Override
226 	protected UnresolvedDelta onEndDelta() throws IOException {
227 		UnresolvedDelta delta = new UnresolvedDelta();
228 		delta.setCRC((int) crc.getValue());
229 		return delta;
230 	}
231 
232 	/** {@inheritDoc} */
233 	@Override
234 	protected void onInflatedObjectData(PackedObjectInfo obj, int typeCode,
235 			byte[] data) throws IOException {
236 		// DfsPackParser ignores this event.
237 	}
238 
239 	/** {@inheritDoc} */
240 	@Override
241 	protected void onObjectHeader(Source src, byte[] raw, int pos, int len)
242 			throws IOException {
243 		crc.update(raw, pos, len);
244 	}
245 
246 	/** {@inheritDoc} */
247 	@Override
248 	protected void onObjectData(Source src, byte[] raw, int pos, int len)
249 			throws IOException {
250 		crc.update(raw, pos, len);
251 	}
252 
253 	/** {@inheritDoc} */
254 	@Override
255 	protected void onStoreStream(byte[] raw, int pos, int len)
256 			throws IOException {
257 		buffer(raw, pos, len);
258 		packDigest.update(raw, pos, len);
259 	}
260 
261 	private void buffer(byte[] raw, int pos, int len) throws IOException {
262 		while (0 < len) {
263 			int n = Math.min(len, currBuf.length - currEnd);
264 			if (n == 0) {
265 				DfsBlock v = flushBlock();
266 				currBuf = new byte[blockSize];
267 				currEnd = 0;
268 				currPos += v.size();
269 				continue;
270 			}
271 
272 			System.arraycopy(raw, pos, currBuf, currEnd, n);
273 			pos += n;
274 			len -= n;
275 			currEnd += n;
276 			packEnd += n;
277 		}
278 	}
279 
280 	private DfsBlock flushBlock() throws IOException {
281 		if (isEmptyPack)
282 			throw new IOException(DfsText.get().willNotStoreEmptyPack);
283 
284 		out.write(currBuf, 0, currEnd);
285 
286 		byte[] buf;
287 		if (currEnd == currBuf.length) {
288 			buf = currBuf;
289 		} else {
290 			buf = new byte[currEnd];
291 			System.arraycopy(currBuf, 0, buf, 0, currEnd);
292 		}
293 
294 		DfsBlock v = new DfsBlock(packKey, currPos, buf);
295 		readBlock = v;
296 		blockCache.put(v);
297 		return v;
298 	}
299 
300 	/** {@inheritDoc} */
301 	@Override
302 	protected void onPackFooter(byte[] hash) throws IOException {
303 		// The base class will validate the original hash matches
304 		// what the stream has stored at the end. We are called
305 		// only if the hash was good. Save it in case there are no
306 		// missing bases to append.
307 		packHash = hash;
308 	}
309 
310 	/** {@inheritDoc} */
311 	@Override
312 	protected ObjectTypeAndSize seekDatabase(PackedObjectInfo obj,
313 			ObjectTypeAndSize info) throws IOException {
314 		readPos = obj.getOffset();
315 		crc.reset();
316 		return readObjectHeader(info);
317 	}
318 
319 	/** {@inheritDoc} */
320 	@Override
321 	protected ObjectTypeAndSize seekDatabase(UnresolvedDelta delta,
322 			ObjectTypeAndSize info) throws IOException {
323 		readPos = delta.getOffset();
324 		crc.reset();
325 		return readObjectHeader(info);
326 	}
327 
328 	/** {@inheritDoc} */
329 	@Override
330 	protected int readDatabase(byte[] dst, int pos, int cnt) throws IOException {
331 		if (cnt == 0)
332 			return 0;
333 
334 		if (currPos <= readPos) {
335 			// Requested read is still buffered. Copy direct from buffer.
336 			int p = (int) (readPos - currPos);
337 			int n = Math.min(cnt, currEnd - p);
338 			if (n == 0)
339 				throw new EOFException();
340 			System.arraycopy(currBuf, p, dst, pos, n);
341 			readPos += n;
342 			return n;
343 		}
344 
345 		if (readBlock == null || !readBlock.contains(packKey, readPos)) {
346 			long start = toBlockStart(readPos);
347 			readBlock = blockCache.get(packKey, start);
348 			if (readBlock == null) {
349 				int size = (int) Math.min(blockSize, packEnd - start);
350 				byte[] buf = new byte[size];
351 				if (read(start, buf, 0, size) != size)
352 					throw new EOFException();
353 				readBlock = new DfsBlock(packKey, start, buf);
354 				blockCache.put(readBlock);
355 			}
356 		}
357 
358 		int n = readBlock.copy(readPos, dst, pos, cnt);
359 		readPos += n;
360 		return n;
361 	}
362 
363 	private int read(long pos, byte[] dst, int off, int len) throws IOException {
364 		if (len == 0)
365 			return 0;
366 
367 		int cnt = 0;
368 		while (0 < len) {
369 			int r = out.read(pos, ByteBuffer.wrap(dst, off, len));
370 			if (r <= 0)
371 				break;
372 			pos += r;
373 			off += r;
374 			len -= r;
375 			cnt += r;
376 		}
377 		return cnt != 0 ? cnt : -1;
378 	}
379 
380 	private long toBlockStart(long pos) {
381 		return (pos / blockSize) * blockSize;
382 	}
383 
384 	/** {@inheritDoc} */
385 	@Override
386 	protected boolean checkCRC(int oldCRC) {
387 		return oldCRC == (int) crc.getValue();
388 	}
389 
390 	/** {@inheritDoc} */
391 	@Override
392 	protected boolean onAppendBase(final int typeCode, final byte[] data,
393 			final PackedObjectInfo info) throws IOException {
394 		info.setOffset(packEnd);
395 
396 		final byte[] buf = buffer();
397 		int sz = data.length;
398 		int len = 0;
399 		buf[len++] = (byte) ((typeCode << 4) | (sz & 15));
400 		sz >>>= 4;
401 		while (sz > 0) {
402 			buf[len - 1] |= (byte) 0x80;
403 			buf[len++] = (byte) (sz & 0x7f);
404 			sz >>>= 7;
405 		}
406 
407 		packDigest.update(buf, 0, len);
408 		crc.reset();
409 		crc.update(buf, 0, len);
410 		buffer(buf, 0, len);
411 
412 		if (def == null)
413 			def = new Deflater(Deflater.DEFAULT_COMPRESSION, false);
414 		else
415 			def.reset();
416 		def.setInput(data);
417 		def.finish();
418 
419 		while (!def.finished()) {
420 			len = def.deflate(buf);
421 			packDigest.update(buf, 0, len);
422 			crc.update(buf, 0, len);
423 			buffer(buf, 0, len);
424 		}
425 
426 		info.setCRC((int) crc.getValue());
427 		return true;
428 	}
429 
430 	/** {@inheritDoc} */
431 	@Override
432 	protected void onEndThinPack() throws IOException {
433 		// Normally when a thin pack is closed the pack header gets
434 		// updated to reflect the actual object count. This is not going
435 		// to be possible on most DFS backends, so instead we allow
436 		// the header to have an incorrect count, but we do change the
437 		// trailing digest to be correct.
438 		packHash = packDigest.digest();
439 	}
440 
441 	private void writePackIndex() throws IOException {
442 		List<PackedObjectInfo> list = getSortedObjectList(null /* by ObjectId */);
443 		packIndex = objins.writePackIndex(packDsc, packHash, list);
444 	}
445 }