1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 package org.eclipse.jgit.internal.storage.dfs;
46
47 import java.io.IOException;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.atomic.AtomicReference;
50 import java.util.concurrent.atomic.AtomicReferenceArray;
51 import java.util.concurrent.locks.ReentrantLock;
52 import java.util.function.Consumer;
53 import java.util.stream.LongStream;
54
55 import org.eclipse.jgit.internal.JGitText;
56 import org.eclipse.jgit.internal.storage.pack.PackExt;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public final class DfsBlockCache {
94 private static volatile DfsBlockCache cache;
95
96 static {
97 reconfigure(new DfsBlockCacheConfig());
98 }
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public static void reconfigure(DfsBlockCacheConfig cfg) {
113 cache = new DfsBlockCache(cfg);
114 }
115
116
117
118
119
120
121 public static DfsBlockCache getInstance() {
122 return cache;
123 }
124
125
126 private final int tableSize;
127
128
129 private final AtomicReferenceArray<HashEntry> table;
130
131
132
133
134
135
136 private final ReentrantLock[] loadLocks;
137
138
139
140
141 private final ReentrantLock[] refLocks;
142
143
144 private final long maxBytes;
145
146
147 private final long maxStreamThroughCache;
148
149
150
151
152
153
154
155
156
157
158
159 private final int blockSize;
160
161
162 private final int blockSizeShift;
163
164
165
166
167 private final AtomicReference<AtomicLong[]> statHit;
168
169
170
171
172
173 private final AtomicReference<AtomicLong[]> statMiss;
174
175
176
177
178
179 private final AtomicReference<AtomicLong[]> statEvict;
180
181
182
183
184 private final AtomicReference<AtomicLong[]> liveBytes;
185
186
187 private final ReentrantLock clockLock;
188
189
190
191
192 private final Consumer<Long> refLockWaitTime;
193
194
195 private Ref clockHand;
196
197 @SuppressWarnings("unchecked")
198 private DfsBlockCache(DfsBlockCacheConfig cfg) {
199 tableSize = tableSize(cfg);
200 if (tableSize < 1) {
201 throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
202 }
203
204 table = new AtomicReferenceArray<>(tableSize);
205 loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
206 for (int i = 0; i < loadLocks.length; i++) {
207 loadLocks[i] = new ReentrantLock(true );
208 }
209 refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
210 for (int i = 0; i < refLocks.length; i++) {
211 refLocks[i] = new ReentrantLock(true );
212 }
213
214 maxBytes = cfg.getBlockLimit();
215 maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
216 blockSize = cfg.getBlockSize();
217 blockSizeShift = Integer.numberOfTrailingZeros(blockSize);
218
219 clockLock = new ReentrantLock(true );
220 String none = "";
221 clockHand = new Ref<>(
222 DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
223 -1, 0, null);
224 clockHand.next = clockHand;
225
226 statHit = new AtomicReference<>(newCounters());
227 statMiss = new AtomicReference<>(newCounters());
228 statEvict = new AtomicReference<>(newCounters());
229 liveBytes = new AtomicReference<>(newCounters());
230
231 refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
232 }
233
234 boolean shouldCopyThroughCache(long length) {
235 return length <= maxStreamThroughCache;
236 }
237
238
239
240
241
242
243 public long[] getCurrentSize() {
244 return getStatVals(liveBytes);
245 }
246
247
248
249
250
251
252 public long getFillPercentage() {
253 return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
254 }
255
256
257
258
259
260
261
262 public long[] getHitCount() {
263 return getStatVals(statHit);
264 }
265
266
267
268
269
270
271
272
273 public long[] getMissCount() {
274 return getStatVals(statMiss);
275 }
276
277
278
279
280
281
282 public long[] getTotalRequestCount() {
283 AtomicLong[] hit = statHit.get();
284 AtomicLong[] miss = statMiss.get();
285 long[] cnt = new long[Math.max(hit.length, miss.length)];
286 for (int i = 0; i < hit.length; i++) {
287 cnt[i] += hit[i].get();
288 }
289 for (int i = 0; i < miss.length; i++) {
290 cnt[i] += miss[i].get();
291 }
292 return cnt;
293 }
294
295
296
297
298
299
300 public long[] getHitRatio() {
301 AtomicLong[] hit = statHit.get();
302 AtomicLong[] miss = statMiss.get();
303 long[] ratio = new long[Math.max(hit.length, miss.length)];
304 for (int i = 0; i < ratio.length; i++) {
305 if (i >= hit.length) {
306 ratio[i] = 0;
307 } else if (i >= miss.length) {
308 ratio[i] = 100;
309 } else {
310 long hitVal = hit[i].get();
311 long missVal = miss[i].get();
312 long total = hitVal + missVal;
313 ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
314 }
315 }
316 return ratio;
317 }
318
319
320
321
322
323
324
325
326 public long[] getEvictions() {
327 return getStatVals(statEvict);
328 }
329
330
331
332
333
334
335
336
337
338
339
340
341 public boolean hasBlock0(DfsStreamKey key) {
342 HashEntry e1 = table.get(slot(key, 0));
343 DfsBlock v = scan(e1, key, 0);
344 return v != null && v.contains(key, 0);
345 }
346
347 private int hash(int packHash, long off) {
348 return packHash + (int) (off >>> blockSizeShift);
349 }
350
351 int getBlockSize() {
352 return blockSize;
353 }
354
355 private static int tableSize(DfsBlockCacheConfig cfg) {
356 final int wsz = cfg.getBlockSize();
357 final long limit = cfg.getBlockLimit();
358 if (wsz <= 0) {
359 throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
360 }
361 if (limit < wsz) {
362 throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
363 }
364 return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
365 }
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382 DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
383 ReadableChannelSupplier fileChannel) throws IOException {
384 final long requestedPosition = position;
385 position = file.alignToBlock(position);
386
387 DfsStreamKey key = file.key;
388 int slot = slot(key, position);
389 HashEntry e1 = table.get(slot);
390 DfsBlock v = scan(e1, key, position);
391 if (v != null && v.contains(key, requestedPosition)) {
392 ctx.stats.blockCacheHit++;
393 getStat(statHit, key).incrementAndGet();
394 return v;
395 }
396
397 reserveSpace(blockSize, key);
398 ReentrantLock regionLock = lockFor(key, position);
399 regionLock.lock();
400 try {
401 HashEntry e2 = table.get(slot);
402 if (e2 != e1) {
403 v = scan(e2, key, position);
404 if (v != null) {
405 ctx.stats.blockCacheHit++;
406 getStat(statHit, key).incrementAndGet();
407 creditSpace(blockSize, key);
408 return v;
409 }
410 }
411
412 getStat(statMiss, key).incrementAndGet();
413 boolean credit = true;
414 try {
415 v = file.readOneBlock(position, ctx, fileChannel.get());
416 credit = false;
417 } finally {
418 if (credit) {
419 creditSpace(blockSize, key);
420 }
421 }
422 if (position != v.start) {
423
424 position = v.start;
425 slot = slot(key, position);
426 e2 = table.get(slot);
427 }
428
429 Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
430 ref.hot = true;
431 for (;;) {
432 HashEntry n = new HashEntry(clean(e2), ref);
433 if (table.compareAndSet(slot, e2, n)) {
434 break;
435 }
436 e2 = table.get(slot);
437 }
438 addToClock(ref, blockSize - v.size());
439 } finally {
440 regionLock.unlock();
441 }
442
443
444
445 if (v.contains(file.key, requestedPosition)) {
446 return v;
447 }
448 return getOrLoad(file, requestedPosition, ctx, fileChannel);
449 }
450
451 @SuppressWarnings("unchecked")
452 private void reserveSpace(long reserve, DfsStreamKey key) {
453 clockLock.lock();
454 try {
455 long live = LongStream.of(getCurrentSize()).sum() + reserve;
456 if (maxBytes < live) {
457 Ref prev = clockHand;
458 Ref hand = clockHand.next;
459 do {
460 if (hand.hot) {
461
462
463 hand.hot = false;
464 prev = hand;
465 hand = hand.next;
466 continue;
467 } else if (prev == hand)
468 break;
469
470
471
472 Ref dead = hand;
473 hand = hand.next;
474 prev.next = hand;
475 dead.next = null;
476 dead.value = null;
477 live -= dead.size;
478 getStat(liveBytes, dead.key).addAndGet(-dead.size);
479 getStat(statEvict, dead.key).incrementAndGet();
480 } while (maxBytes < live);
481 clockHand = prev;
482 }
483 getStat(liveBytes, key).addAndGet(reserve);
484 } finally {
485 clockLock.unlock();
486 }
487 }
488
489 private void creditSpace(long credit, DfsStreamKey key) {
490 clockLock.lock();
491 try {
492 getStat(liveBytes, key).addAndGet(-credit);
493 } finally {
494 clockLock.unlock();
495 }
496 }
497
498 @SuppressWarnings("unchecked")
499 private void addToClock(Ref ref, long credit) {
500 clockLock.lock();
501 try {
502 if (credit != 0) {
503 getStat(liveBytes, ref.key).addAndGet(-credit);
504 }
505 Ref ptr = clockHand;
506 ref.next = ptr.next;
507 ptr.next = ref;
508 clockHand = ref;
509 } finally {
510 clockLock.unlock();
511 }
512 }
513
514 void put(DfsBlock v) {
515 put(v.stream, v.start, v.size(), v);
516 }
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531 <T> Ref<T> getOrLoadRef(
532 DfsStreamKey key, long position, RefLoader<T> loader)
533 throws IOException {
534 int slot = slot(key, position);
535 HashEntry e1 = table.get(slot);
536 Ref<T> ref = scanRef(e1, key, position);
537 if (ref != null) {
538 getStat(statHit, key).incrementAndGet();
539 return ref;
540 }
541
542 ReentrantLock regionLock = lockForRef(key);
543 long lockStart = System.currentTimeMillis();
544 regionLock.lock();
545 try {
546 HashEntry e2 = table.get(slot);
547 if (e2 != e1) {
548 ref = scanRef(e2, key, position);
549 if (ref != null) {
550 getStat(statHit, key).incrementAndGet();
551 return ref;
552 }
553 }
554
555 if (refLockWaitTime != null) {
556 refLockWaitTime.accept(
557 Long.valueOf(System.currentTimeMillis() - lockStart));
558 }
559 getStat(statMiss, key).incrementAndGet();
560 ref = loader.load();
561 ref.hot = true;
562
563 reserveSpace(ref.size, key);
564 for (;;) {
565 HashEntry n = new HashEntry(clean(e2), ref);
566 if (table.compareAndSet(slot, e2, n)) {
567 break;
568 }
569 e2 = table.get(slot);
570 }
571 addToClock(ref, 0);
572 } finally {
573 regionLock.unlock();
574 }
575 return ref;
576 }
577
578 <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
579 return put(key, 0, size, v);
580 }
581
582 <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
583 int slot = slot(key, pos);
584 HashEntry e1 = table.get(slot);
585 Ref<T> ref = scanRef(e1, key, pos);
586 if (ref != null) {
587 return ref;
588 }
589
590 reserveSpace(size, key);
591 ReentrantLock regionLock = lockFor(key, pos);
592 regionLock.lock();
593 try {
594 HashEntry e2 = table.get(slot);
595 if (e2 != e1) {
596 ref = scanRef(e2, key, pos);
597 if (ref != null) {
598 creditSpace(size, key);
599 return ref;
600 }
601 }
602
603 ref = new Ref<>(key, pos, size, v);
604 ref.hot = true;
605 for (;;) {
606 HashEntry n = new HashEntry(clean(e2), ref);
607 if (table.compareAndSet(slot, e2, n)) {
608 break;
609 }
610 e2 = table.get(slot);
611 }
612 addToClock(ref, 0);
613 } finally {
614 regionLock.unlock();
615 }
616 return ref;
617 }
618
619 boolean contains(DfsStreamKey key, long position) {
620 return scan(table.get(slot(key, position)), key, position) != null;
621 }
622
623 @SuppressWarnings("unchecked")
624 <T> T get(DfsStreamKey key, long position) {
625 T val = (T) scan(table.get(slot(key, position)), key, position);
626 if (val == null) {
627 getStat(statMiss, key).incrementAndGet();
628 } else {
629 getStat(statHit, key).incrementAndGet();
630 }
631 return val;
632 }
633
634 private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
635 Ref<T> r = scanRef(n, key, position);
636 return r != null ? r.get() : null;
637 }
638
639 @SuppressWarnings("unchecked")
640 private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
641 for (; n != null; n = n.next) {
642 Ref<T> r = n.ref;
643 if (r.position == position && r.key.equals(key)) {
644 return r.get() != null ? r : null;
645 }
646 }
647 return null;
648 }
649
650 private int slot(DfsStreamKey key, long position) {
651 return (hash(key.hash, position) >>> 1) % tableSize;
652 }
653
654 private ReentrantLock lockFor(DfsStreamKey key, long position) {
655 return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
656 }
657
658 private ReentrantLock lockForRef(DfsStreamKey key) {
659 return refLocks[(key.hash >>> 1) % refLocks.length];
660 }
661
662 private static AtomicLong[] newCounters() {
663 AtomicLong[] ret = new AtomicLong[PackExt.values().length];
664 for (int i = 0; i < ret.length; i++) {
665 ret[i] = new AtomicLong();
666 }
667 return ret;
668 }
669
670 private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
671 DfsStreamKey key) {
672 int pos = key.packExtPos;
673 while (true) {
674 AtomicLong[] vals = stats.get();
675 if (pos < vals.length) {
676 return vals[pos];
677 }
678 AtomicLong[] expect = vals;
679 vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
680 System.arraycopy(expect, 0, vals, 0, expect.length);
681 for (int i = expect.length; i < vals.length; i++) {
682 vals[i] = new AtomicLong();
683 }
684 if (stats.compareAndSet(expect, vals)) {
685 return vals[pos];
686 }
687 }
688 }
689
690 private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
691 AtomicLong[] stats = stat.get();
692 long[] cnt = new long[stats.length];
693 for (int i = 0; i < stats.length; i++) {
694 cnt[i] = stats[i].get();
695 }
696 return cnt;
697 }
698
699 private static HashEntry clean(HashEntry top) {
700 while (top != null && top.ref.next == null)
701 top = top.next;
702 if (top == null) {
703 return null;
704 }
705 HashEntry n = clean(top.next);
706 return n == top.next ? top : new HashEntry(n, top.ref);
707 }
708
709 private static final class HashEntry {
710
711 final HashEntry next;
712
713
714 final Ref ref;
715
716 HashEntry(HashEntry n, Ref r) {
717 next = n;
718 ref = r;
719 }
720 }
721
722 static final class Ref<T> {
723 final DfsStreamKey key;
724 final long position;
725 final long size;
726 volatile T value;
727 Ref next;
728 volatile boolean hot;
729
730 Ref(DfsStreamKey key, long position, long size, T v) {
731 this.key = key;
732 this.position = position;
733 this.size = size;
734 this.value = v;
735 }
736
737 T get() {
738 T v = value;
739 if (v != null) {
740 hot = true;
741 }
742 return v;
743 }
744
745 boolean has() {
746 return value != null;
747 }
748 }
749
750 @FunctionalInterface
751 interface RefLoader<T> {
752 Ref<T> load() throws IOException;
753 }
754
755
756
757
758 @FunctionalInterface
759 interface ReadableChannelSupplier {
760
761
762
763
764 ReadableChannel get() throws IOException;
765 }
766 }