1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.dfs;
45
46 import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
47 import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
48 import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
49 import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
50
51 import java.io.BufferedInputStream;
52 import java.io.EOFException;
53 import java.io.IOException;
54 import java.io.InputStream;
55 import java.io.OutputStream;
56 import java.nio.ByteBuffer;
57 import java.security.MessageDigest;
58 import java.text.MessageFormat;
59 import java.util.Collection;
60 import java.util.Collections;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Set;
64 import java.util.zip.CRC32;
65 import java.util.zip.DataFormatException;
66 import java.util.zip.Deflater;
67 import java.util.zip.DeflaterOutputStream;
68 import java.util.zip.Inflater;
69 import java.util.zip.InflaterInputStream;
70
71 import org.eclipse.jgit.errors.CorruptObjectException;
72 import org.eclipse.jgit.errors.LargeObjectException;
73 import org.eclipse.jgit.internal.JGitText;
74 import org.eclipse.jgit.internal.storage.file.PackIndex;
75 import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
76 import org.eclipse.jgit.internal.storage.pack.PackExt;
77 import org.eclipse.jgit.lib.AbbreviatedObjectId;
78 import org.eclipse.jgit.lib.AnyObjectId;
79 import org.eclipse.jgit.lib.Constants;
80 import org.eclipse.jgit.lib.ObjectId;
81 import org.eclipse.jgit.lib.ObjectIdOwnerMap;
82 import org.eclipse.jgit.lib.ObjectInserter;
83 import org.eclipse.jgit.lib.ObjectLoader;
84 import org.eclipse.jgit.lib.ObjectReader;
85 import org.eclipse.jgit.lib.ObjectStream;
86 import org.eclipse.jgit.transport.PackedObjectInfo;
87 import org.eclipse.jgit.util.BlockList;
88 import org.eclipse.jgit.util.IO;
89 import org.eclipse.jgit.util.NB;
90 import org.eclipse.jgit.util.TemporaryBuffer;
91 import org.eclipse.jgit.util.io.CountingOutputStream;
92
93
94 public class DfsInserter extends ObjectInserter {
95
96 private static final int INDEX_VERSION = 2;
97
98 final DfsObjDatabase db;
99 int compression = Deflater.BEST_COMPRESSION;
100
101 List<PackedObjectInfo> objectList;
102 ObjectIdOwnerMap<PackedObjectInfo> objectMap;
103
104 DfsBlockCache cache;
105 DfsPackKey packKey;
106 DfsPackDescription packDsc;
107 PackStream packOut;
108 private boolean rollback;
109
110
111
112
113
114
115
116 protected DfsInserter(DfsObjDatabase db) {
117 this.db = db;
118 }
119
120 void setCompressionLevel(int compression) {
121 this.compression = compression;
122 }
123
124 @Override
125 public DfsPackParser newPackParser(InputStream in) throws IOException {
126 return new DfsPackParser(db, this, in);
127 }
128
129 @Override
130 public ObjectReader newReader() {
131 return new Reader();
132 }
133
134 @Override
135 public ObjectId insert(int type, byte[] data, int off, int len)
136 throws IOException {
137 ObjectId id = idFor(type, data, off, len);
138 if (objectMap != null && objectMap.contains(id))
139 return id;
140
141 if (db.has(id, true))
142 return id;
143
144 long offset = beginObject(type, len);
145 packOut.compress.write(data, off, len);
146 packOut.compress.finish();
147 return endObject(id, offset);
148 }
149
150 @Override
151 public ObjectId insert(int type, long len, InputStream in)
152 throws IOException {
153 byte[] buf = insertBuffer(len);
154 if (len <= buf.length) {
155 IO.readFully(in, buf, 0, (int) len);
156 return insert(type, buf, 0, (int) len);
157 }
158
159 long offset = beginObject(type, len);
160 MessageDigest md = digest();
161 md.update(Constants.encodedTypeString(type));
162 md.update((byte) ' ');
163 md.update(Constants.encodeASCII(len));
164 md.update((byte) 0);
165
166 while (0 < len) {
167 int n = in.read(buf, 0, (int) Math.min(buf.length, len));
168 if (n <= 0)
169 throw new EOFException();
170 md.update(buf, 0, n);
171 packOut.compress.write(buf, 0, n);
172 len -= n;
173 }
174 packOut.compress.finish();
175 return endObject(ObjectId.fromRaw(md.digest()), offset);
176 }
177
178 private byte[] insertBuffer(long len) {
179 byte[] buf = buffer();
180 if (len <= buf.length)
181 return buf;
182 if (len < db.getReaderOptions().getStreamFileThreshold()) {
183 try {
184 return new byte[(int) len];
185 } catch (OutOfMemoryError noMem) {
186 return buf;
187 }
188 }
189 return buf;
190 }
191
192 @Override
193 public void flush() throws IOException {
194 if (packDsc == null)
195 return;
196
197 if (packOut == null)
198 throw new IOException();
199
200 byte[] packHash = packOut.writePackFooter();
201 packDsc.addFileExt(PACK);
202 packDsc.setFileSize(PACK, packOut.getCount());
203 packOut.close();
204 packOut = null;
205
206 sortObjectsById();
207
208 PackIndex index = writePackIndex(packDsc, packHash, objectList);
209 db.commitPack(Collections.singletonList(packDsc), null);
210 rollback = false;
211
212 DfsPackFile p = cache.getOrCreate(packDsc, packKey);
213 if (index != null)
214 p.setPackIndex(index);
215 db.addPack(p);
216 clear();
217 }
218
219 @Override
220 public void close() {
221 if (packOut != null) {
222 try {
223 packOut.close();
224 } catch (IOException err) {
225
226 } finally {
227 packOut = null;
228 }
229 }
230 if (rollback && packDsc != null) {
231 try {
232 db.rollbackPack(Collections.singletonList(packDsc));
233 } finally {
234 packDsc = null;
235 rollback = false;
236 }
237 }
238 clear();
239 }
240
241 private void clear() {
242 objectList = null;
243 objectMap = null;
244 packKey = null;
245 packDsc = null;
246 }
247
248 private long beginObject(int type, long len) throws IOException {
249 if (packOut == null)
250 beginPack();
251 long offset = packOut.getCount();
252 packOut.beginObject(type, len);
253 return offset;
254 }
255
256 private ObjectId endObject(ObjectId id, long offset) {
257 PackedObjectInfo obj = new PackedObjectInfo(id);
258 obj.setOffset(offset);
259 obj.setCRC((int) packOut.crc32.getValue());
260 objectList.add(obj);
261 objectMap.addIfAbsent(obj);
262 return id;
263 }
264
265 private void beginPack() throws IOException {
266 objectList = new BlockList<PackedObjectInfo>();
267 objectMap = new ObjectIdOwnerMap<PackedObjectInfo>();
268 cache = DfsBlockCache.getInstance();
269
270 rollback = true;
271 packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
272 packOut = new PackStream(db.writeFile(packDsc, PACK));
273 packKey = new DfsPackKey();
274
275
276 byte[] buf = packOut.hdrBuf;
277 System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
278 NB.encodeInt32(buf, 4, 2);
279 NB.encodeInt32(buf, 8, 1);
280 packOut.write(buf, 0, 12);
281 }
282
283 private void sortObjectsById() {
284 Collections.sort(objectList);
285 }
286
287 PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
288 List<PackedObjectInfo> list) throws IOException {
289 pack.setIndexVersion(INDEX_VERSION);
290 pack.setObjectCount(list.size());
291
292
293
294
295 TemporaryBuffer.Heap buf = null;
296 PackIndex packIndex = null;
297 if (list.size() <= 58000) {
298 buf = new TemporaryBuffer.Heap(2 << 20);
299 index(buf, packHash, list);
300 packIndex = PackIndex.read(buf.openInputStream());
301 }
302
303 DfsOutputStream os = db.writeFile(pack, INDEX);
304 try {
305 CountingOutputStream cnt = new CountingOutputStream(os);
306 if (buf != null)
307 buf.writeTo(cnt, null);
308 else
309 index(cnt, packHash, list);
310 pack.addFileExt(INDEX);
311 pack.setFileSize(INDEX, cnt.getCount());
312 } finally {
313 os.close();
314 }
315 return packIndex;
316 }
317
318 private static void index(OutputStream out, byte[] packHash,
319 List<PackedObjectInfo> list) throws IOException {
320 PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
321 }
322
323 private class PackStream extends OutputStream {
324 private final DfsOutputStream out;
325 private final MessageDigest md;
326 final byte[] hdrBuf;
327 private final Deflater deflater;
328 private final int blockSize;
329
330 private long currPos;
331 private int currPtr;
332 private byte[] currBuf;
333
334 final CRC32 crc32;
335 final DeflaterOutputStream compress;
336
337 PackStream(DfsOutputStream out) {
338 this.out = out;
339
340 hdrBuf = new byte[32];
341 md = Constants.newMessageDigest();
342 crc32 = new CRC32();
343 deflater = new Deflater(compression);
344 compress = new DeflaterOutputStream(this, deflater, 8192);
345
346 int size = out.blockSize();
347 if (size <= 0)
348 size = cache.getBlockSize();
349 else if (size < cache.getBlockSize())
350 size = (cache.getBlockSize() / size) * size;
351 blockSize = size;
352 currBuf = new byte[blockSize];
353 }
354
355 long getCount() {
356 return currPos + currPtr;
357 }
358
359 void beginObject(int objectType, long length) throws IOException {
360 crc32.reset();
361 deflater.reset();
362 write(hdrBuf, 0, encodeTypeSize(objectType, length));
363 }
364
365 private int encodeTypeSize(int type, long rawLength) {
366 long nextLength = rawLength >>> 4;
367 hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
368 rawLength = nextLength;
369 int n = 1;
370 while (rawLength > 0) {
371 nextLength >>>= 7;
372 hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
373 rawLength = nextLength;
374 }
375 return n;
376 }
377
378 @Override
379 public void write(final int b) throws IOException {
380 hdrBuf[0] = (byte) b;
381 write(hdrBuf, 0, 1);
382 }
383
384 @Override
385 public void write(byte[] data, int off, int len) throws IOException {
386 crc32.update(data, off, len);
387 md.update(data, off, len);
388 writeNoHash(data, off, len);
389 }
390
391 private void writeNoHash(byte[] data, int off, int len)
392 throws IOException {
393 while (0 < len) {
394 int n = Math.min(len, currBuf.length - currPtr);
395 if (n == 0) {
396 flushBlock();
397 currBuf = new byte[blockSize];
398 continue;
399 }
400
401 System.arraycopy(data, off, currBuf, currPtr, n);
402 off += n;
403 len -= n;
404 currPtr += n;
405 }
406 }
407
408 private void flushBlock() throws IOException {
409 out.write(currBuf, 0, currPtr);
410
411 byte[] buf;
412 if (currPtr == currBuf.length)
413 buf = currBuf;
414 else
415 buf = copyOf(currBuf, 0, currPtr);
416 cache.put(new DfsBlock(packKey, currPos, buf));
417
418 currPos += currPtr;
419 currPtr = 0;
420 currBuf = null;
421 }
422
423 private byte[] copyOf(byte[] src, int ptr, int cnt) {
424 byte[] dst = new byte[cnt];
425 System.arraycopy(src, ptr, dst, 0, cnt);
426 return dst;
427 }
428
429 byte[] writePackFooter() throws IOException {
430 byte[] packHash = md.digest();
431 writeNoHash(packHash, 0, packHash.length);
432 if (currPtr != 0)
433 flushBlock();
434 return packHash;
435 }
436
437 int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
438 int r = 0;
439 while (pos < currPos && r < cnt) {
440 DfsBlock b = getOrLoadBlock(pos);
441 int n = b.copy(pos, dst, ptr + r, cnt - r);
442 pos += n;
443 r += n;
444 }
445 if (currPos <= pos && r < cnt) {
446 int s = (int) (pos - currPos);
447 int n = Math.min(currPtr - s, cnt - r);
448 System.arraycopy(currBuf, s, dst, ptr + r, n);
449 r += n;
450 }
451 return r;
452 }
453
454 byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
455 DataFormatException {
456 byte[] dstbuf;
457 try {
458 dstbuf = new byte[len];
459 } catch (OutOfMemoryError noMemory) {
460 return null;
461 }
462
463 Inflater inf = ctx.inflater();
464 pos += setInput(pos, inf);
465 for (int dstoff = 0;;) {
466 int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
467 dstoff += n;
468 if (inf.finished())
469 return dstbuf;
470 if (inf.needsInput())
471 pos += setInput(pos, inf);
472 else if (n == 0)
473 throw new DataFormatException();
474 }
475 }
476
477 private int setInput(long pos, Inflater inf) throws IOException {
478 if (pos < currPos)
479 return getOrLoadBlock(pos).setInput(pos, inf);
480 if (pos < currPos + currPtr) {
481 int s = (int) (pos - currPos);
482 int n = currPtr - s;
483 inf.setInput(currBuf, s, n);
484 return n;
485 }
486 throw new EOFException(DfsText.get().unexpectedEofInPack);
487 }
488
489 private DfsBlock getOrLoadBlock(long pos) throws IOException {
490 long s = toBlockStart(pos);
491 DfsBlock b = cache.get(packKey, s);
492 if (b != null)
493 return b;
494
495 byte[] d = new byte[blockSize];
496 for (int p = 0; p < blockSize;) {
497 int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
498 if (n <= 0)
499 throw new EOFException(DfsText.get().unexpectedEofInPack);
500 p += n;
501 }
502 b = new DfsBlock(packKey, s, d);
503 cache.put(b);
504 return b;
505 }
506
507 private long toBlockStart(long pos) {
508 return (pos / blockSize) * blockSize;
509 }
510
511 @Override
512 public void close() throws IOException {
513 deflater.end();
514 out.close();
515 }
516 }
517
518 private class Reader extends ObjectReader {
519 private final DfsReader ctx = new DfsReader(db);
520
521 @Override
522 public ObjectReader newReader() {
523 return db.newReader();
524 }
525
526 @Override
527 public Collection<ObjectId> resolve(AbbreviatedObjectId id)
528 throws IOException {
529 Collection<ObjectId> stored = ctx.resolve(id);
530 if (objectList == null)
531 return stored;
532
533 Set<ObjectId> r = new HashSet<ObjectId>(stored.size() + 2);
534 r.addAll(stored);
535 for (PackedObjectInfo obj : objectList) {
536 if (id.prefixCompare(obj) == 0)
537 r.add(obj.copy());
538 }
539 return r;
540 }
541
542 @Override
543 public ObjectLoader open(AnyObjectId objectId, int typeHint)
544 throws IOException {
545 if (objectMap == null)
546 return ctx.open(objectId, typeHint);
547
548 PackedObjectInfo obj = objectMap.get(objectId);
549 if (obj == null)
550 return ctx.open(objectId, typeHint);
551
552 byte[] buf = buffer();
553 int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
554 if (cnt <= 0)
555 throw new EOFException(DfsText.get().unexpectedEofInPack);
556
557 int c = buf[0] & 0xff;
558 int type = (c >> 4) & 7;
559 if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
560 throw new IOException(MessageFormat.format(
561 DfsText.get().cannotReadBackDelta, Integer.toString(type)));
562
563 long sz = c & 0x0f;
564 int ptr = 1;
565 int shift = 4;
566 while ((c & 0x80) != 0) {
567 if (ptr >= cnt)
568 throw new EOFException(DfsText.get().unexpectedEofInPack);
569 c = buf[ptr++] & 0xff;
570 sz += ((long) (c & 0x7f)) << shift;
571 shift += 7;
572 }
573
574 long zpos = obj.getOffset() + ptr;
575 if (sz < ctx.getStreamFileThreshold()) {
576 byte[] data = inflate(obj, zpos, (int) sz);
577 if (data != null)
578 return new ObjectLoader.SmallObject(type, data);
579 }
580 return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
581 }
582
583 private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
584 throws IOException, CorruptObjectException {
585 try {
586 return packOut.inflate(ctx, zpos, sz);
587 } catch (DataFormatException dfe) {
588 CorruptObjectException coe = new CorruptObjectException(
589 MessageFormat.format(
590 JGitText.get().objectAtHasBadZlibStream,
591 Long.valueOf(obj.getOffset()),
592 packDsc.getFileName(PackExt.PACK)));
593 coe.initCause(dfe);
594 throw coe;
595 }
596 }
597
598 @Override
599 public Set<ObjectId> getShallowCommits() throws IOException {
600 return ctx.getShallowCommits();
601 }
602
603 @Override
604 public void close() {
605 ctx.close();
606 }
607 }
608
609 private class StreamLoader extends ObjectLoader {
610 private final ObjectId id;
611 private final int type;
612 private final long size;
613
614 private final DfsPackKey srcPack;
615 private final long pos;
616
617 StreamLoader(ObjectId id, int type, long sz,
618 DfsPackKey key, long pos) {
619 this.id = id;
620 this.type = type;
621 this.size = sz;
622 this.srcPack = key;
623 this.pos = pos;
624 }
625
626 @Override
627 public ObjectStream openStream() throws IOException {
628 final DfsReader ctx = new DfsReader(db);
629 if (srcPack != packKey) {
630 try {
631
632
633 return ctx.open(id, type).openStream();
634 } finally {
635 ctx.close();
636 }
637 }
638
639 int bufsz = 8192;
640 final Inflater inf = ctx.inflater();
641 return new ObjectStream.Filter(type,
642 size, new BufferedInputStream(new InflaterInputStream(
643 new ReadBackStream(pos), inf, bufsz), bufsz)) {
644 @Override
645 public void close() throws IOException {
646 ctx.close();
647 super.close();
648 }
649 };
650 }
651
652 @Override
653 public int getType() {
654 return type;
655 }
656
657 @Override
658 public long getSize() {
659 return size;
660 }
661
662 @Override
663 public boolean isLarge() {
664 return true;
665 }
666
667 @Override
668 public byte[] getCachedBytes() throws LargeObjectException {
669 throw new LargeObjectException.ExceedsLimit(
670 db.getReaderOptions().getStreamFileThreshold(), size);
671 }
672 }
673
674 private final class ReadBackStream extends InputStream {
675 private long pos;
676
677 ReadBackStream(long offset) {
678 pos = offset;
679 }
680
681 @Override
682 public int read() throws IOException {
683 byte[] b = new byte[1];
684 int n = read(b);
685 return n == 1 ? b[0] & 0xff : -1;
686 }
687
688 @Override
689 public int read(byte[] buf, int ptr, int len) throws IOException {
690 int n = packOut.read(pos, buf, ptr, len);
691 if (n > 0) {
692 pos += n;
693 }
694 return n;
695 }
696 }
697 }