1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.dfs;
45
46 import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
47 import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
48 import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
49 import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
50
51 import java.io.BufferedInputStream;
52 import java.io.EOFException;
53 import java.io.IOException;
54 import java.io.InputStream;
55 import java.io.OutputStream;
56 import java.nio.ByteBuffer;
57 import java.security.MessageDigest;
58 import java.text.MessageFormat;
59 import java.util.Collection;
60 import java.util.Collections;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Set;
64 import java.util.zip.CRC32;
65 import java.util.zip.DataFormatException;
66 import java.util.zip.Deflater;
67 import java.util.zip.DeflaterOutputStream;
68 import java.util.zip.Inflater;
69 import java.util.zip.InflaterInputStream;
70
71 import org.eclipse.jgit.errors.CorruptObjectException;
72 import org.eclipse.jgit.errors.IncorrectObjectTypeException;
73 import org.eclipse.jgit.errors.LargeObjectException;
74 import org.eclipse.jgit.internal.JGitText;
75 import org.eclipse.jgit.internal.storage.file.PackIndex;
76 import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
77 import org.eclipse.jgit.internal.storage.pack.PackExt;
78 import org.eclipse.jgit.lib.AbbreviatedObjectId;
79 import org.eclipse.jgit.lib.AnyObjectId;
80 import org.eclipse.jgit.lib.Constants;
81 import org.eclipse.jgit.lib.ObjectId;
82 import org.eclipse.jgit.lib.ObjectIdOwnerMap;
83 import org.eclipse.jgit.lib.ObjectInserter;
84 import org.eclipse.jgit.lib.ObjectLoader;
85 import org.eclipse.jgit.lib.ObjectReader;
86 import org.eclipse.jgit.lib.ObjectStream;
87 import org.eclipse.jgit.transport.PackedObjectInfo;
88 import org.eclipse.jgit.util.BlockList;
89 import org.eclipse.jgit.util.IO;
90 import org.eclipse.jgit.util.NB;
91 import org.eclipse.jgit.util.TemporaryBuffer;
92 import org.eclipse.jgit.util.io.CountingOutputStream;
93 import org.eclipse.jgit.util.sha1.SHA1;
94
95
96
97
98 public class DfsInserter extends ObjectInserter {
99
/** Pack index format version handed to PackIndexWriter.createVersion and recorded on the pack description. */
100 private static final int INDEX_VERSION = 2;
101
/** Object database that new packs are created in, committed to, or rolled back from. */
102 final DfsObjDatabase db;
/** Deflater level used when a PackStream is constructed; defaults to Deflater.BEST_COMPRESSION. */
103 int compression = Deflater.BEST_COMPRESSION;
104
/** Objects written to the open pack; created by beginPack(), appended by endObject(), sorted by flush(), nulled by clear(). */
105 List<PackedObjectInfo> objectList;
/** Same objects as objectList, keyed by object id for lookups; null while no pack is open. */
106 ObjectIdOwnerMap<PackedObjectInfo> objectMap;
107
/** Block cache obtained from DfsBlockCache.getInstance() in beginPack(); not reset by clear(). */
108 DfsBlockCache cache;
/** Stream key of the open pack's PACK file; null while no pack is open. */
109 DfsStreamKey packKey;
/** Description of the open pack; null while no pack is open. */
110 DfsPackDescription packDsc;
/** Stream receiving pack data; set by beginPack(), nulled by flush() and close(). */
111 PackStream packOut;
/** True from beginPack() until flush() has committed the pack; close() rolls the pack back while it is set. */
112 private boolean rollback;
/** When true, insert(int, byte[], int, int) asks db.has(...) before writing an object. */
113 private boolean checkExisting = true;
114
115
116
117
118
119
120
/**
 * Initialize a new inserter.
 *
 * @param db
 *            database that packs produced by this inserter are written to.
 */
protected DfsInserter(DfsObjDatabase db) {
    this.db = db;
}
124
125
126
127
128
129
130
131
132
133 public void checkExisting(boolean check) {
134 checkExisting = check;
135 }
136
137 void setCompressionLevel(int compression) {
138 this.compression = compression;
139 }
140
141
142 @Override
143 public DfsPackParser newPackParser(InputStream in) throws IOException {
144 return new DfsPackParser(db, this, in);
145 }
146
147
148 @Override
149 public ObjectReader newReader() {
150 return new Reader();
151 }
152
153
154 @Override
155 public ObjectId insert(int type, byte[] data, int off, int len)
156 throws IOException {
157 ObjectId id = idFor(type, data, off, len);
158 if (objectMap != null && objectMap.contains(id))
159 return id;
160
161 if (checkExisting && db.has(id, true))
162 return id;
163
164 long offset = beginObject(type, len);
165 packOut.compress.write(data, off, len);
166 packOut.compress.finish();
167 return endObject(id, offset);
168 }
169
170
171 @Override
172 public ObjectId insert(int type, long len, InputStream in)
173 throws IOException {
174 byte[] buf = insertBuffer(len);
175 if (len <= buf.length) {
176 IO.readFully(in, buf, 0, (int) len);
177 return insert(type, buf, 0, (int) len);
178 }
179
180 long offset = beginObject(type, len);
181 SHA1 md = digest();
182 md.update(Constants.encodedTypeString(type));
183 md.update((byte) ' ');
184 md.update(Constants.encodeASCII(len));
185 md.update((byte) 0);
186
187 while (0 < len) {
188 int n = in.read(buf, 0, (int) Math.min(buf.length, len));
189 if (n <= 0)
190 throw new EOFException();
191 md.update(buf, 0, n);
192 packOut.compress.write(buf, 0, n);
193 len -= n;
194 }
195 packOut.compress.finish();
196 return endObject(md.toObjectId(), offset);
197 }
198
199 private byte[] insertBuffer(long len) {
200 byte[] buf = buffer();
201 if (len <= buf.length)
202 return buf;
203 if (len < db.getReaderOptions().getStreamFileThreshold()) {
204 try {
205 return new byte[(int) len];
206 } catch (OutOfMemoryError noMem) {
207 return buf;
208 }
209 }
210 return buf;
211 }
212
213
214 @Override
215 public void flush() throws IOException {
216 if (packDsc == null)
217 return;
218
219 if (packOut == null)
220 throw new IOException();
221
222 byte[] packHash = packOut.writePackFooter();
223 packDsc.addFileExt(PACK);
224 packDsc.setFileSize(PACK, packOut.getCount());
225 packOut.close();
226 packOut = null;
227
228 sortObjectsById();
229
230 PackIndex index = writePackIndex(packDsc, packHash, objectList);
231 db.commitPack(Collections.singletonList(packDsc), null);
232 rollback = false;
233
234 DfsPackFile p = new DfsPackFile(cache, packDsc);
235 if (index != null)
236 p.setPackIndex(index);
237 db.addPack(p);
238 clear();
239 }
240
241
242 @Override
243 public void close() {
244 if (packOut != null) {
245 try {
246 packOut.close();
247 } catch (IOException err) {
248
249 } finally {
250 packOut = null;
251 }
252 }
253 if (rollback && packDsc != null) {
254 try {
255 db.rollbackPack(Collections.singletonList(packDsc));
256 } finally {
257 packDsc = null;
258 rollback = false;
259 }
260 }
261 clear();
262 }
263
264 private void clear() {
265 objectList = null;
266 objectMap = null;
267 packKey = null;
268 packDsc = null;
269 }
270
271 private long beginObject(int type, long len) throws IOException {
272 if (packOut == null)
273 beginPack();
274 long offset = packOut.getCount();
275 packOut.beginObject(type, len);
276 return offset;
277 }
278
279 private ObjectId endObject(ObjectId id, long offset) {
280 PackedObjectInfo obj = new PackedObjectInfo(id);
281 obj.setOffset(offset);
282 obj.setCRC((int) packOut.crc32.getValue());
283 objectList.add(obj);
284 objectMap.addIfAbsent(obj);
285 return id;
286 }
287
288 private void beginPack() throws IOException {
289 objectList = new BlockList<>();
290 objectMap = new ObjectIdOwnerMap<>();
291 cache = DfsBlockCache.getInstance();
292
293 rollback = true;
294 packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
295 DfsOutputStream dfsOut = db.writeFile(packDsc, PACK);
296 packDsc.setBlockSize(PACK, dfsOut.blockSize());
297 packOut = new PackStream(dfsOut);
298 packKey = packDsc.getStreamKey(PACK);
299
300
301 byte[] buf = packOut.hdrBuf;
302 System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
303 NB.encodeInt32(buf, 4, 2);
304 NB.encodeInt32(buf, 8, 1);
305 packOut.write(buf, 0, 12);
306 }
307
308 private void sortObjectsById() {
309 Collections.sort(objectList);
310 }
311
312 PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
313 List<PackedObjectInfo> list) throws IOException {
314 pack.setIndexVersion(INDEX_VERSION);
315 pack.setObjectCount(list.size());
316
317
318
319
320 TemporaryBuffer.Heap buf = null;
321 PackIndex packIndex = null;
322 if (list.size() <= 58000) {
323 buf = new TemporaryBuffer.Heap(2 << 20);
324 index(buf, packHash, list);
325 packIndex = PackIndex.read(buf.openInputStream());
326 }
327
328 try (DfsOutputStream os = db.writeFile(pack, INDEX)) {
329 CountingOutputStream cnt = new CountingOutputStream(os);
330 if (buf != null)
331 buf.writeTo(cnt, null);
332 else
333 index(cnt, packHash, list);
334 pack.addFileExt(INDEX);
335 pack.setBlockSize(INDEX, os.blockSize());
336 pack.setFileSize(INDEX, cnt.getCount());
337 } finally {
338 if (buf != null) {
339 buf.close();
340 }
341 }
342 return packIndex;
343 }
344
345 private static void index(OutputStream out, byte[] packHash,
346 List<PackedObjectInfo> list) throws IOException {
347 PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
348 }
349
350 private class PackStream extends OutputStream {
351 private final DfsOutputStream out;
352 private final MessageDigest md;
353 final byte[] hdrBuf;
354 private final Deflater deflater;
355 private final int blockSize;
356
357 private long currPos;
358 private int currPtr;
359 private byte[] currBuf;
360
361 final CRC32 crc32;
362 final DeflaterOutputStream compress;
363
364 PackStream(DfsOutputStream out) {
365 this.out = out;
366
367 hdrBuf = new byte[32];
368 md = Constants.newMessageDigest();
369 crc32 = new CRC32();
370 deflater = new Deflater(compression);
371 compress = new DeflaterOutputStream(this, deflater, 8192);
372
373 int size = out.blockSize();
374 if (size <= 0)
375 size = cache.getBlockSize();
376 else if (size < cache.getBlockSize())
377 size = (cache.getBlockSize() / size) * size;
378 blockSize = size;
379 currBuf = new byte[blockSize];
380 }
381
382 long getCount() {
383 return currPos + currPtr;
384 }
385
386 void beginObject(int objectType, long length) throws IOException {
387 crc32.reset();
388 deflater.reset();
389 write(hdrBuf, 0, encodeTypeSize(objectType, length));
390 }
391
392 private int encodeTypeSize(int type, long rawLength) {
393 long nextLength = rawLength >>> 4;
394 hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
395 rawLength = nextLength;
396 int n = 1;
397 while (rawLength > 0) {
398 nextLength >>>= 7;
399 hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
400 rawLength = nextLength;
401 }
402 return n;
403 }
404
405 @Override
406 public void write(final int b) throws IOException {
407 hdrBuf[0] = (byte) b;
408 write(hdrBuf, 0, 1);
409 }
410
411 @Override
412 public void write(byte[] data, int off, int len) throws IOException {
413 crc32.update(data, off, len);
414 md.update(data, off, len);
415 writeNoHash(data, off, len);
416 }
417
418 private void writeNoHash(byte[] data, int off, int len)
419 throws IOException {
420 while (0 < len) {
421 int n = Math.min(len, currBuf.length - currPtr);
422 if (n == 0) {
423 flushBlock();
424 currBuf = new byte[blockSize];
425 continue;
426 }
427
428 System.arraycopy(data, off, currBuf, currPtr, n);
429 off += n;
430 len -= n;
431 currPtr += n;
432 }
433 }
434
435 private void flushBlock() throws IOException {
436 out.write(currBuf, 0, currPtr);
437
438 byte[] buf;
439 if (currPtr == currBuf.length)
440 buf = currBuf;
441 else
442 buf = copyOf(currBuf, 0, currPtr);
443 cache.put(new DfsBlock(packKey, currPos, buf));
444
445 currPos += currPtr;
446 currPtr = 0;
447 currBuf = null;
448 }
449
450 private byte[] copyOf(byte[] src, int ptr, int cnt) {
451 byte[] dst = new byte[cnt];
452 System.arraycopy(src, ptr, dst, 0, cnt);
453 return dst;
454 }
455
456 byte[] writePackFooter() throws IOException {
457 byte[] packHash = md.digest();
458 writeNoHash(packHash, 0, packHash.length);
459 if (currPtr != 0)
460 flushBlock();
461 return packHash;
462 }
463
464 int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
465 int r = 0;
466 while (pos < currPos && r < cnt) {
467 DfsBlock b = getOrLoadBlock(pos);
468 int n = b.copy(pos, dst, ptr + r, cnt - r);
469 pos += n;
470 r += n;
471 }
472 if (currPos <= pos && r < cnt) {
473 int s = (int) (pos - currPos);
474 int n = Math.min(currPtr - s, cnt - r);
475 System.arraycopy(currBuf, s, dst, ptr + r, n);
476 r += n;
477 }
478 return r;
479 }
480
481 byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
482 DataFormatException {
483 byte[] dstbuf;
484 try {
485 dstbuf = new byte[len];
486 } catch (OutOfMemoryError noMemory) {
487 return null;
488 }
489
490 Inflater inf = ctx.inflater();
491 pos += setInput(pos, inf);
492 for (int dstoff = 0;;) {
493 int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
494 dstoff += n;
495 if (inf.finished())
496 return dstbuf;
497 if (inf.needsInput())
498 pos += setInput(pos, inf);
499 else if (n == 0)
500 throw new DataFormatException();
501 }
502 }
503
504 private int setInput(long pos, Inflater inf)
505 throws IOException, DataFormatException {
506 if (pos < currPos)
507 return getOrLoadBlock(pos).setInput(pos, inf);
508 if (pos < currPos + currPtr) {
509 int s = (int) (pos - currPos);
510 int n = currPtr - s;
511 inf.setInput(currBuf, s, n);
512 return n;
513 }
514 throw new EOFException(JGitText.get().unexpectedEofInPack);
515 }
516
517 private DfsBlock getOrLoadBlock(long pos) throws IOException {
518 long s = toBlockStart(pos);
519 DfsBlock b = cache.get(packKey, s);
520 if (b != null)
521 return b;
522
523 byte[] d = new byte[blockSize];
524 for (int p = 0; p < blockSize;) {
525 int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
526 if (n <= 0)
527 throw new EOFException(JGitText.get().unexpectedEofInPack);
528 p += n;
529 }
530 b = new DfsBlock(packKey, s, d);
531 cache.put(b);
532 return b;
533 }
534
535 private long toBlockStart(long pos) {
536 return (pos / blockSize) * blockSize;
537 }
538
539 @Override
540 public void close() throws IOException {
541 deflater.end();
542 out.close();
543 }
544 }
545
546 private class Reader extends ObjectReader {
547 private final DfsReader ctx = db.newReader();
548
549 @Override
550 public ObjectReader newReader() {
551 return db.newReader();
552 }
553
554 @Override
555 public Collection<ObjectId> resolve(AbbreviatedObjectId id)
556 throws IOException {
557 Collection<ObjectId> stored = ctx.resolve(id);
558 if (objectList == null)
559 return stored;
560
561 Set<ObjectId> r = new HashSet<>(stored.size() + 2);
562 r.addAll(stored);
563 for (PackedObjectInfo obj : objectList) {
564 if (id.prefixCompare(obj) == 0)
565 r.add(obj.copy());
566 }
567 return r;
568 }
569
570 @Override
571 public ObjectLoader open(AnyObjectId objectId, int typeHint)
572 throws IOException {
573 if (objectMap == null)
574 return ctx.open(objectId, typeHint);
575
576 PackedObjectInfo obj = objectMap.get(objectId);
577 if (obj == null)
578 return ctx.open(objectId, typeHint);
579
580 byte[] buf = buffer();
581 int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
582 if (cnt <= 0)
583 throw new EOFException(JGitText.get().unexpectedEofInPack);
584
585 int c = buf[0] & 0xff;
586 int type = (c >> 4) & 7;
587 if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
588 throw new IOException(MessageFormat.format(
589 JGitText.get().cannotReadBackDelta, Integer.toString(type)));
590 if (typeHint != OBJ_ANY && type != typeHint) {
591 throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
592 }
593
594 long sz = c & 0x0f;
595 int ptr = 1;
596 int shift = 4;
597 while ((c & 0x80) != 0) {
598 if (ptr >= cnt)
599 throw new EOFException(JGitText.get().unexpectedEofInPack);
600 c = buf[ptr++] & 0xff;
601 sz += ((long) (c & 0x7f)) << shift;
602 shift += 7;
603 }
604
605 long zpos = obj.getOffset() + ptr;
606 if (sz < ctx.getStreamFileThreshold()) {
607 byte[] data = inflate(obj, zpos, (int) sz);
608 if (data != null)
609 return new ObjectLoader.SmallObject(type, data);
610 }
611 return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
612 }
613
614 private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
615 throws IOException, CorruptObjectException {
616 try {
617 return packOut.inflate(ctx, zpos, sz);
618 } catch (DataFormatException dfe) {
619 throw new CorruptObjectException(
620 MessageFormat.format(
621 JGitText.get().objectAtHasBadZlibStream,
622 Long.valueOf(obj.getOffset()),
623 packDsc.getFileName(PackExt.PACK)),
624 dfe);
625 }
626 }
627
628 @Override
629 public boolean has(AnyObjectId objectId) throws IOException {
630 return (objectMap != null && objectMap.contains(objectId))
631 || ctx.has(objectId);
632 }
633
634 @Override
635 public Set<ObjectId> getShallowCommits() throws IOException {
636 return ctx.getShallowCommits();
637 }
638
639 @Override
640 public ObjectInserter getCreatedFromInserter() {
641 return DfsInserter.this;
642 }
643
644 @Override
645 public void close() {
646 ctx.close();
647 }
648 }
649
650 private class StreamLoader extends ObjectLoader {
651 private final ObjectId id;
652 private final int type;
653 private final long size;
654
655 private final DfsStreamKey srcPack;
656 private final long pos;
657
658 StreamLoader(ObjectId id, int type, long sz,
659 DfsStreamKey key, long pos) {
660 this.id = id;
661 this.type = type;
662 this.size = sz;
663 this.srcPack = key;
664 this.pos = pos;
665 }
666
667 @Override
668 public ObjectStream openStream() throws IOException {
669 final DfsReader ctx = db.newReader();
670 if (srcPack != packKey) {
671 try {
672
673
674 return ctx.open(id, type).openStream();
675 } finally {
676 ctx.close();
677 }
678 }
679
680 int bufsz = 8192;
681 final Inflater inf = ctx.inflater();
682 return new ObjectStream.Filter(type,
683 size, new BufferedInputStream(new InflaterInputStream(
684 new ReadBackStream(pos), inf, bufsz), bufsz)) {
685 @Override
686 public void close() throws IOException {
687 ctx.close();
688 super.close();
689 }
690 };
691 }
692
693 @Override
694 public int getType() {
695 return type;
696 }
697
698 @Override
699 public long getSize() {
700 return size;
701 }
702
703 @Override
704 public boolean isLarge() {
705 return true;
706 }
707
708 @Override
709 public byte[] getCachedBytes() throws LargeObjectException {
710 throw new LargeObjectException.ExceedsLimit(
711 db.getReaderOptions().getStreamFileThreshold(), size);
712 }
713 }
714
715 private final class ReadBackStream extends InputStream {
716 private long pos;
717
718 ReadBackStream(long offset) {
719 pos = offset;
720 }
721
722 @Override
723 public int read() throws IOException {
724 byte[] b = new byte[1];
725 int n = read(b);
726 return n == 1 ? b[0] & 0xff : -1;
727 }
728
729 @Override
730 public int read(byte[] buf, int ptr, int len) throws IOException {
731 int n = packOut.read(pos, buf, ptr, len);
732 if (n > 0) {
733 pos += n;
734 }
735 return n;
736 }
737 }
738 }