1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.storage.dfs;
12
13 import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
14 import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
15 import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
16 import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
17
18 import java.io.BufferedInputStream;
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.nio.ByteBuffer;
24 import java.security.MessageDigest;
25 import java.text.MessageFormat;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Set;
31 import java.util.zip.CRC32;
32 import java.util.zip.DataFormatException;
33 import java.util.zip.Deflater;
34 import java.util.zip.DeflaterOutputStream;
35 import java.util.zip.Inflater;
36 import java.util.zip.InflaterInputStream;
37
38 import org.eclipse.jgit.annotations.Nullable;
39 import org.eclipse.jgit.errors.CorruptObjectException;
40 import org.eclipse.jgit.errors.IncorrectObjectTypeException;
41 import org.eclipse.jgit.errors.LargeObjectException;
42 import org.eclipse.jgit.internal.JGitText;
43 import org.eclipse.jgit.internal.storage.file.PackIndex;
44 import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
45 import org.eclipse.jgit.internal.storage.pack.PackExt;
46 import org.eclipse.jgit.lib.AbbreviatedObjectId;
47 import org.eclipse.jgit.lib.AnyObjectId;
48 import org.eclipse.jgit.lib.Constants;
49 import org.eclipse.jgit.lib.ObjectId;
50 import org.eclipse.jgit.lib.ObjectIdOwnerMap;
51 import org.eclipse.jgit.lib.ObjectInserter;
52 import org.eclipse.jgit.lib.ObjectLoader;
53 import org.eclipse.jgit.lib.ObjectReader;
54 import org.eclipse.jgit.lib.ObjectStream;
55 import org.eclipse.jgit.transport.PackedObjectInfo;
56 import org.eclipse.jgit.util.BlockList;
57 import org.eclipse.jgit.util.IO;
58 import org.eclipse.jgit.util.NB;
59 import org.eclipse.jgit.util.TemporaryBuffer;
60 import org.eclipse.jgit.util.io.CountingOutputStream;
61 import org.eclipse.jgit.util.sha1.SHA1;
62
63
64
65
66 public class DfsInserter extends ObjectInserter {
67
68 private static final int INDEX_VERSION = 2;
69
70 final DfsObjDatabase db;
71 int compression = Deflater.BEST_COMPRESSION;
72
73 List<PackedObjectInfo> objectList;
74 ObjectIdOwnerMap<PackedObjectInfo> objectMap;
75
76 DfsBlockCache cache;
77 DfsStreamKey packKey;
78 DfsPackDescription packDsc;
79 PackStream packOut;
80 private boolean rollback;
81 private boolean checkExisting = true;
82
83
84
85
86
87
88
	/**
	 * Initialize a new inserter.
	 *
	 * @param db
	 *            database the inserter writes its pack and index to.
	 */
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}
92
93
94
95
96
97
98
99
100
	/**
	 * Check the existing object database for an object before inserting.
	 *
	 * @param check
	 *            if {@code false}, {@link #insert(int, byte[], int, int)}
	 *            skips the {@code db.has(id)} lookup and appends the object
	 *            to the pack even when it already exists elsewhere.
	 */
	public void checkExisting(boolean check) {
		checkExisting = check;
	}
104
	/**
	 * Set the deflate level used when compressing objects into the pack.
	 *
	 * @param compression
	 *            a {@link java.util.zip.Deflater} compression level.
	 */
	void setCompressionLevel(int compression) {
		this.compression = compression;
	}
108
109
	/** {@inheritDoc} */
	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		return new DfsPackParser(db, this, in);
	}
114
115
	/** {@inheritDoc} */
	@Override
	public ObjectReader newReader() {
		// Reader (below) can see objects still pending in the open pack.
		return new Reader();
	}
120
121
122 @Override
123 public ObjectId insert(int type, byte[] data, int off, int len)
124 throws IOException {
125 ObjectId id = idFor(type, data, off, len);
126 if (objectMap != null && objectMap.contains(id))
127 return id;
128
129 if (checkExisting && db.has(id, true))
130 return id;
131
132 long offset = beginObject(type, len);
133 packOut.compress.write(data, off, len);
134 packOut.compress.finish();
135 return endObject(id, offset);
136 }
137
138
	/** {@inheritDoc} */
	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		// Small objects are buffered in memory and routed through the
		// byte[] insert path, which can de-duplicate against existing
		// objects before writing.
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		// Too large to buffer: stream straight into the pack, hashing the
		// canonical "<type> <len>\0" header plus payload to derive the id.
		// This path cannot de-duplicate; the object is always appended.
		long offset = beginObject(type, len);
		SHA1 md = digest();
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(md.toObjectId(), offset);
	}
166
167 private byte[] insertBuffer(long len) {
168 byte[] buf = buffer();
169 if (len <= buf.length)
170 return buf;
171 if (len < db.getReaderOptions().getStreamFileThreshold()) {
172 try {
173 return new byte[(int) len];
174 } catch (OutOfMemoryError noMem) {
175 return buf;
176 }
177 }
178 return buf;
179 }
180
181
182 @Override
183 public void flush() throws IOException {
184 if (packDsc == null)
185 return;
186
187 if (packOut == null)
188 throw new IOException();
189
190 byte[] packHash = packOut.writePackFooter();
191 packDsc.addFileExt(PACK);
192 packDsc.setFileSize(PACK, packOut.getCount());
193 packOut.close();
194 packOut = null;
195
196 sortObjectsById();
197
198 PackIndex index = writePackIndex(packDsc, packHash, objectList);
199 db.commitPack(Collections.singletonList(packDsc), null);
200 rollback = false;
201
202 DfsPackFile p = new DfsPackFile(cache, packDsc);
203 if (index != null)
204 p.setPackIndex(index);
205 db.addPack(p);
206 clear();
207 }
208
209
	/**
	 * {@inheritDoc}
	 * <p>
	 * Aborts the current pack, if any: the partially written file is rolled
	 * back out of the database unless {@link #flush()} already committed it.
	 */
	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
				// Ignore a close failure; the pack is being discarded.
			} finally {
				packOut = null;
			}
		}
		if (rollback && packDsc != null) {
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}
231
	/** Reset per-pack state after a commit, rollback, or close. */
	private void clear() {
		objectList = null;
		objectMap = null;
		packKey = null;
		packDsc = null;
	}
238
	/**
	 * Start writing one object, lazily opening the pack on first use.
	 *
	 * @param type
	 *            pack object type code.
	 * @param len
	 *            inflated size of the object's content.
	 * @return offset of the object's header within the pack stream.
	 * @throws IOException
	 *             the pack could not be created or written.
	 */
	private long beginObject(int type, long len) throws IOException {
		if (packOut == null)
			beginPack();
		long offset = packOut.getCount();
		packOut.beginObject(type, len);
		return offset;
	}
246
247 private ObjectId endObject(ObjectId id, long offset) {
248 PackedObjectInfo obj = new PackedObjectInfo(id);
249 obj.setOffset(offset);
250 obj.setCRC((int) packOut.crc32.getValue());
251 objectList.add(obj);
252 objectMap.addIfAbsent(obj);
253 return id;
254 }
255
	/**
	 * Open a new pack for this inserter.
	 * <p>
	 * Allocates the object tables, reserves a pack description from the
	 * database, opens the output stream, and writes the 12 byte pack
	 * header.
	 *
	 * @throws IOException
	 *             the pack file could not be created or written.
	 */
	private void beginPack() throws IOException {
		objectList = new BlockList<>();
		objectMap = new ObjectIdOwnerMap<>();
		cache = DfsBlockCache.getInstance();

		// Until flush() commits, close() must roll this pack back.
		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		DfsOutputStream dfsOut = db.writeFile(packDsc, PACK);
		packDsc.setBlockSize(PACK, dfsOut.blockSize());
		packOut = new PackStream(dfsOut);
		packKey = packDsc.getStreamKey(PACK);

		// Write the header as though it were a single object pack.
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2); // Always use pack version 2.
		NB.encodeInt32(buf, 8, 1); // Always assume 1 object.
		packOut.write(buf, 0, 12);
	}
275
	/** Sort pending objects by id, as the pack index format requires. */
	private void sortObjectsById() {
		Collections.sort(objectList);
	}
279
280 @Nullable
281 private TemporaryBuffer.Heap maybeGetTemporaryBuffer(
282 List<PackedObjectInfo> list) {
283 if (list.size() <= 58000) {
284 return new TemporaryBuffer.Heap(2 << 20);
285 }
286 return null;
287 }
288
	/**
	 * Write the pack's index file and record it on the pack description.
	 *
	 * @param pack
	 *            description of the pack being completed.
	 * @param packHash
	 *            trailing SHA-1 of the pack file.
	 * @param list
	 *            objects in the pack, already sorted by id.
	 * @return the parsed index when it was small enough to buffer in
	 *         memory; otherwise {@code null} and callers must load the
	 *         index back from storage.
	 * @throws IOException
	 *             the index could not be written.
	 */
	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

		// For small indexes (see maybeGetTemporaryBuffer), build the index
		// in memory so it can be parsed immediately for the caller, while
		// also copying the same bytes out to storage. Larger indexes are
		// streamed directly to storage.
		PackIndex packIndex = null;
		try (TemporaryBuffer.Heap buf = maybeGetTemporaryBuffer(list);
				DfsOutputStream os = db.writeFile(pack, INDEX);
				CountingOutputStream cnt = new CountingOutputStream(os)) {
			if (buf != null) {
				index(buf, packHash, list);
				packIndex = PackIndex.read(buf.openInputStream());
				buf.writeTo(cnt, null);
			} else {
				index(cnt, packHash, list);
			}
			pack.addFileExt(INDEX);
			pack.setBlockSize(INDEX, os.blockSize());
			pack.setFileSize(INDEX, cnt.getCount());
		}
		return packIndex;
	}
314
	/**
	 * Serialize a version {@value #INDEX_VERSION} pack index to {@code out}.
	 *
	 * @param out
	 *            destination for the index bytes.
	 * @param packHash
	 *            trailing SHA-1 of the associated pack file.
	 * @param list
	 *            objects, already sorted by id.
	 * @throws IOException
	 *             the index could not be written.
	 */
	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}
319
	/**
	 * Buffered output stream accumulating the pack file being built.
	 * <p>
	 * Data is staged in block-sized buffers; each completed block is written
	 * to the backing {@link DfsOutputStream} and also published into the
	 * {@link DfsBlockCache}, which lets {@link Reader} stream objects back
	 * out of the pack before it has been committed.
	 */
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;

		// Running SHA-1 over everything written; becomes the pack trailer.
		private final MessageDigest md;

		// Scratch space for object headers and single-byte writes.
		final byte[] hdrBuf;

		private final Deflater deflater;
		private final int blockSize;

		// Bytes already flushed to out, and bytes pending in currBuf.
		private long currPos;
		private int currPtr;
		private byte[] currBuf;

		// CRC-32 of the object currently being written; recorded into the
		// index by endObject().
		final CRC32 crc32;

		// Deflates object payloads back into this stream.
		final DeflaterOutputStream compress;

		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

			// Buffer in units the block cache can store: prefer the
			// stream's own block size; when the stream's size is smaller
			// than the cache's, round the cache's block size down to a
			// multiple of the stream's.
			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}

		/** @return total number of bytes written to the pack so far. */
		long getCount() {
			return currPos + currPtr;
		}

		/**
		 * Write the pack object header for the next object.
		 *
		 * @param objectType
		 *            pack type code of the object.
		 * @param length
		 *            inflated length of the object's content.
		 * @throws IOException
		 *             the header could not be written.
		 */
		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}

		// Encode the pack object header into hdrBuf: the first byte holds
		// the type in bits 4-6 and the low 4 bits of the size; remaining
		// size bits follow 7 at a time, least significant first, with 0x80
		// set on every byte except the last. Returns the header length.
		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}

		@Override
		public void write(int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			// Fold into the per-object CRC and the whole-pack SHA-1
			// before buffering.
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}

		// Buffer bytes without hashing; called directly only for the
		// trailer, which must not be folded into its own digest.
		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					// Current block is full: flush it and start another.
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}

		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			// Publish the block to the cache so pending objects can be
			// read back before commit. A partially filled (final) block is
			// copied, since the cache keeps a reference to the array.
			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf;
			else
				buf = copyOf(currBuf, 0, currPtr);
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null;
		}

		private byte[] copyOf(byte[] src, int ptr, int cnt) {
			byte[] dst = new byte[cnt];
			System.arraycopy(src, ptr, dst, 0, cnt);
			return dst;
		}

		/**
		 * Append the SHA-1 trailer and flush any buffered bytes.
		 *
		 * @return the pack's trailing hash.
		 * @throws IOException
		 *             the trailer could not be written.
		 */
		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}

		/**
		 * Read back bytes previously written to this pack stream.
		 *
		 * @param pos
		 *            pack offset to begin reading from.
		 * @param dst
		 *            destination buffer.
		 * @param ptr
		 *            first index of {@code dst} to fill.
		 * @param cnt
		 *            maximum number of bytes to copy.
		 * @return number of bytes copied; may be less than {@code cnt}.
		 * @throws IOException
		 *             a flushed block could not be loaded back.
		 */
		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			// Serve flushed data from the block cache (or storage) ...
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			// ... and the tail from the still-buffered current block.
			if (currPos <= pos && r < cnt) {
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}

		/**
		 * Inflate an object's zlib stream out of this pack.
		 *
		 * @param ctx
		 *            reader providing a recycled {@link Inflater}.
		 * @param pos
		 *            pack offset where the zlib stream begins.
		 * @param len
		 *            expected inflated size.
		 * @return the inflated content, or {@code null} when a buffer of
		 *         {@code len} bytes could not be allocated.
		 * @throws IOException
		 *             a block could not be read back.
		 * @throws DataFormatException
		 *             the zlib stream is corrupt.
		 */
		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				// Caller streams the object instead of inflating in memory.
				return null;
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					// No progress and no input needed: corrupt stream.
					throw new DataFormatException();
			}
		}

		// Feed the inflater from whichever region currently holds pos:
		// an already-flushed block, or the in-memory current buffer.
		private int setInput(long pos, Inflater inf)
				throws IOException, DataFormatException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(JGitText.get().unexpectedEofInPack);
		}

		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			// Cache miss: read a full block back from the output stream.
			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		// Round pos down to the start of its containing block.
		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end();
			out.close();
		}
	}
515
	/**
	 * Reader that can also see objects pending in the inserter's open pack.
	 * <p>
	 * Lookups fall back to the underlying {@link DfsReader} when the object
	 * is not one of those appended by this inserter.
	 */
	private class Reader extends ObjectReader {
		private final DfsReader ctx = db.newReader();

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}

		@Override
		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
				throws IOException {
			Collection<ObjectId> stored = ctx.resolve(id);
			if (objectList == null)
				return stored;

			// Merge matches from the pending pack with the stored matches.
			Set<ObjectId> r = new HashSet<>(stored.size() + 2);
			r.addAll(stored);
			for (PackedObjectInfo obj : objectList) {
				if (id.prefixCompare(obj) == 0)
					r.add(obj.copy());
			}
			return r;
		}

		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				return ctx.open(objectId, typeHint);

			// Read the object's header back out of the in-progress pack to
			// recover its type and inflated size.
			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
				throw new EOFException(JGitText.get().unexpectedEofInPack);

			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			// This inserter only writes whole objects, never deltas.
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						JGitText.get().cannotReadBackDelta, Integer.toString(type)));
			if (typeHint != OBJ_ANY && type != typeHint) {
				throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
			}

			// Decode the variable length size: low 4 bits are in the first
			// byte; remaining bits follow 7 at a time while the
			// continuation bit (0x80) is set.
			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				// Small enough to inflate fully in memory (unless the
				// allocation fails, in which case inflate returns null).
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}

		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				throw new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)),
						dfe);
			}
		}

		@Override
		public boolean has(AnyObjectId objectId) throws IOException {
			// Pending objects count as present, same as stored ones.
			return (objectMap != null && objectMap.contains(objectId))
					|| ctx.has(objectId);
		}

		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		@Override
		public ObjectInserter getCreatedFromInserter() {
			return DfsInserter.this;
		}

		@Override
		public void close() {
			ctx.close();
		}
	}
619
	/**
	 * Loader for a large object still sitting in the inserter's open pack.
	 */
	private class StreamLoader extends ObjectLoader {
		private final ObjectId id;
		private final int type;
		private final long size;

		// Pack the object was written into, and offset of its zlib stream.
		private final DfsStreamKey srcPack;
		private final long pos;

		StreamLoader(ObjectId id, int type, long sz,
				DfsStreamKey key, long pos) {
			this.id = id;
			this.type = type;
			this.size = sz;
			this.srcPack = key;
			this.pos = pos;
		}

		@Override
		public ObjectStream openStream() throws IOException {
			// ctx is closed either in the finally below, or by the
			// returned stream's close() override.
			@SuppressWarnings("resource")
			final DfsReader ctx = db.newReader();
			if (srcPack != packKey) {
				try {
					// The pack this loader referenced has been committed
					// since the loader was created; the object is readable
					// through the normal database path instead.
					return ctx.open(id, type).openStream();
				} finally {
					ctx.close();
				}
			}

			int bufsz = 8192;
			final Inflater inf = ctx.inflater();
			return new ObjectStream.Filter(type,
					size, new BufferedInputStream(new InflaterInputStream(
							new ReadBackStream(pos), inf, bufsz), bufsz)) {
				@Override
				public void close() throws IOException {
					// Release the reader (and its inflater) with the stream.
					ctx.close();
					super.close();
				}
			};
		}

		@Override
		public int getType() {
			return type;
		}

		@Override
		public long getSize() {
			return size;
		}

		@Override
		public boolean isLarge() {
			// Callers must stream; getCachedBytes() always throws.
			return true;
		}

		@Override
		public byte[] getCachedBytes() throws LargeObjectException {
			throw new LargeObjectException.ExceedsLimit(
					db.getReaderOptions().getStreamFileThreshold(), size);
		}
	}
685
686 private final class ReadBackStream extends InputStream {
687 private long pos;
688
689 ReadBackStream(long offset) {
690 pos = offset;
691 }
692
693 @Override
694 public int read() throws IOException {
695 byte[] b = new byte[1];
696 int n = read(b);
697 return n == 1 ? b[0] & 0xff : -1;
698 }
699
700 @Override
701 public int read(byte[] buf, int ptr, int len) throws IOException {
702 int n = packOut.read(pos, buf, ptr, len);
703 if (n > 0) {
704 pos += n;
705 }
706 return n;
707 }
708 }
709 }