1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 package org.eclipse.jgit.internal.storage.dfs;
46
47 import java.io.IOException;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.atomic.AtomicReferenceArray;
50 import java.util.concurrent.locks.ReentrantLock;
51
52 import org.eclipse.jgit.annotations.Nullable;
53 import org.eclipse.jgit.internal.JGitText;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 public final class DfsBlockCache {
89 private static volatile DfsBlockCache cache;
90
91 static {
92 reconfigure(new DfsBlockCacheConfig());
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107 public static void reconfigure(DfsBlockCacheConfig cfg) {
108 cache = new DfsBlockCache(cfg);
109 }
110
111
112 public static DfsBlockCache getInstance() {
113 return cache;
114 }
115
116
117 private final int tableSize;
118
119
120 private final AtomicReferenceArray<HashEntry> table;
121
122
123 private final ReentrantLock[] loadLocks;
124
125
126 private final long maxBytes;
127
128
129 private final long maxStreamThroughCache;
130
131
132
133
134
135
136
137
138
139
140
141 private final int blockSize;
142
143
144 private final int blockSizeShift;
145
146
147 private final AtomicLong statHit;
148
149
150 private final AtomicLong statMiss;
151
152
153 private volatile long statEvict;
154
155
156 private final ReentrantLock clockLock;
157
158
159 private Ref clockHand;
160
161
162 private volatile long liveBytes;
163
164 @SuppressWarnings("unchecked")
165 private DfsBlockCache(final DfsBlockCacheConfig cfg) {
166 tableSize = tableSize(cfg);
167 if (tableSize < 1)
168 throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
169
170 table = new AtomicReferenceArray<>(tableSize);
171 loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
172 for (int i = 0; i < loadLocks.length; i++)
173 loadLocks[i] = new ReentrantLock(true );
174
175 maxBytes = cfg.getBlockLimit();
176 maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
177 blockSize = cfg.getBlockSize();
178 blockSizeShift = Integer.numberOfTrailingZeros(blockSize);
179
180 clockLock = new ReentrantLock(true );
181 String none = "";
182 clockHand = new Ref<>(
183 DfsStreamKey.of(new DfsRepositoryDescription(none), none),
184 -1, 0, null);
185 clockHand.next = clockHand;
186
187 statHit = new AtomicLong();
188 statMiss = new AtomicLong();
189 }
190
191 boolean shouldCopyThroughCache(long length) {
192 return length <= maxStreamThroughCache;
193 }
194
195
196 public long getCurrentSize() {
197 return liveBytes;
198 }
199
200
201 public long getFillPercentage() {
202 return getCurrentSize() * 100 / maxBytes;
203 }
204
205
206 public long getHitCount() {
207 return statHit.get();
208 }
209
210
211 public long getMissCount() {
212 return statMiss.get();
213 }
214
215
216 public long getTotalRequestCount() {
217 return getHitCount() + getMissCount();
218 }
219
220
221 public long getHitRatio() {
222 long hits = statHit.get();
223 long miss = statMiss.get();
224 long total = hits + miss;
225 if (total == 0)
226 return 0;
227 return hits * 100 / total;
228 }
229
230
231 public long getEvictions() {
232 return statEvict;
233 }
234
235 private int hash(int packHash, long off) {
236 return packHash + (int) (off >>> blockSizeShift);
237 }
238
239 int getBlockSize() {
240 return blockSize;
241 }
242
243 private static int tableSize(final DfsBlockCacheConfig cfg) {
244 final int wsz = cfg.getBlockSize();
245 final long limit = cfg.getBlockLimit();
246 if (wsz <= 0)
247 throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
248 if (limit < wsz)
249 throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
250 return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268 DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
269 @Nullable ReadableChannel fileChannel) throws IOException {
270 final long requestedPosition = position;
271 position = file.alignToBlock(position);
272
273 DfsStreamKey key = file.key;
274 int slot = slot(key, position);
275 HashEntry e1 = table.get(slot);
276 DfsBlock v = scan(e1, key, position);
277 if (v != null && v.contains(key, requestedPosition)) {
278 ctx.stats.blockCacheHit++;
279 statHit.incrementAndGet();
280 return v;
281 }
282
283 reserveSpace(blockSize);
284 ReentrantLock regionLock = lockFor(key, position);
285 regionLock.lock();
286 try {
287 HashEntry e2 = table.get(slot);
288 if (e2 != e1) {
289 v = scan(e2, key, position);
290 if (v != null) {
291 ctx.stats.blockCacheHit++;
292 statHit.incrementAndGet();
293 creditSpace(blockSize);
294 return v;
295 }
296 }
297
298 statMiss.incrementAndGet();
299 boolean credit = true;
300 try {
301 v = file.readOneBlock(requestedPosition, ctx, fileChannel);
302 credit = false;
303 } finally {
304 if (credit)
305 creditSpace(blockSize);
306 }
307 if (position != v.start) {
308
309 position = v.start;
310 slot = slot(key, position);
311 e2 = table.get(slot);
312 }
313
314 Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
315 ref.hot = true;
316 for (;;) {
317 HashEntry n = new HashEntry(clean(e2), ref);
318 if (table.compareAndSet(slot, e2, n))
319 break;
320 e2 = table.get(slot);
321 }
322 addToClock(ref, blockSize - v.size());
323 } finally {
324 regionLock.unlock();
325 }
326
327
328
329 if (v.contains(file.key, requestedPosition))
330 return v;
331 return getOrLoad(file, requestedPosition, ctx, fileChannel);
332 }
333
334 @SuppressWarnings("unchecked")
335 private void reserveSpace(int reserve) {
336 clockLock.lock();
337 try {
338 long live = liveBytes + reserve;
339 if (maxBytes < live) {
340 Ref prev = clockHand;
341 Ref hand = clockHand.next;
342 do {
343 if (hand.hot) {
344
345
346 hand.hot = false;
347 prev = hand;
348 hand = hand.next;
349 continue;
350 } else if (prev == hand)
351 break;
352
353
354
355 Ref dead = hand;
356 hand = hand.next;
357 prev.next = hand;
358 dead.next = null;
359 dead.value = null;
360 live -= dead.size;
361 statEvict++;
362 } while (maxBytes < live);
363 clockHand = prev;
364 }
365 liveBytes = live;
366 } finally {
367 clockLock.unlock();
368 }
369 }
370
371 private void creditSpace(int credit) {
372 clockLock.lock();
373 liveBytes -= credit;
374 clockLock.unlock();
375 }
376
377 @SuppressWarnings("unchecked")
378 private void addToClock(Ref ref, int credit) {
379 clockLock.lock();
380 try {
381 if (credit != 0)
382 liveBytes -= credit;
383 Ref ptr = clockHand;
384 ref.next = ptr.next;
385 ptr.next = ref;
386 clockHand = ref;
387 } finally {
388 clockLock.unlock();
389 }
390 }
391
392 void put(DfsBlock v) {
393 put(v.stream, v.start, v.size(), v);
394 }
395
396 <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
397 return put(key, 0, (int) Math.min(size, Integer.MAX_VALUE), v);
398 }
399
400 <T> Ref<T> put(DfsStreamKey key, long pos, int size, T v) {
401 int slot = slot(key, pos);
402 HashEntry e1 = table.get(slot);
403 Ref<T> ref = scanRef(e1, key, pos);
404 if (ref != null)
405 return ref;
406
407 reserveSpace(size);
408 ReentrantLock regionLock = lockFor(key, pos);
409 regionLock.lock();
410 try {
411 HashEntry e2 = table.get(slot);
412 if (e2 != e1) {
413 ref = scanRef(e2, key, pos);
414 if (ref != null) {
415 creditSpace(size);
416 return ref;
417 }
418 }
419
420 ref = new Ref<>(key, pos, size, v);
421 ref.hot = true;
422 for (;;) {
423 HashEntry n = new HashEntry(clean(e2), ref);
424 if (table.compareAndSet(slot, e2, n))
425 break;
426 e2 = table.get(slot);
427 }
428 addToClock(ref, 0);
429 } finally {
430 regionLock.unlock();
431 }
432 return ref;
433 }
434
435 boolean contains(DfsStreamKey key, long position) {
436 return scan(table.get(slot(key, position)), key, position) != null;
437 }
438
439 @SuppressWarnings("unchecked")
440 <T> T get(DfsStreamKey key, long position) {
441 T val = (T) scan(table.get(slot(key, position)), key, position);
442 if (val == null)
443 statMiss.incrementAndGet();
444 else
445 statHit.incrementAndGet();
446 return val;
447 }
448
449 private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
450 Ref<T> r = scanRef(n, key, position);
451 return r != null ? r.get() : null;
452 }
453
454 <T> Ref<T> getRef(DfsStreamKey key) {
455 Ref<T> r = scanRef(table.get(slot(key, 0)), key, 0);
456 if (r != null)
457 statHit.incrementAndGet();
458 else
459 statMiss.incrementAndGet();
460 return r;
461 }
462
463 @SuppressWarnings("unchecked")
464 private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
465 for (; n != null; n = n.next) {
466 Ref<T> r = n.ref;
467 if (r.position == position && r.key.equals(key))
468 return r.get() != null ? r : null;
469 }
470 return null;
471 }
472
473 private int slot(DfsStreamKey key, long position) {
474 return (hash(key.hash, position) >>> 1) % tableSize;
475 }
476
477 private ReentrantLock lockFor(DfsStreamKey key, long position) {
478 return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
479 }
480
481 private static HashEntry clean(HashEntry top) {
482 while (top != null && top.ref.next == null)
483 top = top.next;
484 if (top == null)
485 return null;
486 HashEntry n = clean(top.next);
487 return n == top.next ? top : new HashEntry(n, top.ref);
488 }
489
490 private static final class HashEntry {
491
492 final HashEntry next;
493
494
495 final Ref ref;
496
497 HashEntry(HashEntry n, Ref r) {
498 next = n;
499 ref = r;
500 }
501 }
502
503 static final class Ref<T> {
504 final DfsStreamKey key;
505 final long position;
506 final int size;
507 volatile T value;
508 Ref next;
509 volatile boolean hot;
510
511 Ref(DfsStreamKey key, long position, int size, T v) {
512 this.key = key;
513 this.position = position;
514 this.size = size;
515 this.value = v;
516 }
517
518 T get() {
519 T v = value;
520 if (v != null)
521 hot = true;
522 return v;
523 }
524
525 boolean has() {
526 return value != null;
527 }
528 }
529 }