/*
 * Copyright (C) 2011, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
43  
44  package org.eclipse.jgit.internal.storage.dfs;
45  
46  import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
47  
48  import java.io.EOFException;
49  import java.io.IOException;
50  import java.io.InputStream;
51  import java.nio.ByteBuffer;
52  import java.security.MessageDigest;
53  import java.util.Collections;
54  import java.util.List;
55  import java.util.zip.CRC32;
56  import java.util.zip.Deflater;
57  
58  import org.eclipse.jgit.internal.storage.file.PackIndex;
59  import org.eclipse.jgit.internal.storage.file.PackLock;
60  import org.eclipse.jgit.lib.AnyObjectId;
61  import org.eclipse.jgit.lib.Constants;
62  import org.eclipse.jgit.lib.ProgressMonitor;
63  import org.eclipse.jgit.transport.PackParser;
64  import org.eclipse.jgit.transport.PackedObjectInfo;
65  
66  /**
67   * Parses a pack stream into the DFS, by creating a new pack and index.
68   */
69  public class DfsPackParser extends PackParser {
70  	private final DfsObjDatabase objdb;
71  
72  	private final DfsInserter objins;
73  
74  	/** CRC-32 computation for objects that are appended onto the pack. */
75  	private final CRC32 crc;
76  
77  	/** Running SHA-1 of the entire pack stream. */
78  	private final MessageDigest packDigest;
79  
80  	/** Block size to use when caching data for read back. */
81  	private int blockSize;
82  
83  	/** Current end of the pack file. */
84  	private long packEnd;
85  
86  	/** Checksum of the entire pack file. */
87  	private byte[] packHash;
88  
89  	/** Compresses delta bases when completing a thin pack. */
90  	private Deflater def;
91  
92  	/** True if the pack is an empty pack. */
93  	private boolean isEmptyPack;
94  
95  	/** Name of the pack file, computed in {@link #onPackHeader(long)}. */
96  	private DfsPackDescription packDsc;
97  
98  	/** Key used during delta resolution reading delta chains. */
99  	private DfsStreamKey packKey;
100 
101 	/** If the index was small enough, the entire index after writing. */
102 	private PackIndex packIndex;
103 
104 	/** Stream to the DFS storage, opened during {@link #onPackHeader(long)}. */
105 	private DfsOutputStream out;
106 
107 	/** Data being written that has not yet been cached. */
108 	private byte[] currBuf;
109 	private long currPos; // Position of currBuf in the file.
110 	private int currEnd; // Position inside of currBuf to append to next.
111 
112 	/** Cache the chunks were stored into or get read back from. */
113 	private DfsBlockCache blockCache;
114 
115 	/** Cached block that is being read. */
116 	private long readPos;
117 	private DfsBlock readBlock;
118 
119 	/**
120 	 * Initialize a new pack parser.
121 	 *
122 	 * @param db
123 	 *            database the objects will be imported into.
124 	 * @param ins
125 	 *            inserter the parser will use to help it inject the objects.
126 	 * @param in
127 	 *            the stream to parse.
128 	 */
129 	protected DfsPackParser(DfsObjDatabase db, DfsInserter ins, InputStream in) {
130 		super(db, in);
131 		this.objdb = db;
132 		this.objins = ins;
133 		this.crc = new CRC32();
134 		this.packDigest = Constants.newMessageDigest();
135 	}
136 
137 	/** {@inheritDoc} */
138 	@Override
139 	public PackLock parse(ProgressMonitor receiving, ProgressMonitor resolving)
140 			throws IOException {
141 		boolean rollback = true;
142 		try {
143 			blockCache = DfsBlockCache.getInstance();
144 			super.parse(receiving, resolving);
145 			if (isEmptyPack)
146 				return null;
147 			buffer(packHash, 0, packHash.length);
148 			if (currEnd != 0)
149 				flushBlock();
150 			out.close();
151 			out = null;
152 			currBuf = null;
153 			readBlock = null;
154 			packDsc.addFileExt(PACK);
155 			packDsc.setFileSize(PACK, packEnd);
156 			packDsc.setBlockSize(PACK, blockSize);
157 
158 			writePackIndex();
159 			objdb.commitPack(Collections.singletonList(packDsc), null);
160 			rollback = false;
161 
162 			DfsPackFile p = new DfsPackFile(blockCache, packDsc);
163 			p.setBlockSize(blockSize);
164 			if (packIndex != null)
165 				p.setPackIndex(packIndex);
166 
167 			objdb.addPack(p);
168 
169 			return null;
170 		} finally {
171 			blockCache = null;
172 			currBuf = null;
173 			readBlock = null;
174 
175 			if (def != null) {
176 				def.end();
177 				def = null;
178 			}
179 
180 			if (out != null) {
181 				try {
182 					out.close();
183 				} catch (IOException err) {
184 					// Ignore a close error, rollbackPack is also important.
185 				}
186 				out = null;
187 			}
188 
189 			if (rollback && packDsc != null) {
190 				try {
191 					objdb.rollbackPack(Collections.singletonList(packDsc));
192 				} finally {
193 					packDsc = null;
194 				}
195 			}
196 		}
197 	}
198 
199 	/**
200 	 * Get description of the imported pack, if one was made.
201 	 *
202 	 * @return description of the imported pack, if one was made.
203 	 */
204 	public DfsPackDescription getPackDescription() {
205 		return packDsc;
206 	}
207 
208 	/** {@inheritDoc} */
209 	@Override
210 	protected void onPackHeader(long objectCount) throws IOException {
211 		if (objectCount == 0) {
212 			isEmptyPack = true;
213 			currBuf = new byte[256];
214 			return;
215 		}
216 
217 		packDsc = objdb.newPack(DfsObjDatabase.PackSource.RECEIVE);
218 		out = objdb.writeFile(packDsc, PACK);
219 		packKey = packDsc.getStreamKey(PACK);
220 
221 		int size = out.blockSize();
222 		if (size <= 0)
223 			size = blockCache.getBlockSize();
224 		else if (size < blockCache.getBlockSize())
225 			size = (blockCache.getBlockSize() / size) * size;
226 		blockSize = size;
227 		currBuf = new byte[blockSize];
228 	}
229 
230 	/** {@inheritDoc} */
231 	@Override
232 	protected void onBeginWholeObject(long streamPosition, int type,
233 			long inflatedSize) throws IOException {
234 		crc.reset();
235 	}
236 
237 	/** {@inheritDoc} */
238 	@Override
239 	protected void onEndWholeObject(PackedObjectInfo info) throws IOException {
240 		info.setCRC((int) crc.getValue());
241 	}
242 
243 	/** {@inheritDoc} */
244 	@Override
245 	protected void onBeginOfsDelta(long streamPosition,
246 			long baseStreamPosition, long inflatedSize) throws IOException {
247 		crc.reset();
248 	}
249 
250 	/** {@inheritDoc} */
251 	@Override
252 	protected void onBeginRefDelta(long streamPosition, AnyObjectId baseId,
253 			long inflatedSize) throws IOException {
254 		crc.reset();
255 	}
256 
257 	/** {@inheritDoc} */
258 	@Override
259 	protected UnresolvedDelta onEndDelta() throws IOException {
260 		UnresolvedDelta delta = new UnresolvedDelta();
261 		delta.setCRC((int) crc.getValue());
262 		return delta;
263 	}
264 
265 	/** {@inheritDoc} */
266 	@Override
267 	protected void onInflatedObjectData(PackedObjectInfo obj, int typeCode,
268 			byte[] data) throws IOException {
269 		// DfsPackParser ignores this event.
270 	}
271 
272 	/** {@inheritDoc} */
273 	@Override
274 	protected void onObjectHeader(Source src, byte[] raw, int pos, int len)
275 			throws IOException {
276 		crc.update(raw, pos, len);
277 	}
278 
279 	/** {@inheritDoc} */
280 	@Override
281 	protected void onObjectData(Source src, byte[] raw, int pos, int len)
282 			throws IOException {
283 		crc.update(raw, pos, len);
284 	}
285 
286 	/** {@inheritDoc} */
287 	@Override
288 	protected void onStoreStream(byte[] raw, int pos, int len)
289 			throws IOException {
290 		buffer(raw, pos, len);
291 		packDigest.update(raw, pos, len);
292 	}
293 
294 	private void buffer(byte[] raw, int pos, int len) throws IOException {
295 		while (0 < len) {
296 			int n = Math.min(len, currBuf.length - currEnd);
297 			if (n == 0) {
298 				DfsBlock v = flushBlock();
299 				currBuf = new byte[blockSize];
300 				currEnd = 0;
301 				currPos += v.size();
302 				continue;
303 			}
304 
305 			System.arraycopy(raw, pos, currBuf, currEnd, n);
306 			pos += n;
307 			len -= n;
308 			currEnd += n;
309 			packEnd += n;
310 		}
311 	}
312 
313 	private DfsBlock flushBlock() throws IOException {
314 		if (isEmptyPack)
315 			throw new IOException(DfsText.get().willNotStoreEmptyPack);
316 
317 		out.write(currBuf, 0, currEnd);
318 
319 		byte[] buf;
320 		if (currEnd == currBuf.length) {
321 			buf = currBuf;
322 		} else {
323 			buf = new byte[currEnd];
324 			System.arraycopy(currBuf, 0, buf, 0, currEnd);
325 		}
326 
327 		DfsBlock v = new DfsBlock(packKey, currPos, buf);
328 		readBlock = v;
329 		blockCache.put(v);
330 		return v;
331 	}
332 
333 	/** {@inheritDoc} */
334 	@Override
335 	protected void onPackFooter(byte[] hash) throws IOException {
336 		// The base class will validate the original hash matches
337 		// what the stream has stored at the end. We are called
338 		// only if the hash was good. Save it in case there are no
339 		// missing bases to append.
340 		packHash = hash;
341 	}
342 
343 	/** {@inheritDoc} */
344 	@Override
345 	protected ObjectTypeAndSize seekDatabase(PackedObjectInfo obj,
346 			ObjectTypeAndSize info) throws IOException {
347 		readPos = obj.getOffset();
348 		crc.reset();
349 		return readObjectHeader(info);
350 	}
351 
352 	/** {@inheritDoc} */
353 	@Override
354 	protected ObjectTypeAndSize seekDatabase(UnresolvedDelta delta,
355 			ObjectTypeAndSize info) throws IOException {
356 		readPos = delta.getOffset();
357 		crc.reset();
358 		return readObjectHeader(info);
359 	}
360 
361 	/** {@inheritDoc} */
362 	@Override
363 	protected int readDatabase(byte[] dst, int pos, int cnt) throws IOException {
364 		if (cnt == 0)
365 			return 0;
366 
367 		if (currPos <= readPos) {
368 			// Requested read is still buffered. Copy direct from buffer.
369 			int p = (int) (readPos - currPos);
370 			int n = Math.min(cnt, currEnd - p);
371 			if (n == 0)
372 				throw new EOFException();
373 			System.arraycopy(currBuf, p, dst, pos, n);
374 			readPos += n;
375 			return n;
376 		}
377 
378 		if (readBlock == null || !readBlock.contains(packKey, readPos)) {
379 			long start = toBlockStart(readPos);
380 			readBlock = blockCache.get(packKey, start);
381 			if (readBlock == null) {
382 				int size = (int) Math.min(blockSize, packEnd - start);
383 				byte[] buf = new byte[size];
384 				if (read(start, buf, 0, size) != size)
385 					throw new EOFException();
386 				readBlock = new DfsBlock(packKey, start, buf);
387 				blockCache.put(readBlock);
388 			}
389 		}
390 
391 		int n = readBlock.copy(readPos, dst, pos, cnt);
392 		readPos += n;
393 		return n;
394 	}
395 
396 	private int read(long pos, byte[] dst, int off, int len) throws IOException {
397 		if (len == 0)
398 			return 0;
399 
400 		int cnt = 0;
401 		while (0 < len) {
402 			int r = out.read(pos, ByteBuffer.wrap(dst, off, len));
403 			if (r <= 0)
404 				break;
405 			pos += r;
406 			off += r;
407 			len -= r;
408 			cnt += r;
409 		}
410 		return cnt != 0 ? cnt : -1;
411 	}
412 
413 	private long toBlockStart(long pos) {
414 		return (pos / blockSize) * blockSize;
415 	}
416 
417 	/** {@inheritDoc} */
418 	@Override
419 	protected boolean checkCRC(int oldCRC) {
420 		return oldCRC == (int) crc.getValue();
421 	}
422 
423 	/** {@inheritDoc} */
424 	@Override
425 	protected boolean onAppendBase(final int typeCode, final byte[] data,
426 			final PackedObjectInfo info) throws IOException {
427 		info.setOffset(packEnd);
428 
429 		final byte[] buf = buffer();
430 		int sz = data.length;
431 		int len = 0;
432 		buf[len++] = (byte) ((typeCode << 4) | sz & 15);
433 		sz >>>= 4;
434 		while (sz > 0) {
435 			buf[len - 1] |= 0x80;
436 			buf[len++] = (byte) (sz & 0x7f);
437 			sz >>>= 7;
438 		}
439 
440 		packDigest.update(buf, 0, len);
441 		crc.reset();
442 		crc.update(buf, 0, len);
443 		buffer(buf, 0, len);
444 
445 		if (def == null)
446 			def = new Deflater(Deflater.DEFAULT_COMPRESSION, false);
447 		else
448 			def.reset();
449 		def.setInput(data);
450 		def.finish();
451 
452 		while (!def.finished()) {
453 			len = def.deflate(buf);
454 			packDigest.update(buf, 0, len);
455 			crc.update(buf, 0, len);
456 			buffer(buf, 0, len);
457 		}
458 
459 		info.setCRC((int) crc.getValue());
460 		return true;
461 	}
462 
463 	/** {@inheritDoc} */
464 	@Override
465 	protected void onEndThinPack() throws IOException {
466 		// Normally when a thin pack is closed the pack header gets
467 		// updated to reflect the actual object count. This is not going
468 		// to be possible on most DFS backends, so instead we allow
469 		// the header to have an incorrect count, but we do change the
470 		// trailing digest to be correct.
471 		packHash = packDigest.digest();
472 	}
473 
474 	private void writePackIndex() throws IOException {
475 		List<PackedObjectInfo> list = getSortedObjectList(null /* by ObjectId */);
476 		packIndex = objins.writePackIndex(packDsc, packHash, list);
477 	}
478 }