1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.dfs;
45
46 import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
47 import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
48 import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
49 import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
50
51 import java.io.BufferedInputStream;
52 import java.io.EOFException;
53 import java.io.IOException;
54 import java.io.InputStream;
55 import java.io.OutputStream;
56 import java.nio.ByteBuffer;
57 import java.security.MessageDigest;
58 import java.text.MessageFormat;
59 import java.util.Collection;
60 import java.util.Collections;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Set;
64 import java.util.zip.CRC32;
65 import java.util.zip.DataFormatException;
66 import java.util.zip.Deflater;
67 import java.util.zip.DeflaterOutputStream;
68 import java.util.zip.Inflater;
69 import java.util.zip.InflaterInputStream;
70
71 import org.eclipse.jgit.annotations.Nullable;
72 import org.eclipse.jgit.errors.CorruptObjectException;
73 import org.eclipse.jgit.errors.IncorrectObjectTypeException;
74 import org.eclipse.jgit.errors.LargeObjectException;
75 import org.eclipse.jgit.internal.JGitText;
76 import org.eclipse.jgit.internal.storage.file.PackIndex;
77 import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
78 import org.eclipse.jgit.internal.storage.pack.PackExt;
79 import org.eclipse.jgit.lib.AbbreviatedObjectId;
80 import org.eclipse.jgit.lib.AnyObjectId;
81 import org.eclipse.jgit.lib.Constants;
82 import org.eclipse.jgit.lib.ObjectId;
83 import org.eclipse.jgit.lib.ObjectIdOwnerMap;
84 import org.eclipse.jgit.lib.ObjectInserter;
85 import org.eclipse.jgit.lib.ObjectLoader;
86 import org.eclipse.jgit.lib.ObjectReader;
87 import org.eclipse.jgit.lib.ObjectStream;
88 import org.eclipse.jgit.transport.PackedObjectInfo;
89 import org.eclipse.jgit.util.BlockList;
90 import org.eclipse.jgit.util.IO;
91 import org.eclipse.jgit.util.NB;
92 import org.eclipse.jgit.util.TemporaryBuffer;
93 import org.eclipse.jgit.util.io.CountingOutputStream;
94 import org.eclipse.jgit.util.sha1.SHA1;
95
96
97
98
99 public class DfsInserter extends ObjectInserter {
100
101 private static final int INDEX_VERSION = 2;
102
103 final DfsObjDatabase db;
104 int compression = Deflater.BEST_COMPRESSION;
105
106 List<PackedObjectInfo> objectList;
107 ObjectIdOwnerMap<PackedObjectInfo> objectMap;
108
109 DfsBlockCache cache;
110 DfsStreamKey packKey;
111 DfsPackDescription packDsc;
112 PackStream packOut;
113 private boolean rollback;
114 private boolean checkExisting = true;
115
116
117
118
119
120
121
/**
 * Initialize a new inserter.
 *
 * @param db
 *            database the inserter writes its packs to.
 */
protected DfsInserter(DfsObjDatabase db) {
	this.db = db;
}
125
126
127
128
129
130
131
132
133
134 public void checkExisting(boolean check) {
135 checkExisting = check;
136 }
137
138 void setCompressionLevel(int compression) {
139 this.compression = compression;
140 }
141
142
143 @Override
144 public DfsPackParser newPackParser(InputStream in) throws IOException {
145 return new DfsPackParser(db, this, in);
146 }
147
148
149 @Override
150 public ObjectReader newReader() {
151 return new Reader();
152 }
153
154
155 @Override
156 public ObjectId insert(int type, byte[] data, int off, int len)
157 throws IOException {
158 ObjectId id = idFor(type, data, off, len);
159 if (objectMap != null && objectMap.contains(id))
160 return id;
161
162 if (checkExisting && db.has(id, true))
163 return id;
164
165 long offset = beginObject(type, len);
166 packOut.compress.write(data, off, len);
167 packOut.compress.finish();
168 return endObject(id, offset);
169 }
170
171
172 @Override
173 public ObjectId insert(int type, long len, InputStream in)
174 throws IOException {
175 byte[] buf = insertBuffer(len);
176 if (len <= buf.length) {
177 IO.readFully(in, buf, 0, (int) len);
178 return insert(type, buf, 0, (int) len);
179 }
180
181 long offset = beginObject(type, len);
182 SHA1 md = digest();
183 md.update(Constants.encodedTypeString(type));
184 md.update((byte) ' ');
185 md.update(Constants.encodeASCII(len));
186 md.update((byte) 0);
187
188 while (0 < len) {
189 int n = in.read(buf, 0, (int) Math.min(buf.length, len));
190 if (n <= 0)
191 throw new EOFException();
192 md.update(buf, 0, n);
193 packOut.compress.write(buf, 0, n);
194 len -= n;
195 }
196 packOut.compress.finish();
197 return endObject(md.toObjectId(), offset);
198 }
199
200 private byte[] insertBuffer(long len) {
201 byte[] buf = buffer();
202 if (len <= buf.length)
203 return buf;
204 if (len < db.getReaderOptions().getStreamFileThreshold()) {
205 try {
206 return new byte[(int) len];
207 } catch (OutOfMemoryError noMem) {
208 return buf;
209 }
210 }
211 return buf;
212 }
213
214
215 @Override
216 public void flush() throws IOException {
217 if (packDsc == null)
218 return;
219
220 if (packOut == null)
221 throw new IOException();
222
223 byte[] packHash = packOut.writePackFooter();
224 packDsc.addFileExt(PACK);
225 packDsc.setFileSize(PACK, packOut.getCount());
226 packOut.close();
227 packOut = null;
228
229 sortObjectsById();
230
231 PackIndex index = writePackIndex(packDsc, packHash, objectList);
232 db.commitPack(Collections.singletonList(packDsc), null);
233 rollback = false;
234
235 DfsPackFile p = new DfsPackFile(cache, packDsc);
236 if (index != null)
237 p.setPackIndex(index);
238 db.addPack(p);
239 clear();
240 }
241
242
243 @Override
244 public void close() {
245 if (packOut != null) {
246 try {
247 packOut.close();
248 } catch (IOException err) {
249
250 } finally {
251 packOut = null;
252 }
253 }
254 if (rollback && packDsc != null) {
255 try {
256 db.rollbackPack(Collections.singletonList(packDsc));
257 } finally {
258 packDsc = null;
259 rollback = false;
260 }
261 }
262 clear();
263 }
264
265 private void clear() {
266 objectList = null;
267 objectMap = null;
268 packKey = null;
269 packDsc = null;
270 }
271
272 private long beginObject(int type, long len) throws IOException {
273 if (packOut == null)
274 beginPack();
275 long offset = packOut.getCount();
276 packOut.beginObject(type, len);
277 return offset;
278 }
279
280 private ObjectId endObject(ObjectId id, long offset) {
281 PackedObjectInfo obj = new PackedObjectInfo(id);
282 obj.setOffset(offset);
283 obj.setCRC((int) packOut.crc32.getValue());
284 objectList.add(obj);
285 objectMap.addIfAbsent(obj);
286 return id;
287 }
288
289 private void beginPack() throws IOException {
290 objectList = new BlockList<>();
291 objectMap = new ObjectIdOwnerMap<>();
292 cache = DfsBlockCache.getInstance();
293
294 rollback = true;
295 packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
296 DfsOutputStream dfsOut = db.writeFile(packDsc, PACK);
297 packDsc.setBlockSize(PACK, dfsOut.blockSize());
298 packOut = new PackStream(dfsOut);
299 packKey = packDsc.getStreamKey(PACK);
300
301
302 byte[] buf = packOut.hdrBuf;
303 System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
304 NB.encodeInt32(buf, 4, 2);
305 NB.encodeInt32(buf, 8, 1);
306 packOut.write(buf, 0, 12);
307 }
308
309 private void sortObjectsById() {
310 Collections.sort(objectList);
311 }
312
313 @Nullable
314 private TemporaryBuffer.Heap maybeGetTemporaryBuffer(
315 List<PackedObjectInfo> list) {
316 if (list.size() <= 58000) {
317 return new TemporaryBuffer.Heap(2 << 20);
318 }
319 return null;
320 }
321
322 PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
323 List<PackedObjectInfo> list) throws IOException {
324 pack.setIndexVersion(INDEX_VERSION);
325 pack.setObjectCount(list.size());
326
327
328
329
330 PackIndex packIndex = null;
331 try (TemporaryBuffer.Heap buf = maybeGetTemporaryBuffer(list);
332 DfsOutputStream os = db.writeFile(pack, INDEX);
333 CountingOutputStream cnt = new CountingOutputStream(os)) {
334 if (buf != null) {
335 index(buf, packHash, list);
336 packIndex = PackIndex.read(buf.openInputStream());
337 buf.writeTo(cnt, null);
338 } else {
339 index(cnt, packHash, list);
340 }
341 pack.addFileExt(INDEX);
342 pack.setBlockSize(INDEX, os.blockSize());
343 pack.setFileSize(INDEX, cnt.getCount());
344 }
345 return packIndex;
346 }
347
348 private static void index(OutputStream out, byte[] packHash,
349 List<PackedObjectInfo> list) throws IOException {
350 PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
351 }
352
353 private class PackStream extends OutputStream {
354 private final DfsOutputStream out;
355 private final MessageDigest md;
356 final byte[] hdrBuf;
357 private final Deflater deflater;
358 private final int blockSize;
359
360 private long currPos;
361 private int currPtr;
362 private byte[] currBuf;
363
364 final CRC32 crc32;
365 final DeflaterOutputStream compress;
366
367 PackStream(DfsOutputStream out) {
368 this.out = out;
369
370 hdrBuf = new byte[32];
371 md = Constants.newMessageDigest();
372 crc32 = new CRC32();
373 deflater = new Deflater(compression);
374 compress = new DeflaterOutputStream(this, deflater, 8192);
375
376 int size = out.blockSize();
377 if (size <= 0)
378 size = cache.getBlockSize();
379 else if (size < cache.getBlockSize())
380 size = (cache.getBlockSize() / size) * size;
381 blockSize = size;
382 currBuf = new byte[blockSize];
383 }
384
385 long getCount() {
386 return currPos + currPtr;
387 }
388
389 void beginObject(int objectType, long length) throws IOException {
390 crc32.reset();
391 deflater.reset();
392 write(hdrBuf, 0, encodeTypeSize(objectType, length));
393 }
394
395 private int encodeTypeSize(int type, long rawLength) {
396 long nextLength = rawLength >>> 4;
397 hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
398 rawLength = nextLength;
399 int n = 1;
400 while (rawLength > 0) {
401 nextLength >>>= 7;
402 hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
403 rawLength = nextLength;
404 }
405 return n;
406 }
407
408 @Override
409 public void write(int b) throws IOException {
410 hdrBuf[0] = (byte) b;
411 write(hdrBuf, 0, 1);
412 }
413
414 @Override
415 public void write(byte[] data, int off, int len) throws IOException {
416 crc32.update(data, off, len);
417 md.update(data, off, len);
418 writeNoHash(data, off, len);
419 }
420
421 private void writeNoHash(byte[] data, int off, int len)
422 throws IOException {
423 while (0 < len) {
424 int n = Math.min(len, currBuf.length - currPtr);
425 if (n == 0) {
426 flushBlock();
427 currBuf = new byte[blockSize];
428 continue;
429 }
430
431 System.arraycopy(data, off, currBuf, currPtr, n);
432 off += n;
433 len -= n;
434 currPtr += n;
435 }
436 }
437
438 private void flushBlock() throws IOException {
439 out.write(currBuf, 0, currPtr);
440
441 byte[] buf;
442 if (currPtr == currBuf.length)
443 buf = currBuf;
444 else
445 buf = copyOf(currBuf, 0, currPtr);
446 cache.put(new DfsBlock(packKey, currPos, buf));
447
448 currPos += currPtr;
449 currPtr = 0;
450 currBuf = null;
451 }
452
453 private byte[] copyOf(byte[] src, int ptr, int cnt) {
454 byte[] dst = new byte[cnt];
455 System.arraycopy(src, ptr, dst, 0, cnt);
456 return dst;
457 }
458
459 byte[] writePackFooter() throws IOException {
460 byte[] packHash = md.digest();
461 writeNoHash(packHash, 0, packHash.length);
462 if (currPtr != 0)
463 flushBlock();
464 return packHash;
465 }
466
467 int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
468 int r = 0;
469 while (pos < currPos && r < cnt) {
470 DfsBlock b = getOrLoadBlock(pos);
471 int n = b.copy(pos, dst, ptr + r, cnt - r);
472 pos += n;
473 r += n;
474 }
475 if (currPos <= pos && r < cnt) {
476 int s = (int) (pos - currPos);
477 int n = Math.min(currPtr - s, cnt - r);
478 System.arraycopy(currBuf, s, dst, ptr + r, n);
479 r += n;
480 }
481 return r;
482 }
483
484 byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
485 DataFormatException {
486 byte[] dstbuf;
487 try {
488 dstbuf = new byte[len];
489 } catch (OutOfMemoryError noMemory) {
490 return null;
491 }
492
493 Inflater inf = ctx.inflater();
494 pos += setInput(pos, inf);
495 for (int dstoff = 0;;) {
496 int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
497 dstoff += n;
498 if (inf.finished())
499 return dstbuf;
500 if (inf.needsInput())
501 pos += setInput(pos, inf);
502 else if (n == 0)
503 throw new DataFormatException();
504 }
505 }
506
507 private int setInput(long pos, Inflater inf)
508 throws IOException, DataFormatException {
509 if (pos < currPos)
510 return getOrLoadBlock(pos).setInput(pos, inf);
511 if (pos < currPos + currPtr) {
512 int s = (int) (pos - currPos);
513 int n = currPtr - s;
514 inf.setInput(currBuf, s, n);
515 return n;
516 }
517 throw new EOFException(JGitText.get().unexpectedEofInPack);
518 }
519
520 private DfsBlock getOrLoadBlock(long pos) throws IOException {
521 long s = toBlockStart(pos);
522 DfsBlock b = cache.get(packKey, s);
523 if (b != null)
524 return b;
525
526 byte[] d = new byte[blockSize];
527 for (int p = 0; p < blockSize;) {
528 int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
529 if (n <= 0)
530 throw new EOFException(JGitText.get().unexpectedEofInPack);
531 p += n;
532 }
533 b = new DfsBlock(packKey, s, d);
534 cache.put(b);
535 return b;
536 }
537
538 private long toBlockStart(long pos) {
539 return (pos / blockSize) * blockSize;
540 }
541
542 @Override
543 public void close() throws IOException {
544 deflater.end();
545 out.close();
546 }
547 }
548
549 private class Reader extends ObjectReader {
550 private final DfsReader ctx = db.newReader();
551
552 @Override
553 public ObjectReader newReader() {
554 return db.newReader();
555 }
556
557 @Override
558 public Collection<ObjectId> resolve(AbbreviatedObjectId id)
559 throws IOException {
560 Collection<ObjectId> stored = ctx.resolve(id);
561 if (objectList == null)
562 return stored;
563
564 Set<ObjectId> r = new HashSet<>(stored.size() + 2);
565 r.addAll(stored);
566 for (PackedObjectInfo obj : objectList) {
567 if (id.prefixCompare(obj) == 0)
568 r.add(obj.copy());
569 }
570 return r;
571 }
572
573 @Override
574 public ObjectLoader open(AnyObjectId objectId, int typeHint)
575 throws IOException {
576 if (objectMap == null)
577 return ctx.open(objectId, typeHint);
578
579 PackedObjectInfo obj = objectMap.get(objectId);
580 if (obj == null)
581 return ctx.open(objectId, typeHint);
582
583 byte[] buf = buffer();
584 int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
585 if (cnt <= 0)
586 throw new EOFException(JGitText.get().unexpectedEofInPack);
587
588 int c = buf[0] & 0xff;
589 int type = (c >> 4) & 7;
590 if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
591 throw new IOException(MessageFormat.format(
592 JGitText.get().cannotReadBackDelta, Integer.toString(type)));
593 if (typeHint != OBJ_ANY && type != typeHint) {
594 throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
595 }
596
597 long sz = c & 0x0f;
598 int ptr = 1;
599 int shift = 4;
600 while ((c & 0x80) != 0) {
601 if (ptr >= cnt)
602 throw new EOFException(JGitText.get().unexpectedEofInPack);
603 c = buf[ptr++] & 0xff;
604 sz += ((long) (c & 0x7f)) << shift;
605 shift += 7;
606 }
607
608 long zpos = obj.getOffset() + ptr;
609 if (sz < ctx.getStreamFileThreshold()) {
610 byte[] data = inflate(obj, zpos, (int) sz);
611 if (data != null)
612 return new ObjectLoader.SmallObject(type, data);
613 }
614 return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
615 }
616
617 private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
618 throws IOException, CorruptObjectException {
619 try {
620 return packOut.inflate(ctx, zpos, sz);
621 } catch (DataFormatException dfe) {
622 throw new CorruptObjectException(
623 MessageFormat.format(
624 JGitText.get().objectAtHasBadZlibStream,
625 Long.valueOf(obj.getOffset()),
626 packDsc.getFileName(PackExt.PACK)),
627 dfe);
628 }
629 }
630
631 @Override
632 public boolean has(AnyObjectId objectId) throws IOException {
633 return (objectMap != null && objectMap.contains(objectId))
634 || ctx.has(objectId);
635 }
636
637 @Override
638 public Set<ObjectId> getShallowCommits() throws IOException {
639 return ctx.getShallowCommits();
640 }
641
642 @Override
643 public ObjectInserter getCreatedFromInserter() {
644 return DfsInserter.this;
645 }
646
647 @Override
648 public void close() {
649 ctx.close();
650 }
651 }
652
653 private class StreamLoader extends ObjectLoader {
654 private final ObjectId id;
655 private final int type;
656 private final long size;
657
658 private final DfsStreamKey srcPack;
659 private final long pos;
660
661 StreamLoader(ObjectId id, int type, long sz,
662 DfsStreamKey key, long pos) {
663 this.id = id;
664 this.type = type;
665 this.size = sz;
666 this.srcPack = key;
667 this.pos = pos;
668 }
669
670 @Override
671 public ObjectStream openStream() throws IOException {
672 @SuppressWarnings("resource")
673 final DfsReader ctx = db.newReader();
674 if (srcPack != packKey) {
675 try {
676
677
678 return ctx.open(id, type).openStream();
679 } finally {
680 ctx.close();
681 }
682 }
683
684 int bufsz = 8192;
685 final Inflater inf = ctx.inflater();
686 return new ObjectStream.Filter(type,
687 size, new BufferedInputStream(new InflaterInputStream(
688 new ReadBackStream(pos), inf, bufsz), bufsz)) {
689 @Override
690 public void close() throws IOException {
691 ctx.close();
692 super.close();
693 }
694 };
695 }
696
697 @Override
698 public int getType() {
699 return type;
700 }
701
702 @Override
703 public long getSize() {
704 return size;
705 }
706
707 @Override
708 public boolean isLarge() {
709 return true;
710 }
711
712 @Override
713 public byte[] getCachedBytes() throws LargeObjectException {
714 throw new LargeObjectException.ExceedsLimit(
715 db.getReaderOptions().getStreamFileThreshold(), size);
716 }
717 }
718
719 private final class ReadBackStream extends InputStream {
720 private long pos;
721
722 ReadBackStream(long offset) {
723 pos = offset;
724 }
725
726 @Override
727 public int read() throws IOException {
728 byte[] b = new byte[1];
729 int n = read(b);
730 return n == 1 ? b[0] & 0xff : -1;
731 }
732
733 @Override
734 public int read(byte[] buf, int ptr, int len) throws IOException {
735 int n = packOut.read(pos, buf, ptr, len);
736 if (n > 0) {
737 pos += n;
738 }
739 return n;
740 }
741 }
742 }