1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.storage.dfs;
12
13 import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
14
15 import java.io.EOFException;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.nio.ByteBuffer;
19 import java.security.MessageDigest;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.zip.CRC32;
23 import java.util.zip.Deflater;
24
25 import org.eclipse.jgit.internal.storage.file.PackIndex;
26 import org.eclipse.jgit.lib.AnyObjectId;
27 import org.eclipse.jgit.lib.Constants;
28 import org.eclipse.jgit.lib.ProgressMonitor;
29 import org.eclipse.jgit.transport.PackLock;
30 import org.eclipse.jgit.transport.PackParser;
31 import org.eclipse.jgit.transport.PackedObjectInfo;
32
33
34
35
36 public class DfsPackParser extends PackParser {
37 private final DfsObjDatabase objdb;
38
39 private final DfsInserter objins;
40
41
42 private final CRC32 crc;
43
44
45 private final MessageDigest packDigest;
46
47
48 private int blockSize;
49
50
51 private long packEnd;
52
53
54 private byte[] packHash;
55
56
57 private Deflater def;
58
59
60 private boolean isEmptyPack;
61
62
63 private DfsPackDescription packDsc;
64
65
66 private DfsStreamKey packKey;
67
68
69 private PackIndex packIndex;
70
71
72 private DfsOutputStream out;
73
74
75 private byte[] currBuf;
76 private long currPos;
77 private int currEnd;
78
79
80 private DfsBlockCache blockCache;
81
82
83 private long readPos;
84 private DfsBlock readBlock;
85
86
87
88
89
90
91
92
93
94
95
96 protected DfsPackParser(DfsObjDatabase db, DfsInserter ins, InputStream in) {
97 super(db, in);
98 this.objdb = db;
99 this.objins = ins;
100 this.crc = new CRC32();
101 this.packDigest = Constants.newMessageDigest();
102 }
103
104
105 @Override
106 public PackLock parse(ProgressMonitor receiving, ProgressMonitor resolving)
107 throws IOException {
108 boolean rollback = true;
109 try {
110 blockCache = DfsBlockCache.getInstance();
111 super.parse(receiving, resolving);
112 if (isEmptyPack)
113 return null;
114 buffer(packHash, 0, packHash.length);
115 if (currEnd != 0)
116 flushBlock();
117 out.close();
118 out = null;
119 currBuf = null;
120 readBlock = null;
121 packDsc.addFileExt(PACK);
122 packDsc.setFileSize(PACK, packEnd);
123 packDsc.setBlockSize(PACK, blockSize);
124
125 writePackIndex();
126 objdb.commitPack(Collections.singletonList(packDsc), null);
127 rollback = false;
128
129 DfsPackFile p = new DfsPackFile(blockCache, packDsc);
130 p.setBlockSize(blockSize);
131 if (packIndex != null)
132 p.setPackIndex(packIndex);
133
134 objdb.addPack(p);
135
136 return null;
137 } finally {
138 blockCache = null;
139 currBuf = null;
140 readBlock = null;
141
142 if (def != null) {
143 def.end();
144 def = null;
145 }
146
147 if (out != null) {
148 try {
149 out.close();
150 } catch (IOException err) {
151
152 }
153 out = null;
154 }
155
156 if (rollback && packDsc != null) {
157 try {
158 objdb.rollbackPack(Collections.singletonList(packDsc));
159 } finally {
160 packDsc = null;
161 }
162 }
163 }
164 }
165
166
167
168
169
170
171 public DfsPackDescription getPackDescription() {
172 return packDsc;
173 }
174
175
176 @Override
177 protected void onPackHeader(long objectCount) throws IOException {
178 if (objectCount == 0) {
179 isEmptyPack = true;
180 currBuf = new byte[256];
181 return;
182 }
183
184 packDsc = objdb.newPack(DfsObjDatabase.PackSource.RECEIVE);
185 out = objdb.writeFile(packDsc, PACK);
186 packKey = packDsc.getStreamKey(PACK);
187
188 int size = out.blockSize();
189 if (size <= 0)
190 size = blockCache.getBlockSize();
191 else if (size < blockCache.getBlockSize())
192 size = (blockCache.getBlockSize() / size) * size;
193 blockSize = size;
194 currBuf = new byte[blockSize];
195 }
196
197
198 @Override
199 protected void onBeginWholeObject(long streamPosition, int type,
200 long inflatedSize) throws IOException {
201 crc.reset();
202 }
203
204
205 @Override
206 protected void onEndWholeObject(PackedObjectInfo info) throws IOException {
207 info.setCRC((int) crc.getValue());
208 }
209
210
211 @Override
212 protected void onBeginOfsDelta(long streamPosition,
213 long baseStreamPosition, long inflatedSize) throws IOException {
214 crc.reset();
215 }
216
217
218 @Override
219 protected void onBeginRefDelta(long streamPosition, AnyObjectId baseId,
220 long inflatedSize) throws IOException {
221 crc.reset();
222 }
223
224
225 @Override
226 protected UnresolvedDelta onEndDelta() throws IOException {
227 UnresolvedDelta delta = new UnresolvedDelta();
228 delta.setCRC((int) crc.getValue());
229 return delta;
230 }
231
232
233 @Override
234 protected void onInflatedObjectData(PackedObjectInfo obj, int typeCode,
235 byte[] data) throws IOException {
236
237 }
238
239
240 @Override
241 protected void onObjectHeader(Source src, byte[] raw, int pos, int len)
242 throws IOException {
243 crc.update(raw, pos, len);
244 }
245
246
247 @Override
248 protected void onObjectData(Source src, byte[] raw, int pos, int len)
249 throws IOException {
250 crc.update(raw, pos, len);
251 }
252
253
254 @Override
255 protected void onStoreStream(byte[] raw, int pos, int len)
256 throws IOException {
257 buffer(raw, pos, len);
258 packDigest.update(raw, pos, len);
259 }
260
261 private void buffer(byte[] raw, int pos, int len) throws IOException {
262 while (0 < len) {
263 int n = Math.min(len, currBuf.length - currEnd);
264 if (n == 0) {
265 DfsBlock v = flushBlock();
266 currBuf = new byte[blockSize];
267 currEnd = 0;
268 currPos += v.size();
269 continue;
270 }
271
272 System.arraycopy(raw, pos, currBuf, currEnd, n);
273 pos += n;
274 len -= n;
275 currEnd += n;
276 packEnd += n;
277 }
278 }
279
280 private DfsBlock flushBlock() throws IOException {
281 if (isEmptyPack)
282 throw new IOException(DfsText.get().willNotStoreEmptyPack);
283
284 out.write(currBuf, 0, currEnd);
285
286 byte[] buf;
287 if (currEnd == currBuf.length) {
288 buf = currBuf;
289 } else {
290 buf = new byte[currEnd];
291 System.arraycopy(currBuf, 0, buf, 0, currEnd);
292 }
293
294 DfsBlock v = new DfsBlock(packKey, currPos, buf);
295 readBlock = v;
296 blockCache.put(v);
297 return v;
298 }
299
300
301 @Override
302 protected void onPackFooter(byte[] hash) throws IOException {
303
304
305
306
307 packHash = hash;
308 }
309
310
311 @Override
312 protected ObjectTypeAndSize seekDatabase(PackedObjectInfo obj,
313 ObjectTypeAndSize info) throws IOException {
314 readPos = obj.getOffset();
315 crc.reset();
316 return readObjectHeader(info);
317 }
318
319
320 @Override
321 protected ObjectTypeAndSize seekDatabase(UnresolvedDelta delta,
322 ObjectTypeAndSize info) throws IOException {
323 readPos = delta.getOffset();
324 crc.reset();
325 return readObjectHeader(info);
326 }
327
328
329 @Override
330 protected int readDatabase(byte[] dst, int pos, int cnt) throws IOException {
331 if (cnt == 0)
332 return 0;
333
334 if (currPos <= readPos) {
335
336 int p = (int) (readPos - currPos);
337 int n = Math.min(cnt, currEnd - p);
338 if (n == 0)
339 throw new EOFException();
340 System.arraycopy(currBuf, p, dst, pos, n);
341 readPos += n;
342 return n;
343 }
344
345 if (readBlock == null || !readBlock.contains(packKey, readPos)) {
346 long start = toBlockStart(readPos);
347 readBlock = blockCache.get(packKey, start);
348 if (readBlock == null) {
349 int size = (int) Math.min(blockSize, packEnd - start);
350 byte[] buf = new byte[size];
351 if (read(start, buf, 0, size) != size)
352 throw new EOFException();
353 readBlock = new DfsBlock(packKey, start, buf);
354 blockCache.put(readBlock);
355 }
356 }
357
358 int n = readBlock.copy(readPos, dst, pos, cnt);
359 readPos += n;
360 return n;
361 }
362
363 private int read(long pos, byte[] dst, int off, int len) throws IOException {
364 if (len == 0)
365 return 0;
366
367 int cnt = 0;
368 while (0 < len) {
369 int r = out.read(pos, ByteBuffer.wrap(dst, off, len));
370 if (r <= 0)
371 break;
372 pos += r;
373 off += r;
374 len -= r;
375 cnt += r;
376 }
377 return cnt != 0 ? cnt : -1;
378 }
379
380 private long toBlockStart(long pos) {
381 return (pos / blockSize) * blockSize;
382 }
383
384
385 @Override
386 protected boolean checkCRC(int oldCRC) {
387 return oldCRC == (int) crc.getValue();
388 }
389
390
391 @Override
392 protected boolean onAppendBase(final int typeCode, final byte[] data,
393 final PackedObjectInfo info) throws IOException {
394 info.setOffset(packEnd);
395
396 final byte[] buf = buffer();
397 int sz = data.length;
398 int len = 0;
399 buf[len++] = (byte) ((typeCode << 4) | (sz & 15));
400 sz >>>= 4;
401 while (sz > 0) {
402 buf[len - 1] |= (byte) 0x80;
403 buf[len++] = (byte) (sz & 0x7f);
404 sz >>>= 7;
405 }
406
407 packDigest.update(buf, 0, len);
408 crc.reset();
409 crc.update(buf, 0, len);
410 buffer(buf, 0, len);
411
412 if (def == null)
413 def = new Deflater(Deflater.DEFAULT_COMPRESSION, false);
414 else
415 def.reset();
416 def.setInput(data);
417 def.finish();
418
419 while (!def.finished()) {
420 len = def.deflate(buf);
421 packDigest.update(buf, 0, len);
422 crc.update(buf, 0, len);
423 buffer(buf, 0, len);
424 }
425
426 info.setCRC((int) crc.getValue());
427 return true;
428 }
429
430
431 @Override
432 protected void onEndThinPack() throws IOException {
433
434
435
436
437
438 packHash = packDigest.digest();
439 }
440
441 private void writePackIndex() throws IOException {
442 List<PackedObjectInfo> list = getSortedObjectList(null );
443 packIndex = objins.writePackIndex(packDsc, packHash, list);
444 }
445 }