1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.dfs;
45
46 import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
47 import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
48 import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
49 import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
50
51 import java.io.BufferedInputStream;
52 import java.io.EOFException;
53 import java.io.IOException;
54 import java.io.InputStream;
55 import java.io.OutputStream;
56 import java.nio.ByteBuffer;
57 import java.security.MessageDigest;
58 import java.text.MessageFormat;
59 import java.util.Collection;
60 import java.util.Collections;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Set;
64 import java.util.zip.CRC32;
65 import java.util.zip.DataFormatException;
66 import java.util.zip.Deflater;
67 import java.util.zip.DeflaterOutputStream;
68 import java.util.zip.Inflater;
69 import java.util.zip.InflaterInputStream;
70
71 import org.eclipse.jgit.errors.CorruptObjectException;
72 import org.eclipse.jgit.errors.IncorrectObjectTypeException;
73 import org.eclipse.jgit.errors.LargeObjectException;
74 import org.eclipse.jgit.internal.JGitText;
75 import org.eclipse.jgit.internal.storage.file.PackIndex;
76 import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
77 import org.eclipse.jgit.internal.storage.pack.PackExt;
78 import org.eclipse.jgit.lib.AbbreviatedObjectId;
79 import org.eclipse.jgit.lib.AnyObjectId;
80 import org.eclipse.jgit.lib.Constants;
81 import org.eclipse.jgit.lib.ObjectId;
82 import org.eclipse.jgit.lib.ObjectIdOwnerMap;
83 import org.eclipse.jgit.lib.ObjectInserter;
84 import org.eclipse.jgit.lib.ObjectLoader;
85 import org.eclipse.jgit.lib.ObjectReader;
86 import org.eclipse.jgit.lib.ObjectStream;
87 import org.eclipse.jgit.transport.PackedObjectInfo;
88 import org.eclipse.jgit.util.BlockList;
89 import org.eclipse.jgit.util.IO;
90 import org.eclipse.jgit.util.NB;
91 import org.eclipse.jgit.util.TemporaryBuffer;
92 import org.eclipse.jgit.util.io.CountingOutputStream;
93 import org.eclipse.jgit.util.sha1.SHA1;
94
95
96 public class DfsInserter extends ObjectInserter {
97
98 private static final int INDEX_VERSION = 2;
99
100 final DfsObjDatabase db;
101 int compression = Deflater.BEST_COMPRESSION;
102
103 List<PackedObjectInfo> objectList;
104 ObjectIdOwnerMap<PackedObjectInfo> objectMap;
105
106 DfsBlockCache cache;
107 DfsPackKey packKey;
108 DfsPackDescription packDsc;
109 PackStream packOut;
110 private boolean rollback;
111 private boolean checkExisting = true;
112
113
114
115
116
117
118
/**
 * Initialize a new inserter.
 *
 * @param db
 *            database the inserter writes to.
 */
protected DfsInserter(DfsObjDatabase db) {
    super();
    this.db = db;
}
122
123
124
125
126
127
128 public void checkExisting(boolean check) {
129 checkExisting = check;
130 }
131
132 void setCompressionLevel(int compression) {
133 this.compression = compression;
134 }
135
136 @Override
137 public DfsPackParser newPackParser(InputStream in) throws IOException {
138 return new DfsPackParser(db, this, in);
139 }
140
141 @Override
142 public ObjectReader newReader() {
143 return new Reader();
144 }
145
146 @Override
147 public ObjectId insert(int type, byte[] data, int off, int len)
148 throws IOException {
149 ObjectId id = idFor(type, data, off, len);
150 if (objectMap != null && objectMap.contains(id))
151 return id;
152
153 if (checkExisting && db.has(id, true))
154 return id;
155
156 long offset = beginObject(type, len);
157 packOut.compress.write(data, off, len);
158 packOut.compress.finish();
159 return endObject(id, offset);
160 }
161
162 @Override
163 public ObjectId insert(int type, long len, InputStream in)
164 throws IOException {
165 byte[] buf = insertBuffer(len);
166 if (len <= buf.length) {
167 IO.readFully(in, buf, 0, (int) len);
168 return insert(type, buf, 0, (int) len);
169 }
170
171 long offset = beginObject(type, len);
172 SHA1 md = digest();
173 md.update(Constants.encodedTypeString(type));
174 md.update((byte) ' ');
175 md.update(Constants.encodeASCII(len));
176 md.update((byte) 0);
177
178 while (0 < len) {
179 int n = in.read(buf, 0, (int) Math.min(buf.length, len));
180 if (n <= 0)
181 throw new EOFException();
182 md.update(buf, 0, n);
183 packOut.compress.write(buf, 0, n);
184 len -= n;
185 }
186 packOut.compress.finish();
187 return endObject(md.toObjectId(), offset);
188 }
189
190 private byte[] insertBuffer(long len) {
191 byte[] buf = buffer();
192 if (len <= buf.length)
193 return buf;
194 if (len < db.getReaderOptions().getStreamFileThreshold()) {
195 try {
196 return new byte[(int) len];
197 } catch (OutOfMemoryError noMem) {
198 return buf;
199 }
200 }
201 return buf;
202 }
203
204 @Override
205 public void flush() throws IOException {
206 if (packDsc == null)
207 return;
208
209 if (packOut == null)
210 throw new IOException();
211
212 byte[] packHash = packOut.writePackFooter();
213 packDsc.addFileExt(PACK);
214 packDsc.setFileSize(PACK, packOut.getCount());
215 packOut.close();
216 packOut = null;
217
218 sortObjectsById();
219
220 PackIndex index = writePackIndex(packDsc, packHash, objectList);
221 db.commitPack(Collections.singletonList(packDsc), null);
222 rollback = false;
223
224 DfsPackFile p = cache.getOrCreate(packDsc, packKey);
225 if (index != null)
226 p.setPackIndex(index);
227 db.addPack(p);
228 clear();
229 }
230
231 @Override
232 public void close() {
233 if (packOut != null) {
234 try {
235 packOut.close();
236 } catch (IOException err) {
237
238 } finally {
239 packOut = null;
240 }
241 }
242 if (rollback && packDsc != null) {
243 try {
244 db.rollbackPack(Collections.singletonList(packDsc));
245 } finally {
246 packDsc = null;
247 rollback = false;
248 }
249 }
250 clear();
251 }
252
253 private void clear() {
254 objectList = null;
255 objectMap = null;
256 packKey = null;
257 packDsc = null;
258 }
259
260 private long beginObject(int type, long len) throws IOException {
261 if (packOut == null)
262 beginPack();
263 long offset = packOut.getCount();
264 packOut.beginObject(type, len);
265 return offset;
266 }
267
268 private ObjectId endObject(ObjectId id, long offset) {
269 PackedObjectInfo obj = new PackedObjectInfo(id);
270 obj.setOffset(offset);
271 obj.setCRC((int) packOut.crc32.getValue());
272 objectList.add(obj);
273 objectMap.addIfAbsent(obj);
274 return id;
275 }
276
277 private void beginPack() throws IOException {
278 objectList = new BlockList<>();
279 objectMap = new ObjectIdOwnerMap<>();
280 cache = DfsBlockCache.getInstance();
281
282 rollback = true;
283 packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
284 packOut = new PackStream(db.writeFile(packDsc, PACK));
285 packKey = new DfsPackKey();
286
287
288 byte[] buf = packOut.hdrBuf;
289 System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
290 NB.encodeInt32(buf, 4, 2);
291 NB.encodeInt32(buf, 8, 1);
292 packOut.write(buf, 0, 12);
293 }
294
295 private void sortObjectsById() {
296 Collections.sort(objectList);
297 }
298
299 PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
300 List<PackedObjectInfo> list) throws IOException {
301 pack.setIndexVersion(INDEX_VERSION);
302 pack.setObjectCount(list.size());
303
304
305
306
307 TemporaryBuffer.Heap buf = null;
308 PackIndex packIndex = null;
309 if (list.size() <= 58000) {
310 buf = new TemporaryBuffer.Heap(2 << 20);
311 index(buf, packHash, list);
312 packIndex = PackIndex.read(buf.openInputStream());
313 }
314
315 DfsOutputStream os = db.writeFile(pack, INDEX);
316 try (CountingOutputStream cnt = new CountingOutputStream(os)) {
317 if (buf != null)
318 buf.writeTo(cnt, null);
319 else
320 index(cnt, packHash, list);
321 pack.addFileExt(INDEX);
322 pack.setFileSize(INDEX, cnt.getCount());
323 } finally {
324 if (buf != null) {
325 buf.close();
326 }
327 }
328 return packIndex;
329 }
330
331 private static void index(OutputStream out, byte[] packHash,
332 List<PackedObjectInfo> list) throws IOException {
333 PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
334 }
335
336 private class PackStream extends OutputStream {
337 private final DfsOutputStream out;
338 private final MessageDigest md;
339 final byte[] hdrBuf;
340 private final Deflater deflater;
341 private final int blockSize;
342
343 private long currPos;
344 private int currPtr;
345 private byte[] currBuf;
346
347 final CRC32 crc32;
348 final DeflaterOutputStream compress;
349
350 PackStream(DfsOutputStream out) {
351 this.out = out;
352
353 hdrBuf = new byte[32];
354 md = Constants.newMessageDigest();
355 crc32 = new CRC32();
356 deflater = new Deflater(compression);
357 compress = new DeflaterOutputStream(this, deflater, 8192);
358
359 int size = out.blockSize();
360 if (size <= 0)
361 size = cache.getBlockSize();
362 else if (size < cache.getBlockSize())
363 size = (cache.getBlockSize() / size) * size;
364 blockSize = size;
365 currBuf = new byte[blockSize];
366 }
367
368 long getCount() {
369 return currPos + currPtr;
370 }
371
372 void beginObject(int objectType, long length) throws IOException {
373 crc32.reset();
374 deflater.reset();
375 write(hdrBuf, 0, encodeTypeSize(objectType, length));
376 }
377
378 private int encodeTypeSize(int type, long rawLength) {
379 long nextLength = rawLength >>> 4;
380 hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
381 rawLength = nextLength;
382 int n = 1;
383 while (rawLength > 0) {
384 nextLength >>>= 7;
385 hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
386 rawLength = nextLength;
387 }
388 return n;
389 }
390
391 @Override
392 public void write(final int b) throws IOException {
393 hdrBuf[0] = (byte) b;
394 write(hdrBuf, 0, 1);
395 }
396
397 @Override
398 public void write(byte[] data, int off, int len) throws IOException {
399 crc32.update(data, off, len);
400 md.update(data, off, len);
401 writeNoHash(data, off, len);
402 }
403
404 private void writeNoHash(byte[] data, int off, int len)
405 throws IOException {
406 while (0 < len) {
407 int n = Math.min(len, currBuf.length - currPtr);
408 if (n == 0) {
409 flushBlock();
410 currBuf = new byte[blockSize];
411 continue;
412 }
413
414 System.arraycopy(data, off, currBuf, currPtr, n);
415 off += n;
416 len -= n;
417 currPtr += n;
418 }
419 }
420
421 private void flushBlock() throws IOException {
422 out.write(currBuf, 0, currPtr);
423
424 byte[] buf;
425 if (currPtr == currBuf.length)
426 buf = currBuf;
427 else
428 buf = copyOf(currBuf, 0, currPtr);
429 cache.put(new DfsBlock(packKey, currPos, buf));
430
431 currPos += currPtr;
432 currPtr = 0;
433 currBuf = null;
434 }
435
436 private byte[] copyOf(byte[] src, int ptr, int cnt) {
437 byte[] dst = new byte[cnt];
438 System.arraycopy(src, ptr, dst, 0, cnt);
439 return dst;
440 }
441
442 byte[] writePackFooter() throws IOException {
443 byte[] packHash = md.digest();
444 writeNoHash(packHash, 0, packHash.length);
445 if (currPtr != 0)
446 flushBlock();
447 return packHash;
448 }
449
450 int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
451 int r = 0;
452 while (pos < currPos && r < cnt) {
453 DfsBlock b = getOrLoadBlock(pos);
454 int n = b.copy(pos, dst, ptr + r, cnt - r);
455 pos += n;
456 r += n;
457 }
458 if (currPos <= pos && r < cnt) {
459 int s = (int) (pos - currPos);
460 int n = Math.min(currPtr - s, cnt - r);
461 System.arraycopy(currBuf, s, dst, ptr + r, n);
462 r += n;
463 }
464 return r;
465 }
466
467 byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
468 DataFormatException {
469 byte[] dstbuf;
470 try {
471 dstbuf = new byte[len];
472 } catch (OutOfMemoryError noMemory) {
473 return null;
474 }
475
476 Inflater inf = ctx.inflater();
477 pos += setInput(pos, inf);
478 for (int dstoff = 0;;) {
479 int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
480 dstoff += n;
481 if (inf.finished())
482 return dstbuf;
483 if (inf.needsInput())
484 pos += setInput(pos, inf);
485 else if (n == 0)
486 throw new DataFormatException();
487 }
488 }
489
490 private int setInput(long pos, Inflater inf)
491 throws IOException, DataFormatException {
492 if (pos < currPos)
493 return getOrLoadBlock(pos).setInput(pos, inf);
494 if (pos < currPos + currPtr) {
495 int s = (int) (pos - currPos);
496 int n = currPtr - s;
497 inf.setInput(currBuf, s, n);
498 return n;
499 }
500 throw new EOFException(DfsText.get().unexpectedEofInPack);
501 }
502
503 private DfsBlock getOrLoadBlock(long pos) throws IOException {
504 long s = toBlockStart(pos);
505 DfsBlock b = cache.get(packKey, s);
506 if (b != null)
507 return b;
508
509 byte[] d = new byte[blockSize];
510 for (int p = 0; p < blockSize;) {
511 int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
512 if (n <= 0)
513 throw new EOFException(DfsText.get().unexpectedEofInPack);
514 p += n;
515 }
516 b = new DfsBlock(packKey, s, d);
517 cache.put(b);
518 return b;
519 }
520
521 private long toBlockStart(long pos) {
522 return (pos / blockSize) * blockSize;
523 }
524
525 @Override
526 public void close() throws IOException {
527 deflater.end();
528 out.close();
529 }
530 }
531
532 private class Reader extends ObjectReader {
533 private final DfsReader ctx = new DfsReader(db);
534
535 @Override
536 public ObjectReader newReader() {
537 return db.newReader();
538 }
539
540 @Override
541 public Collection<ObjectId> resolve(AbbreviatedObjectId id)
542 throws IOException {
543 Collection<ObjectId> stored = ctx.resolve(id);
544 if (objectList == null)
545 return stored;
546
547 Set<ObjectId> r = new HashSet<>(stored.size() + 2);
548 r.addAll(stored);
549 for (PackedObjectInfo obj : objectList) {
550 if (id.prefixCompare(obj) == 0)
551 r.add(obj.copy());
552 }
553 return r;
554 }
555
556 @Override
557 public ObjectLoader open(AnyObjectId objectId, int typeHint)
558 throws IOException {
559 if (objectMap == null)
560 return ctx.open(objectId, typeHint);
561
562 PackedObjectInfo obj = objectMap.get(objectId);
563 if (obj == null)
564 return ctx.open(objectId, typeHint);
565
566 byte[] buf = buffer();
567 int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
568 if (cnt <= 0)
569 throw new EOFException(DfsText.get().unexpectedEofInPack);
570
571 int c = buf[0] & 0xff;
572 int type = (c >> 4) & 7;
573 if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
574 throw new IOException(MessageFormat.format(
575 DfsText.get().cannotReadBackDelta, Integer.toString(type)));
576 if (typeHint != OBJ_ANY && type != typeHint) {
577 throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
578 }
579
580 long sz = c & 0x0f;
581 int ptr = 1;
582 int shift = 4;
583 while ((c & 0x80) != 0) {
584 if (ptr >= cnt)
585 throw new EOFException(DfsText.get().unexpectedEofInPack);
586 c = buf[ptr++] & 0xff;
587 sz += ((long) (c & 0x7f)) << shift;
588 shift += 7;
589 }
590
591 long zpos = obj.getOffset() + ptr;
592 if (sz < ctx.getStreamFileThreshold()) {
593 byte[] data = inflate(obj, zpos, (int) sz);
594 if (data != null)
595 return new ObjectLoader.SmallObject(type, data);
596 }
597 return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
598 }
599
600 private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
601 throws IOException, CorruptObjectException {
602 try {
603 return packOut.inflate(ctx, zpos, sz);
604 } catch (DataFormatException dfe) {
605 CorruptObjectException coe = new CorruptObjectException(
606 MessageFormat.format(
607 JGitText.get().objectAtHasBadZlibStream,
608 Long.valueOf(obj.getOffset()),
609 packDsc.getFileName(PackExt.PACK)));
610 coe.initCause(dfe);
611 throw coe;
612 }
613 }
614
615 @Override
616 public Set<ObjectId> getShallowCommits() throws IOException {
617 return ctx.getShallowCommits();
618 }
619
620 @Override
621 public ObjectInserter getCreatedFromInserter() {
622 return DfsInserter.this;
623 }
624
625 @Override
626 public void close() {
627 ctx.close();
628 }
629 }
630
631 private class StreamLoader extends ObjectLoader {
632 private final ObjectId id;
633 private final int type;
634 private final long size;
635
636 private final DfsPackKey srcPack;
637 private final long pos;
638
639 StreamLoader(ObjectId id, int type, long sz,
640 DfsPackKey key, long pos) {
641 this.id = id;
642 this.type = type;
643 this.size = sz;
644 this.srcPack = key;
645 this.pos = pos;
646 }
647
648 @Override
649 public ObjectStream openStream() throws IOException {
650 final DfsReader ctx = new DfsReader(db);
651 if (srcPack != packKey) {
652 try {
653
654
655 return ctx.open(id, type).openStream();
656 } finally {
657 ctx.close();
658 }
659 }
660
661 int bufsz = 8192;
662 final Inflater inf = ctx.inflater();
663 return new ObjectStream.Filter(type,
664 size, new BufferedInputStream(new InflaterInputStream(
665 new ReadBackStream(pos), inf, bufsz), bufsz)) {
666 @Override
667 public void close() throws IOException {
668 ctx.close();
669 super.close();
670 }
671 };
672 }
673
674 @Override
675 public int getType() {
676 return type;
677 }
678
679 @Override
680 public long getSize() {
681 return size;
682 }
683
684 @Override
685 public boolean isLarge() {
686 return true;
687 }
688
689 @Override
690 public byte[] getCachedBytes() throws LargeObjectException {
691 throw new LargeObjectException.ExceedsLimit(
692 db.getReaderOptions().getStreamFileThreshold(), size);
693 }
694 }
695
696 private final class ReadBackStream extends InputStream {
697 private long pos;
698
699 ReadBackStream(long offset) {
700 pos = offset;
701 }
702
703 @Override
704 public int read() throws IOException {
705 byte[] b = new byte[1];
706 int n = read(b);
707 return n == 1 ? b[0] & 0xff : -1;
708 }
709
710 @Override
711 public int read(byte[] buf, int ptr, int len) throws IOException {
712 int n = packOut.read(pos, buf, ptr, len);
713 if (n > 0) {
714 pos += n;
715 }
716 return n;
717 }
718 }
719 }