View Javadoc
1   /*
2    * Copyright (C) 2011, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.storage.dfs;
45  
46  import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
47  import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
48  import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
49  import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
50  
51  import java.io.BufferedInputStream;
52  import java.io.EOFException;
53  import java.io.IOException;
54  import java.io.InputStream;
55  import java.io.OutputStream;
56  import java.nio.ByteBuffer;
57  import java.security.MessageDigest;
58  import java.text.MessageFormat;
59  import java.util.Collection;
60  import java.util.Collections;
61  import java.util.HashSet;
62  import java.util.List;
63  import java.util.Set;
64  import java.util.zip.CRC32;
65  import java.util.zip.DataFormatException;
66  import java.util.zip.Deflater;
67  import java.util.zip.DeflaterOutputStream;
68  import java.util.zip.Inflater;
69  import java.util.zip.InflaterInputStream;
70  
71  import org.eclipse.jgit.errors.CorruptObjectException;
72  import org.eclipse.jgit.errors.LargeObjectException;
73  import org.eclipse.jgit.internal.JGitText;
74  import org.eclipse.jgit.internal.storage.file.PackIndex;
75  import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
76  import org.eclipse.jgit.internal.storage.pack.PackExt;
77  import org.eclipse.jgit.lib.AbbreviatedObjectId;
78  import org.eclipse.jgit.lib.AnyObjectId;
79  import org.eclipse.jgit.lib.Constants;
80  import org.eclipse.jgit.lib.ObjectId;
81  import org.eclipse.jgit.lib.ObjectIdOwnerMap;
82  import org.eclipse.jgit.lib.ObjectInserter;
83  import org.eclipse.jgit.lib.ObjectLoader;
84  import org.eclipse.jgit.lib.ObjectReader;
85  import org.eclipse.jgit.lib.ObjectStream;
86  import org.eclipse.jgit.transport.PackedObjectInfo;
87  import org.eclipse.jgit.util.BlockList;
88  import org.eclipse.jgit.util.IO;
89  import org.eclipse.jgit.util.NB;
90  import org.eclipse.jgit.util.TemporaryBuffer;
91  import org.eclipse.jgit.util.io.CountingOutputStream;
92  
/**
 * Inserts objects into the DFS.
 * <p>
 * Objects are appended to a single in-progress pack file, which is
 * committed to the database (together with its index) by {@link #flush()}.
 */
public class DfsInserter extends ObjectInserter {
	/** Always produce version 2 indexes, to get CRC data. */
	private static final int INDEX_VERSION = 2;

	/** Database the pack file and its index are written into. */
	private final DfsObjDatabase db;

	/** Deflater level for object data; defaults to best compression. */
	private int compression = Deflater.BEST_COMPRESSION;

	// Objects written to the in-progress pack, in insertion order.
	private List<PackedObjectInfo> objectList;

	// Same objects keyed by id; used to skip duplicate inserts and to
	// locate pending objects for read-back through Reader.
	private ObjectIdOwnerMap<PackedObjectInfo> objectMap;

	private DfsBlockCache cache; // Shared block cache; set in beginPack().
	private DfsPackKey packKey; // Cache key of the in-progress pack.
	private DfsPackDescription packDsc; // Description of the pack being built.
	private PackStream packOut; // Output stream of the pack being built.

	// True while the current pack must be rolled back if close() runs
	// before flush() commits it.
	private boolean rollback;

	/**
	 * Initialize a new inserter.
	 *
	 * @param db
	 *            database the inserter writes to.
	 */
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}
119 
120 	void setCompressionLevel(int compression) {
121 		this.compression = compression;
122 	}
123 
	/** Create a parser that streams a received pack into this database. */
	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		return new DfsPackParser(db, this, in);
	}
128 
	/** Create a reader that can also see objects not yet flushed. */
	@Override
	public ObjectReader newReader() {
		return new Reader();
	}
133 
134 	@Override
135 	public ObjectId insert(int type, byte[] data, int off, int len)
136 			throws IOException {
137 		ObjectId id = idFor(type, data, off, len);
138 		if (objectMap != null && objectMap.contains(id))
139 			return id;
140 		if (db.has(id))
141 			return id;
142 
143 		long offset = beginObject(type, len);
144 		packOut.compress.write(data, off, len);
145 		packOut.compress.finish();
146 		return endObject(id, offset);
147 	}
148 
	/**
	 * Insert an object whose data is supplied by a stream.
	 * <p>
	 * Objects small enough to buffer are routed through the byte array
	 * path, which can detect duplicates; larger objects stream directly
	 * into the pack while the SHA-1 is computed on the fly, so they are
	 * written even if an identical object already exists.
	 */
	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		long offset = beginObject(type, len);
		MessageDigest md = digest();
		// Hash the canonical object header: "type SP length NUL".
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		// Copy the raw data into the pack's compressor while folding the
		// same bytes into the object's SHA-1.
		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(ObjectId.fromRaw(md.digest()), offset);
	}
176 
177 	private byte[] insertBuffer(long len) {
178 		byte[] buf = buffer();
179 		if (len <= buf.length)
180 			return buf;
181 		if (len < db.getReaderOptions().getStreamFileThreshold()) {
182 			try {
183 				return new byte[(int) len];
184 			} catch (OutOfMemoryError noMem) {
185 				return buf;
186 			}
187 		}
188 		return buf;
189 	}
190 
191 	@Override
192 	public void flush() throws IOException {
193 		if (packDsc == null)
194 			return;
195 
196 		if (packOut == null)
197 			throw new IOException();
198 
199 		byte[] packHash = packOut.writePackFooter();
200 		packDsc.addFileExt(PACK);
201 		packDsc.setFileSize(PACK, packOut.getCount());
202 		packOut.close();
203 		packOut = null;
204 
205 		sortObjectsById();
206 
207 		PackIndex index = writePackIndex(packDsc, packHash, objectList);
208 		db.commitPack(Collections.singletonList(packDsc), null);
209 		rollback = false;
210 
211 		DfsPackFile p = cache.getOrCreate(packDsc, packKey);
212 		if (index != null)
213 			p.setPackIndex(index);
214 		db.addPack(p);
215 		clear();
216 	}
217 
	/**
	 * Release resources, rolling back the in-progress pack if it was
	 * never committed by {@link #flush()}.
	 */
	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
				// Ignore a close failure, the pack should be removed.
			} finally {
				packOut = null;
			}
		}
		if (rollback && packDsc != null) {
			// flush() clears rollback once the pack has been committed.
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}
239 
240 	private void clear() {
241 		objectList = null;
242 		objectMap = null;
243 		packKey = null;
244 		packDsc = null;
245 	}
246 
247 	private long beginObject(int type, long len) throws IOException {
248 		if (packOut == null)
249 			beginPack();
250 		long offset = packOut.getCount();
251 		packOut.beginObject(type, len);
252 		return offset;
253 	}
254 
255 	private ObjectId endObject(ObjectId id, long offset) {
256 		PackedObjectInfo obj = new PackedObjectInfo(id);
257 		obj.setOffset(offset);
258 		obj.setCRC((int) packOut.crc32.getValue());
259 		objectList.add(obj);
260 		objectMap.addIfAbsent(obj);
261 		return id;
262 	}
263 
	/**
	 * Allocate a new pack and open its output stream.
	 * <p>
	 * The block cache must be assigned before constructing PackStream,
	 * which reads {@code cache} to size its buffer.
	 */
	private void beginPack() throws IOException {
		objectList = new BlockList<PackedObjectInfo>();
		objectMap = new ObjectIdOwnerMap<PackedObjectInfo>();
		cache = DfsBlockCache.getInstance();

		// Until flush() commits, close() must roll this pack back.
		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		packOut = new PackStream(db.writeFile(packDsc, PACK));
		packKey = new DfsPackKey();

		// Write the header as though it were a single object pack.
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2); // Always use pack version 2.
		NB.encodeInt32(buf, 8, 1); // Always assume 1 object.
		packOut.write(buf, 0, 12);
	}
281 
	// The pack index requires entries sorted by object id.
	private void sortObjectsById() {
		Collections.sort(objectList);
	}
285 
	/**
	 * Write the pack index describing {@code list}.
	 *
	 * @param pack
	 *            description of the pack being indexed.
	 * @param packHash
	 *            trailer SHA-1 of the pack file.
	 * @param list
	 *            objects in the pack, sorted by id.
	 * @return the in-memory index when small enough to buffer; otherwise
	 *         null and callers must load the index back from storage.
	 * @throws IOException
	 *             the index could not be written.
	 */
	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

		// If there are less than 58,000 objects, the entire index fits in under
		// 2 MiB. Callers will probably need the index immediately, so buffer
		// the index in process and load from the buffer.
		TemporaryBuffer.Heap buf = null;
		PackIndex packIndex = null;
		if (list.size() <= 58000) {
			buf = new TemporaryBuffer.Heap(2 << 20);
			index(buf, packHash, list);
			packIndex = PackIndex.read(buf.openInputStream());
		}

		DfsOutputStream os = db.writeFile(pack, INDEX);
		try {
			CountingOutputStream cnt = new CountingOutputStream(os);
			if (buf != null)
				buf.writeTo(cnt, null); // Reuse the buffered copy.
			else
				index(cnt, packHash, list);
			pack.addFileExt(INDEX);
			pack.setFileSize(INDEX, cnt.getCount());
		} finally {
			os.close();
		}
		return packIndex;
	}
316 
	// Serialize the index for list to out using the fixed index version.
	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}
321 
	/**
	 * Output stream building the pack file.
	 * <p>
	 * Output is buffered in blocks; each completed block is written both
	 * to the underlying {@code DfsOutputStream} and into the block cache,
	 * allowing objects to be read back before the pack is flushed. The
	 * stream maintains the whole-pack SHA-1 (for the trailer) and a
	 * per-object CRC-32 (for the version 2 index).
	 */
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;
		private final MessageDigest md; // SHA-1 of the entire pack stream.
		private final byte[] hdrBuf; // Scratch space for object headers.
		private final Deflater deflater;
		private final int blockSize; // Buffer size, cache-block aligned.

		private long currPos; // Position of currBuf[0] in the output stream.
		private int currPtr; // Number of bytes in currBuf.
		private byte[] currBuf;

		final CRC32 crc32; // CRC of the object currently being written.
		final DeflaterOutputStream compress; // Object data enters here.

		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

			// Use the stream's preferred block size; if smaller than the
			// cache's, round the cache block size down to a multiple of it.
			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}

		/** @return total number of bytes written to the pack so far. */
		long getCount() {
			return currPos + currPtr;
		}

		// Begin a new object: reset per-object state, write its header.
		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}

		// Encode the object header (3-bit type plus variable length size)
		// into hdrBuf using the pack format's base-128 scheme; returns
		// the number of header bytes produced.
		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}

		@Override
		public void write(final int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			// Every byte written is folded into both the per-object CRC
			// and the whole-pack SHA-1 before being buffered.
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}

		// Buffer data without touching the digests (used for the trailer,
		// whose bytes must not be part of the hashed stream).
		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					// Buffer full: push it out and start a fresh one.
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}

		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf;
			else
				buf = copyOf(currBuf, 0, currPtr); // Partial (final) block.
			// Cache the block so read() can serve it without going back
			// to the underlying stream.
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null;
		}

		private byte[] copyOf(byte[] src, int ptr, int cnt) {
			byte[] dst = new byte[cnt];
			System.arraycopy(src, ptr, dst, 0, cnt);
			return dst;
		}

		// Append the pack trailer: SHA-1 of everything written so far.
		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}

		// Random-access read of previously written pack data; flushed
		// bytes come from blocks, the unflushed tail from currBuf.
		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			if (currPos <= pos && r < cnt) {
				// Remainder is still buffered in currBuf.
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}

		// Inflate len bytes of object data starting at pos; returns null
		// if the destination buffer cannot be allocated.
		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				return null; // Caller will switch to large object streaming.
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					// No progress, no demand for input: corrupt stream.
					throw new DataFormatException();
			}
		}

		// Feed the inflater from a flushed block or the buffered tail;
		// returns the number of bytes supplied.
		private int setInput(long pos, Inflater inf) throws IOException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(DfsText.get().unexpectedEofInPack);
		}

		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			// Cache miss: read the block back from the output stream.
			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		// Align pos down to the start of its containing block.
		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end();
			out.close();
		}
	}
516 
	/**
	 * Reader that also sees objects pending in the unflushed pack.
	 * <p>
	 * Lookups consult the inserter's pending-object map first, falling
	 * back to the surrounding database through {@code ctx}.
	 */
	private class Reader extends ObjectReader {
		private final DfsReader ctx = new DfsReader(db);

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}

		@Override
		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
				throws IOException {
			Collection<ObjectId> stored = ctx.resolve(id);
			if (objectList == null)
				return stored;

			// Merge database matches with any pending objects.
			Set<ObjectId> r = new HashSet<ObjectId>(stored.size() + 2);
			r.addAll(stored);
			for (PackedObjectInfo obj : objectList) {
				if (id.prefixCompare(obj) == 0)
					r.add(obj.copy());
			}
			return r;
		}

		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				// Not pending in this inserter; use the normal path.
				return ctx.open(objectId, typeHint);

			// Read back the object's header from the in-progress pack.
			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
					throw new EOFException(DfsText.get().unexpectedEofInPack);

			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			// Reading back deltas is not supported by this path.
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						DfsText.get().cannotReadBackDelta, Integer.toString(type)));

			// Decode the variable length inflated size from the header.
			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				// Small enough to hold in memory; inflate it now.
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			// Large object, or allocation failed: stream it back.
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}

		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				// Report as a corrupt object, naming pack and offset.
				CorruptObjectException coe = new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)));
				coe.initCause(dfe);
				throw coe;
			}
		}

		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		@Override
		public void close() {
			ctx.close();
		}
	}
607 
608 	private class StreamLoader extends ObjectLoader {
609 		private final ObjectId id;
610 		private final int type;
611 		private final long size;
612 
613 		private final DfsPackKey srcPack;
614 		private final long pos;
615 
616 		StreamLoader(ObjectId id, int type, long sz,
617 				DfsPackKey key, long pos) {
618 			this.id = id;
619 			this.type = type;
620 			this.size = sz;
621 			this.srcPack = key;
622 			this.pos = pos;
623 		}
624 
625 		@Override
626 		public ObjectStream openStream() throws IOException {
627 			final DfsReader ctx = new DfsReader(db);
628 			if (srcPack != packKey) {
629 				try {
630 					// Post DfsInserter.flush() use the normal code path.
631 					// The newly created pack is registered in the cache.
632 					return ctx.open(id, type).openStream();
633 				} finally {
634 					ctx.close();
635 				}
636 			}
637 
638 			int bufsz = 8192;
639 			final Inflater inf = ctx.inflater();
640 			return new ObjectStream.Filter(type,
641 					size, new BufferedInputStream(new InflaterInputStream(
642 							new ReadBackStream(pos), inf, bufsz), bufsz)) {
643 				@Override
644 				public void close() throws IOException {
645 					ctx.close();
646 					super.close();
647 				}
648 			};
649 		}
650 
651 		@Override
652 		public int getType() {
653 			return type;
654 		}
655 
656 		@Override
657 		public long getSize() {
658 			return size;
659 		}
660 
661 		@Override
662 		public boolean isLarge() {
663 			return true;
664 		}
665 
666 		@Override
667 		public byte[] getCachedBytes() throws LargeObjectException {
668 			throw new LargeObjectException.ExceedsLimit(
669 					db.getReaderOptions().getStreamFileThreshold(), size);
670 		}
671 	}
672 
673 	private final class ReadBackStream extends InputStream {
674 		private long pos;
675 
676 		ReadBackStream(long offset) {
677 			pos = offset;
678 		}
679 
680 		@Override
681 		public int read() throws IOException {
682 			byte[] b = new byte[1];
683 			int n = read(b);
684 			return n == 1 ? b[0] & 0xff : -1;
685 		}
686 
687 		@Override
688 		public int read(byte[] buf, int ptr, int len) throws IOException {
689 			int n = packOut.read(pos, buf, ptr, len);
690 			if (n > 0) {
691 				pos += n;
692 			}
693 			return n;
694 		}
695 	}
696 }