/*
 * Copyright (C) 2011, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.dfs;

import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import org.eclipse.jgit.errors.CorruptObjectException;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.file.PackIndex;
import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdOwnerMap;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.util.BlockList;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.NB;
import org.eclipse.jgit.util.TemporaryBuffer;
import org.eclipse.jgit.util.io.CountingOutputStream;

/**
 * Inserts objects into the DFS.
 *
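 * <p>
 * A minimal usage sketch (hypothetical variable names; assumes a
 * {@code Repository} named {@code repo} backed by a DFS implementation):
 *
 * <pre>
 * ObjectInserter ins = repo.newObjectInserter();
 * try {
 * 	ObjectId id = ins.insert(Constants.OBJ_BLOB, data);
 * 	ins.flush();
 * } finally {
 * 	ins.close();
 * }
 * </pre>
 */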
public class DfsInserter extends ObjectInserter {
	/** Always produce version 2 indexes, to get CRC data. */
	private static final int INDEX_VERSION = 2;

	final DfsObjDatabase db;
	int compression = Deflater.BEST_COMPRESSION;

	List<PackedObjectInfo> objectList;
	ObjectIdOwnerMap<PackedObjectInfo> objectMap;

	DfsBlockCache cache;
	DfsPackKey packKey;
	DfsPackDescription packDsc;
	PackStream packOut;
	private boolean rollback;

	/**
	 * Initialize a new inserter.
	 *
	 * @param db
	 *            database the inserter writes to.
	 */
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}

	void setCompressionLevel(int compression) {
		this.compression = compression;
	}

	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		return new DfsPackParser(db, this, in);
	}

	@Override
	public ObjectReader newReader() {
		return new Reader();
	}

	@Override
	public ObjectId insert(int type, byte[] data, int off, int len)
			throws IOException {
		ObjectId id = idFor(type, data, off, len);
		if (objectMap != null && objectMap.contains(id))
			return id;
		// Ignore unreachable (garbage) objects here.
		if (db.has(id, true))
			return id;

		long offset = beginObject(type, len);
		packOut.compress.write(data, off, len);
		packOut.compress.finish();
		return endObject(id, offset);
	}

	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		long offset = beginObject(type, len);
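		// The object is too large to buffer whole: hash the canonical
		// object header ("<type> <length>\0") and the body incrementally
		// while compressing, since the id cannot be computed up front.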
		MessageDigest md = digest();
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(ObjectId.fromRaw(md.digest()), offset);
	}

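	// Choose a read buffer: the shared temporary buffer when the object
	// fits, otherwise a one-off allocation while the object is still under
	// the streaming threshold; fall back to streaming if allocation fails.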
	private byte[] insertBuffer(long len) {
		byte[] buf = buffer();
		if (len <= buf.length)
			return buf;
		if (len < db.getReaderOptions().getStreamFileThreshold()) {
			try {
				return new byte[(int) len];
			} catch (OutOfMemoryError noMem) {
				return buf;
			}
		}
		return buf;
	}

	@Override
	public void flush() throws IOException {
		if (packDsc == null)
			return;

		if (packOut == null)
			throw new IOException();

		byte[] packHash = packOut.writePackFooter();
		packDsc.addFileExt(PACK);
		packDsc.setFileSize(PACK, packOut.getCount());
		packOut.close();
		packOut = null;

		sortObjectsById();

		PackIndex index = writePackIndex(packDsc, packHash, objectList);
		db.commitPack(Collections.singletonList(packDsc), null);
		rollback = false;

		DfsPackFile p = cache.getOrCreate(packDsc, packKey);
		if (index != null)
			p.setPackIndex(index);
		db.addPack(p);
		clear();
	}

	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
				// Ignore a close failure, the pack should be removed.
			} finally {
				packOut = null;
			}
		}
		if (rollback && packDsc != null) {
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}

	private void clear() {
		objectList = null;
		objectMap = null;
		packKey = null;
		packDsc = null;
	}

	private long beginObject(int type, long len) throws IOException {
		if (packOut == null)
			beginPack();
		long offset = packOut.getCount();
		packOut.beginObject(type, len);
		return offset;
	}

	private ObjectId endObject(ObjectId id, long offset) {
		PackedObjectInfo obj = new PackedObjectInfo(id);
		obj.setOffset(offset);
		obj.setCRC((int) packOut.crc32.getValue());
		objectList.add(obj);
		objectMap.addIfAbsent(obj);
		return id;
	}

	private void beginPack() throws IOException {
		objectList = new BlockList<PackedObjectInfo>();
		objectMap = new ObjectIdOwnerMap<PackedObjectInfo>();
		cache = DfsBlockCache.getInstance();

		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		packOut = new PackStream(db.writeFile(packDsc, PACK));
		packKey = new DfsPackKey();

		// Write the header as though it were a single object pack.
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2); // Always use pack version 2.
		NB.encodeInt32(buf, 8, 1); // Always assume 1 object.
		packOut.write(buf, 0, 12);
	}

	private void sortObjectsById() {
		Collections.sort(objectList);
	}

	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

		// If there are fewer than 58,000 objects, the entire index fits in
		// under 2 MiB. Callers will probably need the index immediately, so
		// buffer the index in process and load from the buffer.
		TemporaryBuffer.Heap buf = null;
		PackIndex packIndex = null;
		if (list.size() <= 58000) {
			buf = new TemporaryBuffer.Heap(2 << 20);
			index(buf, packHash, list);
			packIndex = PackIndex.read(buf.openInputStream());
		}

		DfsOutputStream os = db.writeFile(pack, INDEX);
		try {
			CountingOutputStream cnt = new CountingOutputStream(os);
			if (buf != null)
				buf.writeTo(cnt, null);
			else
				index(cnt, packHash, list);
			pack.addFileExt(INDEX);
			pack.setFileSize(INDEX, cnt.getCount());
		} finally {
			os.close();
		}
		return packIndex;
	}

	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}

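	/**
	 * Output stream for the in-progress pack: bytes are buffered a block
	 * at a time, flushed to the DFS output stream, and also placed in the
	 * block cache so they can be read back before the pack is committed.
	 */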
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;
		private final MessageDigest md;
		final byte[] hdrBuf;
		private final Deflater deflater;
		private final int blockSize;

		private long currPos; // Position of currBuf[0] in the output stream.
		private int currPtr; // Number of bytes in currBuf.
		private byte[] currBuf;

		final CRC32 crc32;
		final DeflaterOutputStream compress;

		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}

		long getCount() {
			return currPos + currPtr;
		}

		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}

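		// Pack entry headers use git's variable-length encoding: the first
		// byte holds the object type in bits 4-6 and the low four size
		// bits; each following byte carries seven more size bits, with the
		// high bit set while more bytes remain.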
		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}

		@Override
		public void write(final int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}

		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}

		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf;
			else
				buf = copyOf(currBuf, 0, currPtr);
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null;
		}

		private byte[] copyOf(byte[] src, int ptr, int cnt) {
			byte[] dst = new byte[cnt];
			System.arraycopy(src, ptr, dst, 0, cnt);
			return dst;
		}

		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}

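		// Read back bytes already written: fully flushed blocks are served
		// from the block cache (or reloaded from storage), while the tail
		// still sitting in currBuf is copied out directly.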
		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			if (currPos <= pos && r < cnt) {
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}

		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				return null; // Caller will switch to large object streaming.
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					throw new DataFormatException();
			}
		}

		private int setInput(long pos, Inflater inf) throws IOException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(DfsText.get().unexpectedEofInPack);
		}

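		// Fetch the block containing pos from the cache, or read it back
		// from the underlying stream and cache it if it was evicted.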
		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end();
			out.close();
		}
	}

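	/**
	 * Reader that can resolve and open objects still buffered in this
	 * inserter's in-progress pack, in addition to objects already
	 * committed to the object database.
	 */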
	private class Reader extends ObjectReader {
		private final DfsReader ctx = new DfsReader(db);

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}

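		// Match abbreviations against both the committed database and the
		// objects buffered in the in-progress pack.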
		@Override
		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
				throws IOException {
			Collection<ObjectId> stored = ctx.resolve(id);
			if (objectList == null)
				return stored;

			Set<ObjectId> r = new HashSet<ObjectId>(stored.size() + 2);
			r.addAll(stored);
			for (PackedObjectInfo obj : objectList) {
				if (id.prefixCompare(obj) == 0)
					r.add(obj.copy());
			}
			return r;
		}

		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				return ctx.open(objectId, typeHint);

			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
				throw new EOFException(DfsText.get().unexpectedEofInPack);

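			// Parse the entry header written by PackStream.encodeTypeSize:
			// type in bits 4-6 of the first byte, size as a varint.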
			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						DfsText.get().cannotReadBackDelta, Integer.toString(type)));

			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}

		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				CorruptObjectException coe = new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)));
				coe.initCause(dfe);
				throw coe;
			}
		}

		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		@Override
		public void close() {
			ctx.close();
		}
	}

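	/**
	 * Loader that streams a large object back out of the in-progress
	 * pack, inflating it block by block instead of materializing it in
	 * memory.
	 */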
	private class StreamLoader extends ObjectLoader {
		private final ObjectId id;
		private final int type;
		private final long size;

		private final DfsPackKey srcPack;
		private final long pos;

		StreamLoader(ObjectId id, int type, long sz,
				DfsPackKey key, long pos) {
			this.id = id;
			this.type = type;
			this.size = sz;
			this.srcPack = key;
			this.pos = pos;
		}

		@Override
		public ObjectStream openStream() throws IOException {
			final DfsReader ctx = new DfsReader(db);
			if (srcPack != packKey) {
				try {
					// After DfsInserter.flush(), use the normal code path;
					// the newly created pack is registered in the cache.
					return ctx.open(id, type).openStream();
				} finally {
					ctx.close();
				}
			}

			int bufsz = 8192;
			final Inflater inf = ctx.inflater();
			return new ObjectStream.Filter(type,
					size, new BufferedInputStream(new InflaterInputStream(
							new ReadBackStream(pos), inf, bufsz), bufsz)) {
				@Override
				public void close() throws IOException {
					ctx.close();
					super.close();
				}
			};
		}

		@Override
		public int getType() {
			return type;
		}

		@Override
		public long getSize() {
			return size;
		}

		@Override
		public boolean isLarge() {
			return true;
		}

		@Override
		public byte[] getCachedBytes() throws LargeObjectException {
			throw new LargeObjectException.ExceedsLimit(
					db.getReaderOptions().getStreamFileThreshold(), size);
		}
	}

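	/** InputStream over bytes previously written to the in-progress pack. */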
	private final class ReadBackStream extends InputStream {
		private long pos;

		ReadBackStream(long offset) {
			pos = offset;
		}

		@Override
		public int read() throws IOException {
			byte[] b = new byte[1];
			int n = read(b);
			return n == 1 ? b[0] & 0xff : -1;
		}

		@Override
		public int read(byte[] buf, int ptr, int len) throws IOException {
			int n = packOut.read(pos, buf, ptr, len);
			if (n > 0) {
				pos += n;
			}
			return n;
		}
	}
}