1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.dfs;
45
import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import org.eclipse.jgit.errors.CorruptObjectException;
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.file.PackIndex;
import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdOwnerMap;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.util.BlockList;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.NB;
import org.eclipse.jgit.util.TemporaryBuffer;
import org.eclipse.jgit.util.io.CountingOutputStream;
93
94
/**
 * Inserts objects into a DFS-backed object database by streaming them into a
 * single pack file. The pack (and its index) become visible to readers only
 * when {@link #flush()} is called; {@link #close()} without a flush rolls the
 * pending pack back.
 */
public class DfsInserter extends ObjectInserter {

	/** Pack index file format version written for new packs. */
	private static final int INDEX_VERSION = 2;

	final DfsObjDatabase db;
	int compression = Deflater.BEST_COMPRESSION;

	// Objects written into the pending pack, in insertion order and keyed
	// by ObjectId for duplicate suppression and read-back.
	List<PackedObjectInfo> objectList;
	ObjectIdOwnerMap<PackedObjectInfo> objectMap;

	DfsBlockCache cache;
	DfsPackKey packKey;
	DfsPackDescription packDsc;
	PackStream packOut;
	private boolean rollback; // True while an uncommitted pack exists.
	private boolean checkExisting = true;
111
112
113
114
115
116
117
	/**
	 * Creates an inserter writing into the given object database.
	 *
	 * @param db database the pack and index will be stored in.
	 */
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}
121
122
123
124
125
126
	/**
	 * Controls whether {@link #insert(int, byte[], int, int)} checks if the
	 * object already exists in the repository before writing it again.
	 *
	 * @param check false to skip the (potentially expensive) existence check.
	 */
	public void checkExisting(boolean check) {
		checkExisting = check;
	}

	/** Sets the zlib compression level used for newly written objects. */
	void setCompressionLevel(int compression) {
		this.compression = compression;
	}
134
	/** Creates a parser that receives an incoming pack into this database. */
	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		return new DfsPackParser(db, this, in);
	}

	/** Returns a reader that can also see objects pending in this inserter. */
	@Override
	public ObjectReader newReader() {
		return new Reader();
	}
144
145 @Override
146 public ObjectId insert(int type, byte[] data, int off, int len)
147 throws IOException {
148 ObjectId id = idFor(type, data, off, len);
149 if (objectMap != null && objectMap.contains(id))
150 return id;
151
152 if (checkExisting && db.has(id, true))
153 return id;
154
155 long offset = beginObject(type, len);
156 packOut.compress.write(data, off, len);
157 packOut.compress.finish();
158 return endObject(id, offset);
159 }
160
	/**
	 * Inserts an object by streaming its content.
	 * <p>
	 * Objects small enough to buffer are read fully and delegated to
	 * {@link #insert(int, byte[], int, int)} so duplicates are skipped.
	 * Larger objects are hashed and written incrementally; note this large
	 * path does not perform the duplicate/existence check.
	 */
	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		// Too large to buffer: hash the canonical header ("type len\0")
		// and body while copying the deflated body into the pack.
		long offset = beginObject(type, len);
		MessageDigest md = digest();
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(ObjectId.fromRaw(md.digest()), offset);
	}
188
189 private byte[] insertBuffer(long len) {
190 byte[] buf = buffer();
191 if (len <= buf.length)
192 return buf;
193 if (len < db.getReaderOptions().getStreamFileThreshold()) {
194 try {
195 return new byte[(int) len];
196 } catch (OutOfMemoryError noMem) {
197 return buf;
198 }
199 }
200 return buf;
201 }
202
203 @Override
204 public void flush() throws IOException {
205 if (packDsc == null)
206 return;
207
208 if (packOut == null)
209 throw new IOException();
210
211 byte[] packHash = packOut.writePackFooter();
212 packDsc.addFileExt(PACK);
213 packDsc.setFileSize(PACK, packOut.getCount());
214 packOut.close();
215 packOut = null;
216
217 sortObjectsById();
218
219 PackIndex index = writePackIndex(packDsc, packHash, objectList);
220 db.commitPack(Collections.singletonList(packDsc), null);
221 rollback = false;
222
223 DfsPackFile p = cache.getOrCreate(packDsc, packKey);
224 if (index != null)
225 p.setPackIndex(index);
226 db.addPack(p);
227 clear();
228 }
229
	/**
	 * Releases the inserter, rolling back any pack that was never flushed.
	 */
	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
				// Ignore a close failure; the pack is being discarded anyway.
			} finally {
				packOut = null;
			}
		}
		if (rollback && packDsc != null) {
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}
251
	/** Drops all per-pack state so the next insert starts a fresh pack. */
	private void clear() {
		objectList = null;
		objectMap = null;
		packKey = null;
		packDsc = null;
	}
258
	/**
	 * Starts a new object in the pack, opening the pack on first use.
	 *
	 * @return offset of the object's header within the pack stream.
	 */
	private long beginObject(int type, long len) throws IOException {
		if (packOut == null)
			beginPack();
		long offset = packOut.getCount();
		packOut.beginObject(type, len);
		return offset;
	}

	/**
	 * Records a completed object in the in-memory object tables.
	 *
	 * @return {@code id}, for caller convenience.
	 */
	private ObjectId endObject(ObjectId id, long offset) {
		PackedObjectInfo obj = new PackedObjectInfo(id);
		obj.setOffset(offset);
		obj.setCRC((int) packOut.crc32.getValue());
		objectList.add(obj);
		objectMap.addIfAbsent(obj);
		return id;
	}
275
	/** Allocates a new pack description and opens its output stream. */
	private void beginPack() throws IOException {
		objectList = new BlockList<PackedObjectInfo>();
		objectMap = new ObjectIdOwnerMap<PackedObjectInfo>();
		cache = DfsBlockCache.getInstance();

		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		packOut = new PackStream(db.writeFile(packDsc, PACK));
		packKey = new DfsPackKey();

		// Pack header: "PACK" signature, version 2, object count.
		// NOTE(review): the count field is written as 1 and never fixed up;
		// presumably readers rely on the pack index rather than this field
		// — confirm before using this pack outside the DFS code path.
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2); // Pack format version 2.
		NB.encodeInt32(buf, 8, 1); // Object count placeholder.
		packOut.write(buf, 0, 12);
	}

	/** Sorts objects into SHA-1 order, as required by the pack index. */
	private void sortObjectsById() {
		Collections.sort(objectList);
	}
297
	/**
	 * Writes the pack index file for the just-finished pack.
	 *
	 * @return the index parsed back into memory when the pack was small
	 *         enough to buffer it; otherwise null and readers must load the
	 *         index from storage.
	 */
	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

		// For small packs, build the index in a 2 MiB heap buffer so it can
		// be both written out and parsed back without a second storage read.
		// NOTE(review): 58,000 looks like a sizing cutoff tied to that
		// buffer — confirm the derivation before changing either constant.
		TemporaryBuffer.Heap buf = null;
		PackIndex packIndex = null;
		if (list.size() <= 58000) {
			buf = new TemporaryBuffer.Heap(2 << 20);
			index(buf, packHash, list);
			packIndex = PackIndex.read(buf.openInputStream());
		}

		DfsOutputStream os = db.writeFile(pack, INDEX);
		try {
			CountingOutputStream cnt = new CountingOutputStream(os);
			if (buf != null)
				buf.writeTo(cnt, null);
			else
				index(cnt, packHash, list);
			pack.addFileExt(INDEX);
			pack.setFileSize(INDEX, cnt.getCount());
		} finally {
			os.close();
		}
		return packIndex;
	}

	/** Serializes the index for {@code list} to {@code out}. */
	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}
333
	/**
	 * Output stream building the pack file. Maintains the whole-pack SHA-1
	 * and per-object CRC-32, and mirrors every written block into the
	 * DfsBlockCache so pending objects can be read back before flush.
	 */
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;
		private final MessageDigest md; // Whole-pack SHA-1 (pack footer).
		final byte[] hdrBuf; // Scratch space for object headers.
		private final Deflater deflater;
		private final int blockSize;

		private long currPos; // Pack position of the first byte of currBuf.
		private int currPtr; // Number of bytes filled in currBuf.
		private byte[] currBuf; // Block currently being filled.

		final CRC32 crc32; // CRC of the object currently being written.
		final DeflaterOutputStream compress;
347
		/**
		 * @param out storage stream the pack file is written to.
		 */
		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

			// Choose a buffering block size: the stream's preferred size,
			// or, when that is smaller than the cache's block size, the
			// largest multiple of it that fits a cache block.
			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}
365
		/** @return total number of bytes written to the pack so far. */
		long getCount() {
			return currPos + currPtr;
		}

		/** Resets per-object state and writes the object's pack header. */
		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}
375
		/**
		 * Encodes the object type and inflated size into {@link #hdrBuf}
		 * using the pack format's variable-length header: the first byte
		 * holds the type and the low 4 size bits, each following byte holds
		 * 7 more size bits, and the high bit of a byte flags that another
		 * byte follows.
		 *
		 * @return number of header bytes placed into {@link #hdrBuf}.
		 */
		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				// nextLength previews the remaining bits to decide the
				// continuation flag for the byte being written now.
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}
388
		@Override
		public void write(final int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		/** Writes pack data, folding it into the CRC-32 and pack SHA-1. */
		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}
401
		/**
		 * Buffers data into block-sized chunks, flushing each full block to
		 * storage (and the block cache) before starting the next one.
		 */
		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					// Current block is full; push it out, start a new one.
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}
418
		/**
		 * Writes the current block to storage and caches a copy, so reads
		 * of already-flushed pack data can be served from the cache.
		 */
		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf; // Exactly full: cache it without copying.
			else
				buf = copyOf(currBuf, 0, currPtr);
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null; // writeNoHash allocates the next block lazily.
		}
433
434 private byte[] copyOf(byte[] src, int ptr, int cnt) {
435 byte[] dst = new byte[cnt];
436 System.arraycopy(src, ptr, dst, 0, cnt);
437 return dst;
438 }
439
		/**
		 * Appends the pack's trailing SHA-1 checksum and flushes the final
		 * (possibly partial) block.
		 *
		 * @return the pack checksum that was written.
		 */
		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}
447
		/**
		 * Reads back previously written pack data: flushed bytes come from
		 * cached (or reloaded) blocks, unflushed bytes from the block still
		 * being filled.
		 *
		 * @return number of bytes copied into {@code dst}; may be less than
		 *         {@code cnt} at the end of the written data.
		 */
		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			if (currPos <= pos && r < cnt) {
				// Remainder lives in the in-progress block.
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}
464
		/**
		 * Inflates {@code len} bytes of object data starting at pack
		 * position {@code pos}.
		 *
		 * @return the inflated bytes, or null if {@code len} bytes could not
		 *         be allocated (caller falls back to streaming).
		 */
		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				return null;
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					// No progress and no input wanted: corrupt stream.
					throw new DataFormatException();
			}
		}
487
		/**
		 * Feeds the inflater bytes beginning at {@code pos}, taken from a
		 * flushed block or from the block still being filled.
		 *
		 * @return number of bytes made available to the inflater.
		 */
		private int setInput(long pos, Inflater inf)
				throws IOException, DataFormatException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(DfsText.get().unexpectedEofInPack);
		}
500
		/**
		 * Returns the cached block containing {@code pos}, reading it back
		 * from the underlying storage stream on a cache miss.
		 */
		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				// Loop: a storage read may return fewer bytes than asked.
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		/** Rounds {@code pos} down to the start of its block. */
		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end(); // Release native zlib resources promptly.
			out.close();
		}
528 }
529
	/**
	 * Reader that resolves and opens objects still pending in this
	 * inserter's unflushed pack, in addition to already stored objects.
	 */
	private class Reader extends ObjectReader {
		private final DfsReader ctx = new DfsReader(db);

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}
537
538 @Override
539 public Collection<ObjectId> resolve(AbbreviatedObjectId id)
540 throws IOException {
541 Collection<ObjectId> stored = ctx.resolve(id);
542 if (objectList == null)
543 return stored;
544
545 Set<ObjectId> r = new HashSet<ObjectId>(stored.size() + 2);
546 r.addAll(stored);
547 for (PackedObjectInfo obj : objectList) {
548 if (id.prefixCompare(obj) == 0)
549 r.add(obj.copy());
550 }
551 return r;
552 }
553
		/**
		 * Opens an object, preferring the copy pending in the current pack
		 * so callers can read objects back before {@link DfsInserter#flush()}.
		 */
		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				return ctx.open(objectId, typeHint);

			// Read the object's header back out of the pack stream.
			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
				throw new EOFException(DfsText.get().unexpectedEofInPack);

			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			// Deltas are never produced by this inserter's own writes and
			// cannot be materialized from here.
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						DfsText.get().cannotReadBackDelta, Integer.toString(type)));
			if (typeHint != OBJ_ANY && type != typeHint) {
				throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
			}

			// Decode the variable-length inflated size from the header.
			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(DfsText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			// Too large (or allocation failed): stream it back instead.
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}
597
		/** Inflates a pending object, mapping zlib errors to corruption. */
		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				CorruptObjectException coe = new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)));
				coe.initCause(dfe);
				throw coe;
			}
		}
612
		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		/** @return the inserter whose pending objects this reader can see. */
		@Override
		public ObjectInserter getCreatedFromInserter() {
			return DfsInserter.this;
		}

		@Override
		public void close() {
			ctx.close();
		}
627 }
628
	/**
	 * Loader for a large pending object, streamed back out of the pack being
	 * written rather than held in memory.
	 */
	private class StreamLoader extends ObjectLoader {
		private final ObjectId id;
		private final int type;
		private final long size; // Inflated size of the object.

		private final DfsPackKey srcPack; // Pack the object was written to.
		private final long pos; // Pack position of the deflated data.

		StreamLoader(ObjectId id, int type, long sz,
				DfsPackKey key, long pos) {
			this.id = id;
			this.type = type;
			this.size = sz;
			this.srcPack = key;
			this.pos = pos;
		}
645
		@Override
		public ObjectStream openStream() throws IOException {
			final DfsReader ctx = new DfsReader(db);
			if (srcPack != packKey) {
				try {
					// The pack this loader referenced has since been
					// flushed; use the normal storage code path.
					return ctx.open(id, type).openStream();
				} finally {
					ctx.close();
				}
			}

			// Stream the deflated data straight back out of the pending
			// pack; the wrapping Filter closes the reader with the stream.
			int bufsz = 8192;
			final Inflater inf = ctx.inflater();
			return new ObjectStream.Filter(type,
					size, new BufferedInputStream(new InflaterInputStream(
							new ReadBackStream(pos), inf, bufsz), bufsz)) {
				@Override
				public void close() throws IOException {
					ctx.close();
					super.close();
				}
			};
		}
671
		@Override
		public int getType() {
			return type;
		}

		@Override
		public long getSize() {
			return size;
		}

		/** Always true: small objects use ObjectLoader.SmallObject instead. */
		@Override
		public boolean isLarge() {
			return true;
		}

		@Override
		public byte[] getCachedBytes() throws LargeObjectException {
			throw new LargeObjectException.ExceedsLimit(
					db.getReaderOptions().getStreamFileThreshold(), size);
		}
692 }
693
694 private final class ReadBackStream extends InputStream {
695 private long pos;
696
697 ReadBackStream(long offset) {
698 pos = offset;
699 }
700
701 @Override
702 public int read() throws IOException {
703 byte[] b = new byte[1];
704 int n = read(b);
705 return n == 1 ? b[0] & 0xff : -1;
706 }
707
708 @Override
709 public int read(byte[] buf, int ptr, int len) throws IOException {
710 int n = packOut.read(pos, buf, ptr, len);
711 if (n > 0) {
712 pos += n;
713 }
714 return n;
715 }
716 }
717 }