package org.eclipse.jgit.internal.storage.dfs;

import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import org.eclipse.jgit.errors.CorruptObjectException;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.file.PackIndex;
import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdOwnerMap;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.util.BlockList;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.NB;
import org.eclipse.jgit.util.TemporaryBuffer;
import org.eclipse.jgit.util.io.CountingOutputStream;

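/**
 * Inserts objects into the DFS, writing them to a single pack file that is
 * committed to the object database when {@link #flush()} is invoked.
 */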
public class DfsInserter extends ObjectInserter {

	private static final int INDEX_VERSION = 2;

	private final DfsObjDatabase db;
	private int compression = Deflater.BEST_COMPRESSION;

	private List<PackedObjectInfo> objectList;
	private ObjectIdOwnerMap<PackedObjectInfo> objectMap;

	private DfsBlockCache cache;
	private DfsPackKey packKey;
	private DfsPackDescription packDsc;
	private PackStream packOut;
	private boolean rollback;

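	/**
	 * Initialize a new inserter.
	 *
	 * @param db
	 *            database the inserter writes to.
	 */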
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}

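	/**
	 * Set the deflate compression level used for newly written objects;
	 * defaults to {@link Deflater#BEST_COMPRESSION}.
	 */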
	void setCompressionLevel(int compression) {
		this.compression = compression;
	}

	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		return new DfsPackParser(db, this, in);
	}

	@Override
	public ObjectReader newReader() {
		return new Reader();
	}

	@Override
	public ObjectId insert(int type, byte[] data, int off, int len)
			throws IOException {
		ObjectId id = idFor(type, data, off, len);
		if (objectMap != null && objectMap.contains(id))
			return id;
		if (db.has(id))
			return id;

		long offset = beginObject(type, len);
		packOut.compress.write(data, off, len);
		packOut.compress.finish();
		return endObject(id, offset);
	}

	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		long offset = beginObject(type, len);
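		// Compute the object id while streaming: hash the canonical object
		// header ("<type> <length>\0") followed by the raw content.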
		MessageDigest md = digest();
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(ObjectId.fromRaw(md.digest()), offset);
	}

	private byte[] insertBuffer(long len) {
		byte[] buf = buffer();
		if (len <= buf.length)
			return buf;
		if (len < db.getReaderOptions().getStreamFileThreshold()) {
			try {
				return new byte[(int) len];
			} catch (OutOfMemoryError noMem) {
				return buf;
			}
		}
		return buf;
	}

	@Override
	public void flush() throws IOException {
		if (packDsc == null)
			return;

		if (packOut == null)
			throw new IOException();

		byte[] packHash = packOut.writePackFooter();
		packDsc.addFileExt(PACK);
		packDsc.setFileSize(PACK, packOut.getCount());
		packOut.close();
		packOut = null;

		sortObjectsById();

		PackIndex index = writePackIndex(packDsc, packHash, objectList);
		db.commitPack(Collections.singletonList(packDsc), null);
		rollback = false;

		DfsPackFile p = cache.getOrCreate(packDsc, packKey);
		if (index != null)
			p.setPackIndex(index);
		db.addPack(p);
		clear();
	}

	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
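				// Ignore the close failure; the partially written pack
				// is rolled back below.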
			} finally {
				packOut = null;
			}
		}
		if (rollback && packDsc != null) {
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}

	private void clear() {
		objectList = null;
		objectMap = null;
		packKey = null;
		packDsc = null;
	}

	private long beginObject(int type, long len) throws IOException {
		if (packOut == null)
			beginPack();
		long offset = packOut.getCount();
		packOut.beginObject(type, len);
		return offset;
	}

	private ObjectId endObject(ObjectId id, long offset) {
		PackedObjectInfo obj = new PackedObjectInfo(id);
		obj.setOffset(offset);
		obj.setCRC((int) packOut.crc32.getValue());
		objectList.add(obj);
		objectMap.addIfAbsent(obj);
		return id;
	}

	private void beginPack() throws IOException {
		objectList = new BlockList<PackedObjectInfo>();
		objectMap = new ObjectIdOwnerMap<PackedObjectInfo>();
		cache = DfsBlockCache.getInstance();

		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		packOut = new PackStream(db.writeFile(packDsc, PACK));
		packKey = new DfsPackKey();

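		// Write the pack header as though this were a single object pack:
		// 'PACK' signature, version 2, object count 1.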
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2);
		NB.encodeInt32(buf, 8, 1);
		packOut.write(buf, 0, 12);
	}

	private void sortObjectsById() {
		Collections.sort(objectList);
	}

	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

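		// A small index is buffered on the heap so the parsed PackIndex can be
		// handed back to the caller; a larger index is streamed directly to
		// storage and null is returned instead.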
		TemporaryBuffer.Heap buf = null;
		PackIndex packIndex = null;
		if (list.size() <= 58000) {
			buf = new TemporaryBuffer.Heap(2 << 20);
			index(buf, packHash, list);
			packIndex = PackIndex.read(buf.openInputStream());
		}

		DfsOutputStream os = db.writeFile(pack, INDEX);
		try {
			CountingOutputStream cnt = new CountingOutputStream(os);
			if (buf != null)
				buf.writeTo(cnt, null);
			else
				index(cnt, packHash, list);
			pack.addFileExt(INDEX);
			pack.setFileSize(INDEX, cnt.getCount());
		} finally {
			os.close();
		}
		return packIndex;
	}

	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}

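	/**
	 * Output stream for the pack file. Completed blocks are also placed in the
	 * DfsBlockCache so objects can be read back before the pack is flushed.
	 */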
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;
		private final MessageDigest md;
		private final byte[] hdrBuf;
		private final Deflater deflater;
		private final int blockSize;

		private long currPos;
		private int currPtr;
		private byte[] currBuf;

		final CRC32 crc32;
		final DeflaterOutputStream compress;

		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

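			// Pick the buffer size: the stream's own block size if it is at
			// least as large as the cache block size, otherwise the cache
			// block size rounded down to a multiple of the stream block size.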
			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}

		long getCount() {
			return currPos + currPtr;
		}

		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}

		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}

		@Override
		public void write(final int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}

		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}

		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf;
			else
				buf = copyOf(currBuf, 0, currPtr);
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null;
		}

		private byte[] copyOf(byte[] src, int ptr, int cnt) {
			byte[] dst = new byte[cnt];
			System.arraycopy(src, ptr, dst, 0, cnt);
			return dst;
		}

		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}

		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			if (currPos <= pos && r < cnt) {
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}

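		// Inflates an object already written to this stream. Returns null if
		// a buffer of the requested size cannot be allocated, letting the
		// caller fall back to a streaming loader.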
		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				return null;
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					throw new DataFormatException();
			}
		}

		private int setInput(long pos, Inflater inf) throws IOException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(DfsText.get().unexpectedEofInPack);
		}

		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end();
			out.close();
		}
	}

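	/**
	 * Reader that can access objects still buffered in the in-progress pack,
	 * in addition to everything already committed to the object database.
	 */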
	private class Reader extends ObjectReader {
		private final DfsReader ctx = new DfsReader(db);

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}

		@Override
		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
				throws IOException {
			Collection<ObjectId> stored = ctx.resolve(id);
			if (objectList == null)
				return stored;

			Set<ObjectId> r = new HashSet<ObjectId>(stored.size() + 2);
			r.addAll(stored);
			for (PackedObjectInfo obj : objectList) {
				if (id.prefixCompare(obj) == 0)
					r.add(obj.copy());
			}
			return r;
		}

		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				return ctx.open(objectId, typeHint);

			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
				throw new EOFException(DfsText.get().unexpectedEofInPack);

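			// Decode the pack object header: the 3-bit type and the low 4 bits
			// of the size share the first byte; remaining size bits follow in
			// 7-bit continuation bytes while the high bit is set.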
			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						DfsText.get().cannotReadBackDelta, Integer.toString(type)));

			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}

		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				CorruptObjectException coe = new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)));
				coe.initCause(dfe);
				throw coe;
			}
		}

		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		@Override
		public void close() {
			ctx.close();
		}
	}

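	/**
	 * Loader that streams a large object back out of the pack being written,
	 * or out of the committed pack once the inserter has been flushed.
	 */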
	private class StreamLoader extends ObjectLoader {
		private final ObjectId id;
		private final int type;
		private final long size;

		private final DfsPackKey srcPack;
		private final long pos;

		StreamLoader(ObjectId id, int type, long sz,
				DfsPackKey key, long pos) {
			this.id = id;
			this.type = type;
			this.size = sz;
			this.srcPack = key;
			this.pos = pos;
		}

		@Override
		public ObjectStream openStream() throws IOException {
			final DfsReader ctx = new DfsReader(db);
			if (srcPack != packKey) {
				try {
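					// The pack this object was written to has since been
					// flushed and committed; read it back through the
					// normal database code path.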
					return ctx.open(id, type).openStream();
				} finally {
					ctx.close();
				}
			}

			int bufsz = 8192;
			final Inflater inf = ctx.inflater();
			return new ObjectStream.Filter(type,
					size, new BufferedInputStream(new InflaterInputStream(
							new ReadBackStream(pos), inf, bufsz), bufsz)) {
				@Override
				public void close() throws IOException {
					ctx.close();
					super.close();
				}
			};
		}

		@Override
		public int getType() {
			return type;
		}

		@Override
		public long getSize() {
			return size;
		}

		@Override
		public boolean isLarge() {
			return true;
		}

		@Override
		public byte[] getCachedBytes() throws LargeObjectException {
			throw new LargeObjectException.ExceedsLimit(
					db.getReaderOptions().getStreamFileThreshold(), size);
		}
	}

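	/** InputStream over bytes already written to the pack, starting at a given offset. */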
	private final class ReadBackStream extends InputStream {
		private long pos;

		ReadBackStream(long offset) {
			pos = offset;
		}

		@Override
		public int read() throws IOException {
			byte[] b = new byte[1];
			int n = read(b);
			return n == 1 ? b[0] & 0xff : -1;
		}

		@Override
		public int read(byte[] buf, int ptr, int len) throws IOException {
			int n = packOut.read(pos, buf, ptr, len);
			if (n > 0) {
				pos += n;
			}
			return n;
		}
	}
}