View Javadoc
1   /*
2    * Copyright (C) 2011, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.storage.dfs;
12  
13  import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
14  import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
15  import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
16  import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
17  
18  import java.io.BufferedInputStream;
19  import java.io.EOFException;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  import java.security.MessageDigest;
25  import java.text.MessageFormat;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Set;
31  import java.util.zip.CRC32;
32  import java.util.zip.DataFormatException;
33  import java.util.zip.Deflater;
34  import java.util.zip.DeflaterOutputStream;
35  import java.util.zip.Inflater;
36  import java.util.zip.InflaterInputStream;
37  
38  import org.eclipse.jgit.annotations.Nullable;
39  import org.eclipse.jgit.errors.CorruptObjectException;
40  import org.eclipse.jgit.errors.IncorrectObjectTypeException;
41  import org.eclipse.jgit.errors.LargeObjectException;
42  import org.eclipse.jgit.internal.JGitText;
43  import org.eclipse.jgit.internal.storage.file.PackIndex;
44  import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
45  import org.eclipse.jgit.internal.storage.pack.PackExt;
46  import org.eclipse.jgit.lib.AbbreviatedObjectId;
47  import org.eclipse.jgit.lib.AnyObjectId;
48  import org.eclipse.jgit.lib.Constants;
49  import org.eclipse.jgit.lib.ObjectId;
50  import org.eclipse.jgit.lib.ObjectIdOwnerMap;
51  import org.eclipse.jgit.lib.ObjectInserter;
52  import org.eclipse.jgit.lib.ObjectLoader;
53  import org.eclipse.jgit.lib.ObjectReader;
54  import org.eclipse.jgit.lib.ObjectStream;
55  import org.eclipse.jgit.transport.PackedObjectInfo;
56  import org.eclipse.jgit.util.BlockList;
57  import org.eclipse.jgit.util.IO;
58  import org.eclipse.jgit.util.NB;
59  import org.eclipse.jgit.util.TemporaryBuffer;
60  import org.eclipse.jgit.util.io.CountingOutputStream;
61  import org.eclipse.jgit.util.sha1.SHA1;
62  
63  /**
64   * Inserts objects into the DFS.
65   */
66  public class DfsInserter extends ObjectInserter {
67  	/** Always produce version 2 indexes, to get CRC data. */
68  	private static final int INDEX_VERSION = 2;
69  
70  	final DfsObjDatabase db;
71  	int compression = Deflater.BEST_COMPRESSION;
72  
73  	List<PackedObjectInfo> objectList;
74  	ObjectIdOwnerMap<PackedObjectInfo> objectMap;
75  
76  	DfsBlockCache cache;
77  	DfsStreamKey packKey;
78  	DfsPackDescription packDsc;
79  	PackStream packOut;
80  	private boolean rollback;
81  	private boolean checkExisting = true;
82  
83  	/**
84  	 * Initialize a new inserter.
85  	 *
86  	 * @param db
87  	 *            database the inserter writes to.
88  	 */
89  	protected DfsInserter(DfsObjDatabase db) {
90  		this.db = db;
91  	}
92  
93  	/**
94  	 * Check existence
95  	 *
96  	 * @param check
97  	 *            if {@code false}, will write out possibly-duplicate objects
98  	 *            without first checking whether they exist in the repo; default
99  	 *            is true.
100 	 */
101 	public void checkExisting(boolean check) {
102 		checkExisting = check;
103 	}
104 
105 	void setCompressionLevel(int compression) {
106 		this.compression = compression;
107 	}
108 
109 	/** {@inheritDoc} */
110 	@Override
111 	public DfsPackParser newPackParser(InputStream in) throws IOException {
112 		return new DfsPackParser(db, this, in);
113 	}
114 
115 	/** {@inheritDoc} */
116 	@Override
117 	public ObjectReader newReader() {
118 		return new Reader();
119 	}
120 
121 	/** {@inheritDoc} */
122 	@Override
123 	public ObjectId insert(int type, byte[] data, int off, int len)
124 			throws IOException {
125 		ObjectId id = idFor(type, data, off, len);
126 		if (objectMap != null && objectMap.contains(id))
127 			return id;
128 		// Ignore unreachable (garbage) objects here.
129 		if (checkExisting && db.has(id, true))
130 			return id;
131 
132 		long offset = beginObject(type, len);
133 		packOut.compress.write(data, off, len);
134 		packOut.compress.finish();
135 		return endObject(id, offset);
136 	}
137 
138 	/** {@inheritDoc} */
139 	@Override
140 	public ObjectId insert(int type, long len, InputStream in)
141 			throws IOException {
142 		byte[] buf = insertBuffer(len);
143 		if (len <= buf.length) {
144 			IO.readFully(in, buf, 0, (int) len);
145 			return insert(type, buf, 0, (int) len);
146 		}
147 
148 		long offset = beginObject(type, len);
149 		SHA1 md = digest();
150 		md.update(Constants.encodedTypeString(type));
151 		md.update((byte) ' ');
152 		md.update(Constants.encodeASCII(len));
153 		md.update((byte) 0);
154 
155 		while (0 < len) {
156 			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
157 			if (n <= 0)
158 				throw new EOFException();
159 			md.update(buf, 0, n);
160 			packOut.compress.write(buf, 0, n);
161 			len -= n;
162 		}
163 		packOut.compress.finish();
164 		return endObject(md.toObjectId(), offset);
165 	}
166 
167 	private byte[] insertBuffer(long len) {
168 		byte[] buf = buffer();
169 		if (len <= buf.length)
170 			return buf;
171 		if (len < db.getReaderOptions().getStreamFileThreshold()) {
172 			try {
173 				return new byte[(int) len];
174 			} catch (OutOfMemoryError noMem) {
175 				return buf;
176 			}
177 		}
178 		return buf;
179 	}
180 
181 	/** {@inheritDoc} */
182 	@Override
183 	public void flush() throws IOException {
184 		if (packDsc == null)
185 			return;
186 
187 		if (packOut == null)
188 			throw new IOException();
189 
190 		byte[] packHash = packOut.writePackFooter();
191 		packDsc.addFileExt(PACK);
192 		packDsc.setFileSize(PACK, packOut.getCount());
193 		packOut.close();
194 		packOut = null;
195 
196 		sortObjectsById();
197 
198 		PackIndex index = writePackIndex(packDsc, packHash, objectList);
199 		db.commitPack(Collections.singletonList(packDsc), null);
200 		rollback = false;
201 
202 		DfsPackFile p = new DfsPackFile(cache, packDsc);
203 		if (index != null)
204 			p.setPackIndex(index);
205 		db.addPack(p);
206 		clear();
207 	}
208 
209 	/** {@inheritDoc} */
210 	@Override
211 	public void close() {
212 		if (packOut != null) {
213 			try {
214 				packOut.close();
215 			} catch (IOException err) {
216 				// Ignore a close failure, the pack should be removed.
217 			} finally {
218 				packOut = null;
219 			}
220 		}
221 		if (rollback && packDsc != null) {
222 			try {
223 				db.rollbackPack(Collections.singletonList(packDsc));
224 			} finally {
225 				packDsc = null;
226 				rollback = false;
227 			}
228 		}
229 		clear();
230 	}
231 
232 	private void clear() {
233 		objectList = null;
234 		objectMap = null;
235 		packKey = null;
236 		packDsc = null;
237 	}
238 
239 	private long beginObject(int type, long len) throws IOException {
240 		if (packOut == null)
241 			beginPack();
242 		long offset = packOut.getCount();
243 		packOut.beginObject(type, len);
244 		return offset;
245 	}
246 
247 	private ObjectId endObject(ObjectId id, long offset) {
248 		PackedObjectInfo obj = new PackedObjectInfo(id);
249 		obj.setOffset(offset);
250 		obj.setCRC((int) packOut.crc32.getValue());
251 		objectList.add(obj);
252 		objectMap.addIfAbsent(obj);
253 		return id;
254 	}
255 
256 	private void beginPack() throws IOException {
257 		objectList = new BlockList<>();
258 		objectMap = new ObjectIdOwnerMap<>();
259 		cache = DfsBlockCache.getInstance();
260 
261 		rollback = true;
262 		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
263 		DfsOutputStream dfsOut = db.writeFile(packDsc, PACK);
264 		packDsc.setBlockSize(PACK, dfsOut.blockSize());
265 		packOut = new PackStream(dfsOut);
266 		packKey = packDsc.getStreamKey(PACK);
267 
268 		// Write the header as though it were a single object pack.
269 		byte[] buf = packOut.hdrBuf;
270 		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
271 		NB.encodeInt32(buf, 4, 2); // Always use pack version 2.
272 		NB.encodeInt32(buf, 8, 1); // Always assume 1 object.
273 		packOut.write(buf, 0, 12);
274 	}
275 
276 	private void sortObjectsById() {
277 		Collections.sort(objectList);
278 	}
279 
280 	@Nullable
281 	private TemporaryBuffer.Heap maybeGetTemporaryBuffer(
282 			List<PackedObjectInfo> list) {
283 		if (list.size() <= 58000) {
284 			return new TemporaryBuffer.Heap(2 << 20);
285 		}
286 		return null;
287 	}
288 
289 	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
290 			List<PackedObjectInfo> list) throws IOException {
291 		pack.setIndexVersion(INDEX_VERSION);
292 		pack.setObjectCount(list.size());
293 
294 		// If there are less than 58,000 objects, the entire index fits in under
295 		// 2 MiB. Callers will probably need the index immediately, so buffer
296 		// the index in process and load from the buffer.
297 		PackIndex packIndex = null;
298 		try (TemporaryBuffer.Heap buf = maybeGetTemporaryBuffer(list);
299 				DfsOutputStream os = db.writeFile(pack, INDEX);
300 				CountingOutputStream cnt = new CountingOutputStream(os)) {
301 			if (buf != null) {
302 				index(buf, packHash, list);
303 				packIndex = PackIndex.read(buf.openInputStream());
304 				buf.writeTo(cnt, null);
305 			} else {
306 				index(cnt, packHash, list);
307 			}
308 			pack.addFileExt(INDEX);
309 			pack.setBlockSize(INDEX, os.blockSize());
310 			pack.setFileSize(INDEX, cnt.getCount());
311 		}
312 		return packIndex;
313 	}
314 
315 	private static void index(OutputStream out, byte[] packHash,
316 			List<PackedObjectInfo> list) throws IOException {
317 		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
318 	}
319 
320 	private class PackStream extends OutputStream {
321 		private final DfsOutputStream out;
322 		private final MessageDigest md;
323 		final byte[] hdrBuf;
324 		private final Deflater deflater;
325 		private final int blockSize;
326 
327 		private long currPos; // Position of currBuf[0] in the output stream.
328 		private int currPtr; // Number of bytes in currBuf.
329 		private byte[] currBuf;
330 
331 		final CRC32 crc32;
332 		final DeflaterOutputStream compress;
333 
334 		PackStream(DfsOutputStream out) {
335 			this.out = out;
336 
337 			hdrBuf = new byte[32];
338 			md = Constants.newMessageDigest();
339 			crc32 = new CRC32();
340 			deflater = new Deflater(compression);
341 			compress = new DeflaterOutputStream(this, deflater, 8192);
342 
343 			int size = out.blockSize();
344 			if (size <= 0)
345 				size = cache.getBlockSize();
346 			else if (size < cache.getBlockSize())
347 				size = (cache.getBlockSize() / size) * size;
348 			blockSize = size;
349 			currBuf = new byte[blockSize];
350 		}
351 
352 		long getCount() {
353 			return currPos + currPtr;
354 		}
355 
356 		void beginObject(int objectType, long length) throws IOException {
357 			crc32.reset();
358 			deflater.reset();
359 			write(hdrBuf, 0, encodeTypeSize(objectType, length));
360 		}
361 
362 		private int encodeTypeSize(int type, long rawLength) {
363 			long nextLength = rawLength >>> 4;
364 			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
365 			rawLength = nextLength;
366 			int n = 1;
367 			while (rawLength > 0) {
368 				nextLength >>>= 7;
369 				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
370 				rawLength = nextLength;
371 			}
372 			return n;
373 		}
374 
375 		@Override
376 		public void write(int b) throws IOException {
377 			hdrBuf[0] = (byte) b;
378 			write(hdrBuf, 0, 1);
379 		}
380 
381 		@Override
382 		public void write(byte[] data, int off, int len) throws IOException {
383 			crc32.update(data, off, len);
384 			md.update(data, off, len);
385 			writeNoHash(data, off, len);
386 		}
387 
388 		private void writeNoHash(byte[] data, int off, int len)
389 				throws IOException {
390 			while (0 < len) {
391 				int n = Math.min(len, currBuf.length - currPtr);
392 				if (n == 0) {
393 					flushBlock();
394 					currBuf = new byte[blockSize];
395 					continue;
396 				}
397 
398 				System.arraycopy(data, off, currBuf, currPtr, n);
399 				off += n;
400 				len -= n;
401 				currPtr += n;
402 			}
403 		}
404 
405 		private void flushBlock() throws IOException {
406 			out.write(currBuf, 0, currPtr);
407 
408 			byte[] buf;
409 			if (currPtr == currBuf.length)
410 				buf = currBuf;
411 			else
412 				buf = copyOf(currBuf, 0, currPtr);
413 			cache.put(new DfsBlock(packKey, currPos, buf));
414 
415 			currPos += currPtr;
416 			currPtr = 0;
417 			currBuf = null;
418 		}
419 
420 		private byte[] copyOf(byte[] src, int ptr, int cnt) {
421 			byte[] dst = new byte[cnt];
422 			System.arraycopy(src, ptr, dst, 0, cnt);
423 			return dst;
424 		}
425 
426 		byte[] writePackFooter() throws IOException {
427 			byte[] packHash = md.digest();
428 			writeNoHash(packHash, 0, packHash.length);
429 			if (currPtr != 0)
430 				flushBlock();
431 			return packHash;
432 		}
433 
434 		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
435 			int r = 0;
436 			while (pos < currPos && r < cnt) {
437 				DfsBlock b = getOrLoadBlock(pos);
438 				int n = b.copy(pos, dst, ptr + r, cnt - r);
439 				pos += n;
440 				r += n;
441 			}
442 			if (currPos <= pos && r < cnt) {
443 				int s = (int) (pos - currPos);
444 				int n = Math.min(currPtr - s, cnt - r);
445 				System.arraycopy(currBuf, s, dst, ptr + r, n);
446 				r += n;
447 			}
448 			return r;
449 		}
450 
451 		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
452 				DataFormatException {
453 			byte[] dstbuf;
454 			try {
455 				dstbuf = new byte[len];
456 			} catch (OutOfMemoryError noMemory) {
457 				return null; // Caller will switch to large object streaming.
458 			}
459 
460 			Inflater inf = ctx.inflater();
461 			pos += setInput(pos, inf);
462 			for (int dstoff = 0;;) {
463 				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
464 				dstoff += n;
465 				if (inf.finished())
466 					return dstbuf;
467 				if (inf.needsInput())
468 					pos += setInput(pos, inf);
469 				else if (n == 0)
470 					throw new DataFormatException();
471 			}
472 		}
473 
474 		private int setInput(long pos, Inflater inf)
475 				throws IOException, DataFormatException {
476 			if (pos < currPos)
477 				return getOrLoadBlock(pos).setInput(pos, inf);
478 			if (pos < currPos + currPtr) {
479 				int s = (int) (pos - currPos);
480 				int n = currPtr - s;
481 				inf.setInput(currBuf, s, n);
482 				return n;
483 			}
484 			throw new EOFException(JGitText.get().unexpectedEofInPack);
485 		}
486 
487 		private DfsBlock getOrLoadBlock(long pos) throws IOException {
488 			long s = toBlockStart(pos);
489 			DfsBlock b = cache.get(packKey, s);
490 			if (b != null)
491 				return b;
492 
493 			byte[] d = new byte[blockSize];
494 			for (int p = 0; p < blockSize;) {
495 				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
496 				if (n <= 0)
497 					throw new EOFException(JGitText.get().unexpectedEofInPack);
498 				p += n;
499 			}
500 			b = new DfsBlock(packKey, s, d);
501 			cache.put(b);
502 			return b;
503 		}
504 
505 		private long toBlockStart(long pos) {
506 			return (pos / blockSize) * blockSize;
507 		}
508 
509 		@Override
510 		public void close() throws IOException {
511 			deflater.end();
512 			out.close();
513 		}
514 	}
515 
516 	private class Reader extends ObjectReader {
517 		private final DfsReader ctx = db.newReader();
518 
519 		@Override
520 		public ObjectReader newReader() {
521 			return db.newReader();
522 		}
523 
524 		@Override
525 		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
526 				throws IOException {
527 			Collection<ObjectId> stored = ctx.resolve(id);
528 			if (objectList == null)
529 				return stored;
530 
531 			Set<ObjectId> r = new HashSet<>(stored.size() + 2);
532 			r.addAll(stored);
533 			for (PackedObjectInfo obj : objectList) {
534 				if (id.prefixCompare(obj) == 0)
535 					r.add(obj.copy());
536 			}
537 			return r;
538 		}
539 
540 		@Override
541 		public ObjectLoader open(AnyObjectId objectId, int typeHint)
542 				throws IOException {
543 			if (objectMap == null)
544 				return ctx.open(objectId, typeHint);
545 
546 			PackedObjectInfo obj = objectMap.get(objectId);
547 			if (obj == null)
548 				return ctx.open(objectId, typeHint);
549 
550 			byte[] buf = buffer();
551 			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
552 			if (cnt <= 0)
553 					throw new EOFException(JGitText.get().unexpectedEofInPack);
554 
555 			int c = buf[0] & 0xff;
556 			int type = (c >> 4) & 7;
557 			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
558 				throw new IOException(MessageFormat.format(
559 						JGitText.get().cannotReadBackDelta, Integer.toString(type)));
560 			if (typeHint != OBJ_ANY && type != typeHint) {
561 				throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
562 			}
563 
564 			long sz = c & 0x0f;
565 			int ptr = 1;
566 			int shift = 4;
567 			while ((c & 0x80) != 0) {
568 				if (ptr >= cnt)
569 					throw new EOFException(JGitText.get().unexpectedEofInPack);
570 				c = buf[ptr++] & 0xff;
571 				sz += ((long) (c & 0x7f)) << shift;
572 				shift += 7;
573 			}
574 
575 			long zpos = obj.getOffset() + ptr;
576 			if (sz < ctx.getStreamFileThreshold()) {
577 				byte[] data = inflate(obj, zpos, (int) sz);
578 				if (data != null)
579 					return new ObjectLoader.SmallObject(type, data);
580 			}
581 			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
582 		}
583 
584 		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
585 				throws IOException, CorruptObjectException {
586 			try {
587 				return packOut.inflate(ctx, zpos, sz);
588 			} catch (DataFormatException dfe) {
589 				throw new CorruptObjectException(
590 						MessageFormat.format(
591 								JGitText.get().objectAtHasBadZlibStream,
592 								Long.valueOf(obj.getOffset()),
593 								packDsc.getFileName(PackExt.PACK)),
594 						dfe);
595 			}
596 		}
597 
598 		@Override
599 		public boolean has(AnyObjectId objectId) throws IOException {
600 			return (objectMap != null && objectMap.contains(objectId))
601 					|| ctx.has(objectId);
602 		}
603 
604 		@Override
605 		public Set<ObjectId> getShallowCommits() throws IOException {
606 			return ctx.getShallowCommits();
607 		}
608 
609 		@Override
610 		public ObjectInserter getCreatedFromInserter() {
611 			return DfsInserter.this;
612 		}
613 
614 		@Override
615 		public void close() {
616 			ctx.close();
617 		}
618 	}
619 
620 	private class StreamLoader extends ObjectLoader {
621 		private final ObjectId id;
622 		private final int type;
623 		private final long size;
624 
625 		private final DfsStreamKey srcPack;
626 		private final long pos;
627 
628 		StreamLoader(ObjectId id, int type, long sz,
629 				DfsStreamKey key, long pos) {
630 			this.id = id;
631 			this.type = type;
632 			this.size = sz;
633 			this.srcPack = key;
634 			this.pos = pos;
635 		}
636 
637 		@Override
638 		public ObjectStream openStream() throws IOException {
639 			@SuppressWarnings("resource") // Explicitly closed below
640 			final DfsReader ctx = db.newReader();
641 			if (srcPack != packKey) {
642 				try {
643 					// Post DfsInserter.flush() use the normal code path.
644 					// The newly created pack is registered in the cache.
645 					return ctx.open(id, type).openStream();
646 				} finally {
647 					ctx.close();
648 				}
649 			}
650 
651 			int bufsz = 8192;
652 			final Inflater inf = ctx.inflater();
653 			return new ObjectStream.Filter(type,
654 					size, new BufferedInputStream(new InflaterInputStream(
655 							new ReadBackStream(pos), inf, bufsz), bufsz)) {
656 				@Override
657 				public void close() throws IOException {
658 					ctx.close();
659 					super.close();
660 				}
661 			};
662 		}
663 
664 		@Override
665 		public int getType() {
666 			return type;
667 		}
668 
669 		@Override
670 		public long getSize() {
671 			return size;
672 		}
673 
674 		@Override
675 		public boolean isLarge() {
676 			return true;
677 		}
678 
679 		@Override
680 		public byte[] getCachedBytes() throws LargeObjectException {
681 			throw new LargeObjectException.ExceedsLimit(
682 					db.getReaderOptions().getStreamFileThreshold(), size);
683 		}
684 	}
685 
686 	private final class ReadBackStream extends InputStream {
687 		private long pos;
688 
689 		ReadBackStream(long offset) {
690 			pos = offset;
691 		}
692 
693 		@Override
694 		public int read() throws IOException {
695 			byte[] b = new byte[1];
696 			int n = read(b);
697 			return n == 1 ? b[0] & 0xff : -1;
698 		}
699 
700 		@Override
701 		public int read(byte[] buf, int ptr, int len) throws IOException {
702 			int n = packOut.read(pos, buf, ptr, len);
703 			if (n > 0) {
704 				pos += n;
705 			}
706 			return n;
707 		}
708 	}
709 }