// "View Javadoc" navigation link — artifact of the HTML source listing, not part of the code.
1   /*
2    * Copyright (C) 2011, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.storage.dfs;
45  
46  import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
47  import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
48  import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
49  import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
50  
51  import java.io.BufferedInputStream;
52  import java.io.EOFException;
53  import java.io.IOException;
54  import java.io.InputStream;
55  import java.io.OutputStream;
56  import java.nio.ByteBuffer;
57  import java.security.MessageDigest;
58  import java.text.MessageFormat;
59  import java.util.Collection;
60  import java.util.Collections;
61  import java.util.HashSet;
62  import java.util.List;
63  import java.util.Set;
64  import java.util.zip.CRC32;
65  import java.util.zip.DataFormatException;
66  import java.util.zip.Deflater;
67  import java.util.zip.DeflaterOutputStream;
68  import java.util.zip.Inflater;
69  import java.util.zip.InflaterInputStream;
70  
71  import org.eclipse.jgit.errors.CorruptObjectException;
72  import org.eclipse.jgit.errors.IncorrectObjectTypeException;
73  import org.eclipse.jgit.errors.LargeObjectException;
74  import org.eclipse.jgit.internal.JGitText;
75  import org.eclipse.jgit.internal.storage.file.PackIndex;
76  import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
77  import org.eclipse.jgit.internal.storage.pack.PackExt;
78  import org.eclipse.jgit.lib.AbbreviatedObjectId;
79  import org.eclipse.jgit.lib.AnyObjectId;
80  import org.eclipse.jgit.lib.Constants;
81  import org.eclipse.jgit.lib.ObjectId;
82  import org.eclipse.jgit.lib.ObjectIdOwnerMap;
83  import org.eclipse.jgit.lib.ObjectInserter;
84  import org.eclipse.jgit.lib.ObjectLoader;
85  import org.eclipse.jgit.lib.ObjectReader;
86  import org.eclipse.jgit.lib.ObjectStream;
87  import org.eclipse.jgit.transport.PackedObjectInfo;
88  import org.eclipse.jgit.util.BlockList;
89  import org.eclipse.jgit.util.IO;
90  import org.eclipse.jgit.util.NB;
91  import org.eclipse.jgit.util.TemporaryBuffer;
92  import org.eclipse.jgit.util.io.CountingOutputStream;
93  import org.eclipse.jgit.util.sha1.SHA1;
94  
95  /**
96   * Inserts objects into the DFS.
97   */
98  public class DfsInserter extends ObjectInserter {
99  	/** Always produce version 2 indexes, to get CRC data. */
100 	private static final int INDEX_VERSION = 2;
101 
102 	final DfsObjDatabase db;
103 	int compression = Deflater.BEST_COMPRESSION;
104 
105 	List<PackedObjectInfo> objectList;
106 	ObjectIdOwnerMap<PackedObjectInfo> objectMap;
107 
108 	DfsBlockCache cache;
109 	DfsStreamKey packKey;
110 	DfsPackDescription packDsc;
111 	PackStream packOut;
112 	private boolean rollback;
113 	private boolean checkExisting = true;
114 
	/**
	 * Initialize a new inserter.
	 * <p>
	 * The output pack is created lazily, when the first object is actually
	 * inserted; constructing an inserter performs no I/O.
	 *
	 * @param db
	 *            database the inserter writes to.
	 */
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}
124 
	/**
	 * Check existence
	 *
	 * @param check
	 *            if {@code false}, will write out possibly-duplicate objects
	 *            without first checking whether they exist in the repo; default
	 *            is true.
	 */
	public void checkExisting(boolean check) {
		// Disabling the probe trades possible duplicates for insert speed;
		// only the byte[] insert path consults this flag.
		checkExisting = check;
	}
136 
	/**
	 * Set the deflate compression level used for the pack stream.
	 *
	 * @param compression
	 *            compression level passed to {@link Deflater}; defaults to
	 *            {@link Deflater#BEST_COMPRESSION}. Takes effect when the
	 *            pack stream is created.
	 */
	void setCompressionLevel(int compression) {
		this.compression = compression;
	}
140 
	/** {@inheritDoc} */
	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		// Received pack data is parsed straight into the same object database.
		return new DfsPackParser(db, this, in);
	}
146 
	/** {@inheritDoc} */
	@Override
	public ObjectReader newReader() {
		// This reader also sees objects buffered in the not-yet-flushed pack.
		return new Reader();
	}
152 
153 	/** {@inheritDoc} */
154 	@Override
155 	public ObjectId insert(int type, byte[] data, int off, int len)
156 			throws IOException {
157 		ObjectId id = idFor(type, data, off, len);
158 		if (objectMap != null && objectMap.contains(id))
159 			return id;
160 		// Ignore unreachable (garbage) objects here.
161 		if (checkExisting && db.has(id, true))
162 			return id;
163 
164 		long offset = beginObject(type, len);
165 		packOut.compress.write(data, off, len);
166 		packOut.compress.finish();
167 		return endObject(id, offset);
168 	}
169 
	/** {@inheritDoc} */
	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		// Small objects are read fully into memory and routed through the
		// byte[] path, which can also de-duplicate against existing objects.
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		// Too large to buffer: stream straight into the pack, computing the
		// object id by hashing the canonical header "<type> <len>\0" followed
		// by the raw content.
		long offset = beginObject(type, len);
		SHA1 md = digest();
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(md.toObjectId(), offset);
	}
198 
199 	private byte[] insertBuffer(long len) {
200 		byte[] buf = buffer();
201 		if (len <= buf.length)
202 			return buf;
203 		if (len < db.getReaderOptions().getStreamFileThreshold()) {
204 			try {
205 				return new byte[(int) len];
206 			} catch (OutOfMemoryError noMem) {
207 				return buf;
208 			}
209 		}
210 		return buf;
211 	}
212 
213 	/** {@inheritDoc} */
214 	@Override
215 	public void flush() throws IOException {
216 		if (packDsc == null)
217 			return;
218 
219 		if (packOut == null)
220 			throw new IOException();
221 
222 		byte[] packHash = packOut.writePackFooter();
223 		packDsc.addFileExt(PACK);
224 		packDsc.setFileSize(PACK, packOut.getCount());
225 		packOut.close();
226 		packOut = null;
227 
228 		sortObjectsById();
229 
230 		PackIndex index = writePackIndex(packDsc, packHash, objectList);
231 		db.commitPack(Collections.singletonList(packDsc), null);
232 		rollback = false;
233 
234 		DfsPackFile p = new DfsPackFile(cache, packDsc);
235 		if (index != null)
236 			p.setPackIndex(index);
237 		db.addPack(p);
238 		clear();
239 	}
240 
	/** {@inheritDoc} */
	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
				// Ignore a close failure, the pack should be removed.
			} finally {
				packOut = null;
			}
		}
		// If flush() never committed the pack, ask the database to delete
		// the partially written pack files.
		if (rollback && packDsc != null) {
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}
263 
	/**
	 * Reset per-pack state so the next insert starts a fresh pack.
	 * <p>
	 * {@code packOut} is nulled by the callers before this runs; {@code cache}
	 * is intentionally left as-is.
	 */
	private void clear() {
		objectList = null;
		objectMap = null;
		packKey = null;
		packDsc = null;
	}
270 
	/**
	 * Start a new object in the pack, creating the pack itself if needed.
	 *
	 * @param type
	 *            object type code.
	 * @param len
	 *            inflated size of the object content.
	 * @return pack offset at which the object's header begins.
	 * @throws IOException
	 *             the pack could not be created or written.
	 */
	private long beginObject(int type, long len) throws IOException {
		if (packOut == null)
			beginPack();
		long offset = packOut.getCount();
		packOut.beginObject(type, len);
		return offset;
	}
278 
279 	private ObjectId endObject(ObjectId id, long offset) {
280 		PackedObjectInfo obj = new PackedObjectInfo(id);
281 		obj.setOffset(offset);
282 		obj.setCRC((int) packOut.crc32.getValue());
283 		objectList.add(obj);
284 		objectMap.addIfAbsent(obj);
285 		return id;
286 	}
287 
	/** Create the pack's output stream and write its 12-byte header. */
	private void beginPack() throws IOException {
		objectList = new BlockList<>();
		objectMap = new ObjectIdOwnerMap<>();
		cache = DfsBlockCache.getInstance();

		// Until flush() commits, close() must delete the partial pack.
		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		DfsOutputStream dfsOut = db.writeFile(packDsc, PACK);
		packDsc.setBlockSize(PACK, dfsOut.blockSize());
		packOut = new PackStream(dfsOut);
		packKey = packDsc.getStreamKey(PACK);

		// Write the header as though it were a single object pack.
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2); // Always use pack version 2.
		NB.encodeInt32(buf, 8, 1); // Always assume 1 object.
		packOut.write(buf, 0, 12);
	}
307 
	/** Sort buffered objects by id, as the index writer requires. */
	private void sortObjectsById() {
		Collections.sort(objectList);
	}
311 
	/**
	 * Write the pack's index file.
	 *
	 * @param pack
	 *            description of the pack being indexed.
	 * @param packHash
	 *            trailing SHA-1 of the pack file.
	 * @param list
	 *            objects in the pack; must already be sorted by id.
	 * @return in-memory copy of the index when it was small enough to buffer,
	 *         otherwise {@code null}.
	 * @throws IOException
	 *             the index could not be written.
	 */
	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

		// If there are less than 58,000 objects, the entire index fits in under
		// 2 MiB. Callers will probably need the index immediately, so buffer
		// the index in process and load from the buffer.
		TemporaryBuffer.Heap buf = null;
		PackIndex packIndex = null;
		if (list.size() <= 58000) {
			buf = new TemporaryBuffer.Heap(2 << 20);
			index(buf, packHash, list);
			packIndex = PackIndex.read(buf.openInputStream());
		}

		// Persist the index, reusing the buffered copy when available so the
		// index is only formatted once.
		try (DfsOutputStream os = db.writeFile(pack, INDEX)) {
			CountingOutputStream cnt = new CountingOutputStream(os);
			if (buf != null)
				buf.writeTo(cnt, null);
			else
				index(cnt, packHash, list);
			pack.addFileExt(INDEX);
			pack.setBlockSize(INDEX, os.blockSize());
			pack.setFileSize(INDEX, cnt.getCount());
		} finally {
			if (buf != null) {
				buf.close();
			}
		}
		return packIndex;
	}
344 
	/** Format {@code list} as a version-2 pack index onto {@code out}. */
	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}
349 
	/**
	 * Output stream that builds the pack file as objects are inserted.
	 * <p>
	 * Data is accumulated in {@code blockSize} chunks; each completed chunk is
	 * written to the underlying {@link DfsOutputStream} and also placed into
	 * the {@link DfsBlockCache}, so already-written objects can be read back
	 * (see {@link #read(long, byte[], int, int)} and
	 * {@link #inflate(DfsReader, long, int)}) before the pack is flushed.
	 */
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;
		private final MessageDigest md; // Hashes the entire pack stream.
		final byte[] hdrBuf; // Scratch space for object headers.
		private final Deflater deflater;
		private final int blockSize;

		private long currPos; // Position of currBuf[0] in the output stream.
		private int currPtr; // Number of bytes in currBuf.
		private byte[] currBuf;

		final CRC32 crc32; // CRC of the current object; reset per object.
		final DeflaterOutputStream compress;

		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

			// Prefer the stream's own block size; otherwise use the cache's.
			// When smaller than the cache block size, round down to the
			// largest multiple of the stream block size that still fits.
			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}

		// Total number of bytes written so far (current pack length).
		long getCount() {
			return currPos + currPtr;
		}

		// Reset per-object state and emit the object's pack header.
		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}

		// Encode the pack object header into hdrBuf: type in bits 4-6 of the
		// first byte, size as a little-endian base-128 varint whose high bit
		// marks continuation. Returns the number of header bytes produced.
		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}

		@Override
		public void write(final int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			// Fold the bytes into both the per-object CRC and the whole-pack
			// digest before buffering them.
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}

		// Buffer bytes without touching CRC or pack digest; used for the
		// trailing pack hash, which must not be hashed into itself.
		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					// Current block is full; flush it and start a new one.
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}

		// Push the current block to storage and into the block cache.
		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf;
			else
				buf = copyOf(currBuf, 0, currPtr);
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null;
		}

		private byte[] copyOf(byte[] src, int ptr, int cnt) {
			byte[] dst = new byte[cnt];
			System.arraycopy(src, ptr, dst, 0, cnt);
			return dst;
		}

		// Append the trailing SHA-1 of the pack and flush the final block.
		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}

		// Read back bytes previously written to this stream: flushed blocks
		// come from the cache (or storage), the tail from currBuf.
		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			if (currPos <= pos && r < cnt) {
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}

		// Inflate len bytes of object content starting at pack offset pos.
		// Returns null on OutOfMemoryError so the caller can fall back to
		// large-object streaming.
		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				return null; // Caller will switch to large object streaming.
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					throw new DataFormatException();
			}
		}

		// Feed the inflater from the block containing pos, or from the
		// still-unflushed tail in currBuf. Returns the number of bytes fed.
		private int setInput(long pos, Inflater inf)
				throws IOException, DataFormatException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(JGitText.get().unexpectedEofInPack);
		}

		// Fetch the block containing pos from the cache, re-reading it from
		// the output stream (and re-caching it) on a miss.
		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		// Round pos down to the start of its containing block.
		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end();
			out.close();
		}
	}
545 
	/**
	 * ObjectReader that also sees objects buffered by the enclosing inserter,
	 * reading them back out of the not-yet-flushed pack stream.
	 */
	private class Reader extends ObjectReader {
		private final DfsReader ctx = db.newReader();

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}

		@Override
		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
				throws IOException {
			Collection<ObjectId> stored = ctx.resolve(id);
			if (objectList == null)
				return stored;

			// Merge matches from the repository with matches from objects
			// buffered in this inserter.
			Set<ObjectId> r = new HashSet<>(stored.size() + 2);
			r.addAll(stored);
			for (PackedObjectInfo obj : objectList) {
				if (id.prefixCompare(obj) == 0)
					r.add(obj.copy());
			}
			return r;
		}

		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			// Objects not buffered by this inserter use the normal path.
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				return ctx.open(objectId, typeHint);

			// Read back the object's pack header from the pending stream.
			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
				throw new EOFException(JGitText.get().unexpectedEofInPack);

			// First byte: continuation bit, 3-bit type, low 4 size bits.
			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						JGitText.get().cannotReadBackDelta, Integer.toString(type)));
			if (typeHint != OBJ_ANY && type != typeHint) {
				throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
			}

			// Remaining size bits follow as a base-128 varint.
			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			// Inflate in memory when under the streaming threshold; otherwise
			// (or if allocation failed) stream the compressed data back.
			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}

		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				throw new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)),
						dfe);
			}
		}

		@Override
		public boolean has(AnyObjectId objectId) throws IOException {
			// Buffered objects count as present even before flush().
			return (objectMap != null && objectMap.contains(objectId))
					|| ctx.has(objectId);
		}

		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		@Override
		public ObjectInserter getCreatedFromInserter() {
			return DfsInserter.this;
		}

		@Override
		public void close() {
			ctx.close();
		}
	}
649 
	/**
	 * Loader for a large object, streamed back out of the pack this inserter
	 * is still writing (or, after flush, out of the committed pack).
	 */
	private class StreamLoader extends ObjectLoader {
		private final ObjectId id;
		private final int type;
		private final long size; // Inflated object size in bytes.

		private final DfsStreamKey srcPack; // Pack key captured at open().
		private final long pos; // Offset of the compressed data.

		StreamLoader(ObjectId id, int type, long sz,
				DfsStreamKey key, long pos) {
			this.id = id;
			this.type = type;
			this.size = sz;
			this.srcPack = key;
			this.pos = pos;
		}

		@Override
		public ObjectStream openStream() throws IOException {
			final DfsReader ctx = db.newReader();
			// Identity compare is intentional: packKey changes (is cleared)
			// once the inserter flushes, so a mismatch means the pack this
			// loader referenced has already been committed.
			if (srcPack != packKey) {
				try {
					// Post DfsInserter.flush() use the normal code path.
					// The newly created pack is registered in the cache.
					return ctx.open(id, type).openStream();
				} finally {
					ctx.close();
				}
			}

			// Inflate straight out of the pending pack stream; the reader is
			// closed together with the returned stream.
			int bufsz = 8192;
			final Inflater inf = ctx.inflater();
			return new ObjectStream.Filter(type,
					size, new BufferedInputStream(new InflaterInputStream(
							new ReadBackStream(pos), inf, bufsz), bufsz)) {
				@Override
				public void close() throws IOException {
					ctx.close();
					super.close();
				}
			};
		}

		@Override
		public int getType() {
			return type;
		}

		@Override
		public long getSize() {
			return size;
		}

		@Override
		public boolean isLarge() {
			return true;
		}

		@Override
		public byte[] getCachedBytes() throws LargeObjectException {
			throw new LargeObjectException.ExceedsLimit(
					db.getReaderOptions().getStreamFileThreshold(), size);
		}
	}
714 
715 	private final class ReadBackStream extends InputStream {
716 		private long pos;
717 
718 		ReadBackStream(long offset) {
719 			pos = offset;
720 		}
721 
722 		@Override
723 		public int read() throws IOException {
724 			byte[] b = new byte[1];
725 			int n = read(b);
726 			return n == 1 ? b[0] & 0xff : -1;
727 		}
728 
729 		@Override
730 		public int read(byte[] buf, int ptr, int len) throws IOException {
731 			int n = packOut.read(pos, buf, ptr, len);
732 			if (n > 0) {
733 				pos += n;
734 			}
735 			return n;
736 		}
737 	}
738 }