View Javadoc
1   /*
2    * Copyright (C) 2011, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.storage.dfs;
45  
46  import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
47  
48  import java.io.EOFException;
49  import java.io.IOException;
50  import java.io.InputStream;
51  import java.nio.ByteBuffer;
52  import java.security.MessageDigest;
53  import java.util.Collections;
54  import java.util.List;
55  import java.util.zip.CRC32;
56  import java.util.zip.Deflater;
57  
58  import org.eclipse.jgit.internal.storage.file.PackIndex;
59  import org.eclipse.jgit.internal.storage.file.PackLock;
60  import org.eclipse.jgit.lib.AnyObjectId;
61  import org.eclipse.jgit.lib.Constants;
62  import org.eclipse.jgit.lib.ProgressMonitor;
63  import org.eclipse.jgit.transport.PackParser;
64  import org.eclipse.jgit.transport.PackedObjectInfo;
65  
66  /** Parses a pack stream into the DFS, by creating a new pack and index. */
67  public class DfsPackParser extends PackParser {
68  	private final DfsObjDatabase objdb;
69  
70  	private final DfsInserter objins;
71  
72  	/** CRC-32 computation for objects that are appended onto the pack. */
73  	private final CRC32 crc;
74  
75  	/** Running SHA-1 of the entire pack stream. */
76  	private final MessageDigest packDigest;
77  
78  	/** Block size to use when caching data for read back. */
79  	private int blockSize;
80  
81  	/** Current end of the pack file. */
82  	private long packEnd;
83  
84  	/** Checksum of the entire pack file. */
85  	private byte[] packHash;
86  
87  	/** Compresses delta bases when completing a thin pack. */
88  	private Deflater def;
89  
90  	/** True if the pack is an empty pack. */
91  	private boolean isEmptyPack;
92  
93  	/** Description of the pack file, created in {@link #onPackHeader(long)}. */
94  	private DfsPackDescription packDsc;
95  
96  	/** Key used during delta resolution reading delta chains. */
97  	private DfsPackKey packKey;
98  
99  	/** If the index was small enough, the entire index after writing. */
100 	private PackIndex packIndex;
101 
102 	/** Stream to the DFS storage, opened during {@link #onPackHeader(long)}. */
103 	private DfsOutputStream out;
104 
105 	/** Data being written that has not yet been cached. */
106 	private byte[] currBuf;
107 	private long currPos; // Position of currBuf in the file.
108 	private int currEnd; // Position inside of currBuf to append to next.
109 
110 	/** Cache that written blocks are stored into and later read back from. */
111 	private DfsBlockCache blockCache;
112 
113 	/** Cached block that is being read. */
114 	private long readPos;
115 	private DfsBlock readBlock;
116 
117 	/**
118 	 * Initialize a new pack parser.
119 	 *
120 	 * @param db
121 	 *            database the objects will be imported into.
122 	 * @param ins
123 	 *            inserter the parser will use to help it inject the objects.
124 	 * @param in
125 	 *            the stream to parse.
126 	 */
127 	protected DfsPackParser(DfsObjDatabase db, DfsInserter ins, InputStream in) {
128 		super(db, in);
129 		this.objdb = db;
130 		this.objins = ins;
131 		this.crc = new CRC32();
132 		this.packDigest = Constants.newMessageDigest();
133 	}
134 
135 	@Override
136 	public PackLock parse(ProgressMonitor receiving, ProgressMonitor resolving)
137 			throws IOException {
138 		boolean rollback = true;
139 		try {
140 			blockCache = DfsBlockCache.getInstance();
141 			super.parse(receiving, resolving);
142 			if (isEmptyPack)
143 				return null;
144 			buffer(packHash, 0, packHash.length);
145 			if (currEnd != 0)
146 				flushBlock();
147 			out.close();
148 			out = null;
149 			currBuf = null;
150 			readBlock = null;
151 			packDsc.addFileExt(PACK);
152 			packDsc.setFileSize(PACK, packEnd);
153 
154 			writePackIndex();
155 			objdb.commitPack(Collections.singletonList(packDsc), null);
156 			rollback = false;
157 
158 			DfsPackFile p = blockCache.getOrCreate(packDsc, packKey);
159 			p.setBlockSize(blockSize);
160 			if (packIndex != null)
161 				p.setPackIndex(packIndex);
162 
163 			objdb.addPack(p);
164 
165 			return null;
166 		} finally {
167 			blockCache = null;
168 			currBuf = null;
169 			readBlock = null;
170 
171 			if (def != null) {
172 				def.end();
173 				def = null;
174 			}
175 
176 			if (out != null) {
177 				try {
178 					out.close();
179 				} catch (IOException err) {
180 					// Ignore a close error, rollbackPack is also important.
181 				}
182 				out = null;
183 			}
184 
185 			if (rollback && packDsc != null) {
186 				try {
187 					objdb.rollbackPack(Collections.singletonList(packDsc));
188 				} finally {
189 					packDsc = null;
190 				}
191 			}
192 		}
193 	}
194 
195 	/** @return description of the imported pack, if one was made. */
196 	public DfsPackDescription getPackDescription() {
197 		return packDsc;
198 	}
199 
200 	@Override
201 	protected void onPackHeader(long objectCount) throws IOException {
202 		if (objectCount == 0) {
203 			isEmptyPack = true;
204 			currBuf = new byte[256];
205 			return;
206 		}
207 
208 		packDsc = objdb.newPack(DfsObjDatabase.PackSource.RECEIVE);
209 		packKey = new DfsPackKey();
210 
211 		out = objdb.writeFile(packDsc, PACK);
212 		int size = out.blockSize();
213 		if (size <= 0)
214 			size = blockCache.getBlockSize();
215 		else if (size < blockCache.getBlockSize())
216 			size = (blockCache.getBlockSize() / size) * size;
217 		blockSize = size;
218 		currBuf = new byte[blockSize];
219 	}
220 
221 	@Override
222 	protected void onBeginWholeObject(long streamPosition, int type,
223 			long inflatedSize) throws IOException {
224 		crc.reset();
225 	}
226 
227 	@Override
228 	protected void onEndWholeObject(PackedObjectInfo info) throws IOException {
229 		info.setCRC((int) crc.getValue());
230 	}
231 
232 	@Override
233 	protected void onBeginOfsDelta(long streamPosition,
234 			long baseStreamPosition, long inflatedSize) throws IOException {
235 		crc.reset();
236 	}
237 
238 	@Override
239 	protected void onBeginRefDelta(long streamPosition, AnyObjectId baseId,
240 			long inflatedSize) throws IOException {
241 		crc.reset();
242 	}
243 
244 	@Override
245 	protected UnresolvedDelta onEndDelta() throws IOException {
246 		UnresolvedDelta delta = new UnresolvedDelta();
247 		delta.setCRC((int) crc.getValue());
248 		return delta;
249 	}
250 
251 	@Override
252 	protected void onInflatedObjectData(PackedObjectInfo obj, int typeCode,
253 			byte[] data) throws IOException {
254 		// DfsPackParser ignores this event.
255 	}
256 
257 	@Override
258 	protected void onObjectHeader(Source src, byte[] raw, int pos, int len)
259 			throws IOException {
260 		crc.update(raw, pos, len);
261 	}
262 
263 	@Override
264 	protected void onObjectData(Source src, byte[] raw, int pos, int len)
265 			throws IOException {
266 		crc.update(raw, pos, len);
267 	}
268 
269 	@Override
270 	protected void onStoreStream(byte[] raw, int pos, int len)
271 			throws IOException {
272 		buffer(raw, pos, len);
273 		packDigest.update(raw, pos, len);
274 	}
275 
276 	private void buffer(byte[] raw, int pos, int len) throws IOException {
277 		while (0 < len) {
278 			int n = Math.min(len, currBuf.length - currEnd);
279 			if (n == 0) {
280 				DfsBlock v = flushBlock();
281 				currBuf = new byte[blockSize];
282 				currEnd = 0;
283 				currPos += v.size();
284 				continue;
285 			}
286 
287 			System.arraycopy(raw, pos, currBuf, currEnd, n);
288 			pos += n;
289 			len -= n;
290 			currEnd += n;
291 			packEnd += n;
292 		}
293 	}
294 
295 	private DfsBlock flushBlock() throws IOException {
296 		if (isEmptyPack)
297 			throw new IOException(DfsText.get().willNotStoreEmptyPack);
298 
299 		out.write(currBuf, 0, currEnd);
300 
301 		byte[] buf;
302 		if (currEnd == currBuf.length) {
303 			buf = currBuf;
304 		} else {
305 			buf = new byte[currEnd];
306 			System.arraycopy(currBuf, 0, buf, 0, currEnd);
307 		}
308 
309 		DfsBlock v = new DfsBlock(packKey, currPos, buf);
310 		readBlock = v;
311 		blockCache.put(v);
312 		return v;
313 	}
314 
315 	@Override
316 	protected void onPackFooter(byte[] hash) throws IOException {
317 		// The base class will validate the original hash matches
318 		// what the stream has stored at the end. We are called
319 		// only if the hash was good. Save it in case there are no
320 		// missing bases to append.
321 		packHash = hash;
322 	}
323 
324 	@Override
325 	protected ObjectTypeAndSize seekDatabase(PackedObjectInfo obj,
326 			ObjectTypeAndSize info) throws IOException {
327 		readPos = obj.getOffset();
328 		crc.reset();
329 		return readObjectHeader(info);
330 	}
331 
332 	@Override
333 	protected ObjectTypeAndSize seekDatabase(UnresolvedDelta delta,
334 			ObjectTypeAndSize info) throws IOException {
335 		readPos = delta.getOffset();
336 		crc.reset();
337 		return readObjectHeader(info);
338 	}
339 
340 	@Override
341 	protected int readDatabase(byte[] dst, int pos, int cnt) throws IOException {
342 		if (cnt == 0)
343 			return 0;
344 
345 		if (currPos <= readPos) {
346 			// Requested read is still buffered. Copy direct from buffer.
347 			int p = (int) (readPos - currPos);
348 			int n = Math.min(cnt, currEnd - p);
349 			if (n == 0)
350 				throw new EOFException();
351 			System.arraycopy(currBuf, p, dst, pos, n);
352 			readPos += n;
353 			return n;
354 		}
355 
356 		if (readBlock == null || !readBlock.contains(packKey, readPos)) {
357 			long start = toBlockStart(readPos);
358 			readBlock = blockCache.get(packKey, start);
359 			if (readBlock == null) {
360 				int size = (int) Math.min(blockSize, packEnd - start);
361 				byte[] buf = new byte[size];
362 				if (read(start, buf, 0, size) != size)
363 					throw new EOFException();
364 				readBlock = new DfsBlock(packKey, start, buf);
365 				blockCache.put(readBlock);
366 			}
367 		}
368 
369 		int n = readBlock.copy(readPos, dst, pos, cnt);
370 		readPos += n;
371 		return n;
372 	}
373 
374 	private int read(long pos, byte[] dst, int off, int len) throws IOException {
375 		if (len == 0)
376 			return 0;
377 
378 		int cnt = 0;
379 		while (0 < len) {
380 			int r = out.read(pos, ByteBuffer.wrap(dst, off, len));
381 			if (r <= 0)
382 				break;
383 			pos += r;
384 			off += r;
385 			len -= r;
386 			cnt += r;
387 		}
388 		return cnt != 0 ? cnt : -1;
389 	}
390 
391 	private long toBlockStart(long pos) {
392 		return (pos / blockSize) * blockSize;
393 	}
394 
395 	@Override
396 	protected boolean checkCRC(int oldCRC) {
397 		return oldCRC == (int) crc.getValue();
398 	}
399 
400 	@Override
401 	protected boolean onAppendBase(final int typeCode, final byte[] data,
402 			final PackedObjectInfo info) throws IOException {
403 		info.setOffset(packEnd);
404 
405 		final byte[] buf = buffer();
406 		int sz = data.length;
407 		int len = 0;
408 		buf[len++] = (byte) ((typeCode << 4) | sz & 15);
409 		sz >>>= 4;
410 		while (sz > 0) {
411 			buf[len - 1] |= 0x80;
412 			buf[len++] = (byte) (sz & 0x7f);
413 			sz >>>= 7;
414 		}
415 
416 		packDigest.update(buf, 0, len);
417 		crc.reset();
418 		crc.update(buf, 0, len);
419 		buffer(buf, 0, len);
420 
421 		if (def == null)
422 			def = new Deflater(Deflater.DEFAULT_COMPRESSION, false);
423 		else
424 			def.reset();
425 		def.setInput(data);
426 		def.finish();
427 
428 		while (!def.finished()) {
429 			len = def.deflate(buf);
430 			packDigest.update(buf, 0, len);
431 			crc.update(buf, 0, len);
432 			buffer(buf, 0, len);
433 		}
434 
435 		info.setCRC((int) crc.getValue());
436 		return true;
437 	}
438 
439 	@Override
440 	protected void onEndThinPack() throws IOException {
441 		// Normally when a thin pack is closed the pack header gets
442 		// updated to reflect the actual object count. This is not going
443 		// to be possible on most DFS backends, so instead we allow
444 		// the header to have an incorrect count, but we do change the
445 		// trailing digest to be correct.
446 		packHash = packDigest.digest();
447 	}
448 
449 	private void writePackIndex() throws IOException {
450 		List<PackedObjectInfo> list = getSortedObjectList(null /* by ObjectId */);
451 		packIndex = objins.writePackIndex(packDsc, packHash, list);
452 	}
453 }