1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static java.util.concurrent.TimeUnit.MILLISECONDS;
47 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
48 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
49 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
50 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
51 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
52 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
53 import static org.eclipse.jgit.lib.Constants.HEAD;
54 import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
55 import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
56 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
57 import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
58
59 import java.io.IOException;
60 import java.lang.ref.WeakReference;
61 import java.util.ArrayList;
62 import java.util.Collection;
63 import java.util.HashMap;
64 import java.util.Iterator;
65 import java.util.List;
66 import java.util.Map;
67 import java.util.concurrent.Callable;
68 import java.util.concurrent.Future;
69
70 import org.eclipse.jgit.annotations.NonNull;
71 import org.eclipse.jgit.annotations.Nullable;
72 import org.eclipse.jgit.internal.storage.reftree.RefTree;
73 import org.eclipse.jgit.lib.AnyObjectId;
74 import org.eclipse.jgit.lib.ObjectId;
75 import org.eclipse.jgit.lib.Ref;
76 import org.eclipse.jgit.lib.Repository;
77 import org.eclipse.jgit.revwalk.RevWalk;
78 import org.eclipse.jgit.transport.ReceiveCommand;
79 import org.eclipse.jgit.treewalk.TreeWalk;
80 import org.eclipse.jgit.util.SystemReader;
81 import org.slf4j.Logger;
82 import org.slf4j.LoggerFactory;
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public abstract class KetchReplica {
113 static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
114 private static final byte[] PEEL = { ' ', '^' };
115
116
117 public enum Participation {
118
119 FULL,
120
121
122 FOLLOWER_ONLY;
123 }
124
125
126 public enum CommitMethod {
127
128 ALL_REFS,
129
130
131 TXN_COMMITTED;
132 }
133
134
135 public enum CommitSpeed {
136
137
138
139
140 FAST,
141
142
143
144
145
146
147 BATCHED;
148 }
149
150
151 public enum State {
152
153 UNKNOWN,
154
155
156 LAGGING,
157
158
159 CURRENT,
160
161
162 DIVERGENT,
163
164
165 AHEAD,
166
167
168 OFFLINE;
169 }
170
171 private final KetchLeader leader;
172 private final String replicaName;
173 private final Participation participation;
174 private final CommitMethod commitMethod;
175 private final CommitSpeed commitSpeed;
176 private final long minRetryMillis;
177 private final long maxRetryMillis;
178 private final Map<ObjectId, List<ReceiveCommand>> staged;
179 private final Map<String, ReceiveCommand> running;
180 private final Map<String, ReceiveCommand> waiting;
181 private final List<ReplicaPushRequest> queued;
182
183
184
185
186
187
188 private ObjectId txnAccepted;
189
190
191
192
193
194
195
196
197 private ObjectId txnCommitted;
198
199
200 private State state = UNKNOWN;
201 private String error;
202
203
204 private Future<?> retryFuture;
205 private long lastRetryMillis;
206 private long retryAtMillis;
207
208
209
210
211
212
213
214
215
216
217
218 protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
219 this.leader = leader;
220 this.replicaName = name;
221 this.participation = cfg.getParticipation();
222 this.commitMethod = cfg.getCommitMethod();
223 this.commitSpeed = cfg.getCommitSpeed();
224 this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
225 this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
226 this.staged = new HashMap<>();
227 this.running = new HashMap<>();
228 this.waiting = new HashMap<>();
229 this.queued = new ArrayList<>(4);
230 }
231
232
233
234
235
236
237 public KetchSystem getSystem() {
238 return getLeader().getSystem();
239 }
240
241
242
243
244
245
246 public KetchLeader getLeader() {
247 return leader;
248 }
249
250
251
252
253
254
255 public String getName() {
256 return replicaName;
257 }
258
259
260
261
262
263
264 protected String describeForLog() {
265 return getName();
266 }
267
268
269
270
271
272
273 public Participation getParticipation() {
274 return participation;
275 }
276
277
278
279
280
281
282 public CommitMethod getCommitMethod() {
283 return commitMethod;
284 }
285
286
287
288
289
290
291 public CommitSpeed getCommitSpeed() {
292 return commitSpeed;
293 }
294
295
296
297
298
299
300
301
302
303
304 protected void shutdown() {
305 Future<?> f = retryFuture;
306 if (f != null) {
307 retryFuture = null;
308 f.cancel(true);
309 }
310 }
311
312 ReplicaSnapshot snapshot() {
313 ReplicaSnapshot s = new ReplicaSnapshot(this);
314 s.accepted = txnAccepted;
315 s.committed = txnCommitted;
316 s.state = state;
317 s.error = error;
318 s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
319 return s;
320 }
321
322
323
324
325
326
327
328
329
330 void initialize(Map<String, Ref> refs) {
331 if (txnAccepted == null) {
332 txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
333 }
334 if (txnCommitted == null) {
335 txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
336 }
337 }
338
339 ObjectId getTxnAccepted() {
340 return txnAccepted;
341 }
342
343 boolean hasAccepted(LogIndex id) {
344 return equals(txnAccepted, id);
345 }
346
347 private static boolean equals(@Nullable ObjectId a, LogIndex b) {
348 return a != null && b != null && AnyObjectId.equals(a, b);
349 }
350
351
352
353
354
355
356
357
358
359 void pushTxnAcceptedAsync(Round round) {
360 List<ReceiveCommand> cmds = new ArrayList<>();
361 if (commitSpeed == BATCHED) {
362 LogIndex committedIndex = leader.getCommitted();
363 if (equals(txnAccepted, committedIndex)
364 && !equals(txnCommitted, committedIndex)) {
365 prepareTxnCommitted(cmds, committedIndex);
366 }
367 }
368
369
370 if (round.stageCommands != null) {
371 for (ReceiveCommand cmd : round.stageCommands) {
372
373 cmds.add(copy(cmd));
374 }
375 }
376 cmds.add(new ReceiveCommand(
377 round.acceptedOldIndex, round.acceptedNewIndex,
378 getSystem().getTxnAccepted()));
379 pushAsync(new ReplicaPushRequest(this, cmds));
380 }
381
382 private static ReceiveCommand../../../../org/eclipse/jgit/transport/ReceiveCommand.html#ReceiveCommand">ReceiveCommand copy(ReceiveCommand c) {
383 return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
384 }
385
386 boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
387 return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
388 }
389
390 void pushCommitAsync(LogIndex committed) {
391 List<ReceiveCommand> cmds = new ArrayList<>();
392 prepareTxnCommitted(cmds, committed);
393 pushAsync(new ReplicaPushRequest(this, cmds));
394 }
395
396 private void prepareTxnCommitted(List<ReceiveCommand> cmds,
397 ObjectId committed) {
398 removeStaged(cmds, committed);
399 cmds.add(new ReceiveCommand(
400 txnCommitted, committed,
401 getSystem().getTxnCommitted()));
402 }
403
404 private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
405 List<ReceiveCommand> a = staged.remove(committed);
406 if (a != null) {
407 delete(cmds, a);
408 }
409 if (staged.isEmpty() || !(committed instanceof LogIndex)) {
410 return;
411 }
412
413 LogIndex committedIndex = (LogIndex) committed;
414 Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
415 .entrySet().iterator();
416 while (itr.hasNext()) {
417 Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
418 if (e.getKey() instanceof LogIndex) {
419 LogIndex stagedIndex = (LogIndex) e.getKey();
420 if (stagedIndex.isBefore(committedIndex)) {
421 delete(cmds, e.getValue());
422 itr.remove();
423 }
424 }
425 }
426 }
427
428 private static void delete(List<ReceiveCommand> cmds,
429 List<ReceiveCommand> createCmds) {
430 for (ReceiveCommand cmd : createCmds) {
431 ObjectId id = cmd.getNewId();
432 String name = cmd.getRefName();
433 cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
434 }
435 }
436
437
438
439
440
441
442
443
444
445
446
447
448
449 private void runNextPushRequest() {
450 LogIndex committed = leader.getCommitted();
451 if (!equals(txnCommitted, committed)
452 && shouldPushUnbatchedCommit(committed, leader.isIdle())) {
453 pushCommitAsync(committed);
454 }
455
456 if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
457 return;
458 }
459
460
461 Map<String, ReceiveCommand> cmdMap = new HashMap<>();
462 for (ReplicaPushRequest req : queued) {
463 for (ReceiveCommand cmd : req.getCommands()) {
464 String name = cmd.getRefName();
465 ReceiveCommand old = cmdMap.remove(name);
466 if (old != null) {
467 cmd = new ReceiveCommand(
468 old.getOldId(), cmd.getNewId(),
469 name);
470 }
471 cmdMap.put(name, cmd);
472 }
473 }
474 queued.clear();
475 waiting.clear();
476
477 List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
478 for (ReceiveCommand cmd : next) {
479 running.put(cmd.getRefName(), cmd);
480 }
481 startPush(new ReplicaPushRequest(this, next));
482 }
483
484 private void pushAsync(ReplicaPushRequest req) {
485 if (defer(req)) {
486
487 for (ReceiveCommand cmd : req.getCommands()) {
488 waiting.put(cmd.getRefName(), cmd);
489 }
490 queued.add(req);
491 } else {
492 for (ReceiveCommand cmd : req.getCommands()) {
493 running.put(cmd.getRefName(), cmd);
494 }
495 startPush(req);
496 }
497 }
498
499 private boolean defer(ReplicaPushRequest req) {
500 if (waitingForRetry()) {
501
502 return true;
503 }
504
505 for (ReceiveCommand nextCmd : req.getCommands()) {
506 ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
507 if (priorCmd == null) {
508 priorCmd = running.get(nextCmd.getRefName());
509 }
510 if (priorCmd != null) {
511
512
513 return true;
514 }
515 }
516 return false;
517 }
518
519 private boolean waitingForRetry() {
520 Future<?> f = retryFuture;
521 return f != null && !f.isDone();
522 }
523
524 private void retryLater(ReplicaPushRequest req) {
525 Collection<ReceiveCommand> cmds = req.getCommands();
526 for (ReceiveCommand cmd : cmds) {
527 cmd.setResult(NOT_ATTEMPTED, null);
528 if (!waiting.containsKey(cmd.getRefName())) {
529 waiting.put(cmd.getRefName(), cmd);
530 }
531 }
532 queued.add(0, new ReplicaPushRequest(this, cmds));
533
534 if (!waitingForRetry()) {
535 long delay = KetchSystem.delay(
536 lastRetryMillis,
537 minRetryMillis, maxRetryMillis);
538 if (log.isDebugEnabled()) {
539 log.debug("Retrying {} after {} ms",
540 describeForLog(), Long.valueOf(delay));
541 }
542 lastRetryMillis = delay;
543 retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
544 retryFuture = getSystem().getExecutor()
545 .schedule(new WeakRetryPush(this), delay, MILLISECONDS);
546 }
547 }
548
549
550 static class WeakRetryPush extends WeakReference<KetchReplica>
551 implements Callable<Void> {
552 WeakRetryPush(KetchReplica r) {
553 super(r);
554 }
555
556 @Override
557 public Void call() throws Exception {
558 KetchReplica r = get();
559 if (r != null) {
560 r.doRetryPush();
561 }
562 return null;
563 }
564 }
565
566 private void doRetryPush() {
567 leader.lock.lock();
568 try {
569 retryFuture = null;
570 runNextPushRequest();
571 } finally {
572 leader.lock.unlock();
573 }
574 }
575
576
577
578
579
580
581
582
583
584
585 protected abstract void startPush(ReplicaPushRequest req);
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601 void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
602 ReceiveCommand acceptCmd = null;
603 ReceiveCommand commitCmd = null;
604 List<ReceiveCommand> stages = null;
605
606 for (ReceiveCommand cmd : req.getCommands()) {
607 String name = cmd.getRefName();
608 if (name.equals(getSystem().getTxnAccepted())) {
609 acceptCmd = cmd;
610 } else if (name.equals(getSystem().getTxnCommitted())) {
611 commitCmd = cmd;
612 } else if (cmd.getResult() == OK && cmd.getType() == CREATE
613 && name.startsWith(getSystem().getTxnStage())) {
614 if (stages == null) {
615 stages = new ArrayList<>();
616 }
617 stages.add(cmd);
618 }
619 }
620
621 State newState = null;
622 ObjectId acceptId = readId(req, acceptCmd);
623 if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
624 && req.getException() == null) {
625 try (LagCheckl/ketch/LagCheck.html#LagCheck">LagCheck lag = new LagCheck(this, repo)) {
626 newState = lag.check(acceptId, acceptCmd);
627 acceptId = lag.getRemoteId();
628 }
629 }
630
631 leader.lock.lock();
632 try {
633 for (ReceiveCommand cmd : req.getCommands()) {
634 running.remove(cmd.getRefName());
635 }
636
637 Throwable err = req.getException();
638 if (err != null) {
639 state = OFFLINE;
640 error = err.toString();
641 retryLater(req);
642 leader.onReplicaUpdate(this);
643 return;
644 }
645
646 lastRetryMillis = 0;
647 error = null;
648 updateView(req, acceptId, commitCmd);
649
650 if (acceptCmd != null && acceptCmd.getResult() == OK) {
651 state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
652 if (stages != null) {
653 staged.put(acceptCmd.getNewId(), stages);
654 }
655 } else if (newState != null) {
656 state = newState;
657 }
658
659 leader.onReplicaUpdate(this);
660 runNextPushRequest();
661 } finally {
662 leader.lock.unlock();
663 }
664 }
665
666 private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
667 ReceiveCommand commitCmd) {
668 if (acceptId != null) {
669 txnAccepted = acceptId;
670 }
671
672 ObjectId committed = readId(req, commitCmd);
673 if (committed != null) {
674 txnCommitted = committed;
675 } else if (acceptId != null && txnCommitted == null) {
676
677 Map<String, Ref> adv = req.getRefs();
678 if (adv != null) {
679 Ref refs = adv.get(getSystem().getTxnCommitted());
680 txnCommitted = getId(refs);
681 }
682 }
683 }
684
685 @Nullable
686 private static ObjectId readId(ReplicaPushRequest req,
687 @Nullable ReceiveCommand cmd) {
688 if (cmd == null) {
689
690 return null;
691
692 } else if (cmd.getResult() == OK) {
693
694 return cmd.getNewId();
695 }
696
697 Map<String, Ref> refs = req.getRefs();
698 return refs != null ? getId(refs.get(cmd.getRefName())) : null;
699 }
700
701
702
703
704
705
706
707
708
709
710
711
712
713 protected abstract void blockingFetch(Repository repo,
714 ReplicaFetchRequest req) throws IOException;
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731 protected Collection<ReceiveCommand> prepareCommit(Repository git,
732 Map<String, Ref> current, ObjectId committed) throws IOException {
733 List<ReceiveCommand> delta = new ArrayList<>();
734 Map<String, Ref> remote = new HashMap<>(current);
735 try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git);
736 TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
737 tw.setRecursive(true);
738 tw.addTree(rw.parseCommit(committed).getTree());
739 while (tw.next()) {
740 if (tw.getRawMode(0) != TYPE_GITLINK
741 || tw.isPathSuffix(PEEL, 2)) {
742
743
744 continue;
745 }
746
747
748 String name = RefTree.refName(tw.getPathString());
749 Ref oldRef = remote.remove(name);
750 ObjectId oldId = getId(oldRef);
751 ObjectId newId = tw.getObjectId(0);
752 if (!AnyObjectId.equals(oldId, newId)) {
753 delta.add(new ReceiveCommand(oldId, newId, name));
754 }
755 }
756 }
757
758
759 for (Ref ref : remote.values()) {
760 if (canDelete(ref)) {
761 delta.add(new ReceiveCommand(
762 ref.getObjectId(), ObjectId.zeroId(),
763 ref.getName()));
764 }
765 }
766 return delta;
767 }
768
769 boolean canDelete(Ref ref) {
770 String name = ref.getName();
771 if (HEAD.equals(name)) {
772 return false;
773 }
774 if (name.startsWith(getSystem().getTxnNamespace())) {
775 return false;
776 }
777
778 return true;
779 }
780
781 @NonNull
782 static ObjectId getId(@Nullable Ref ref) {
783 if (ref != null) {
784 ObjectId id = ref.getObjectId();
785 if (id != null) {
786 return id;
787 }
788 }
789 return ObjectId.zeroId();
790 }
791 }