1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static java.util.concurrent.TimeUnit.MILLISECONDS;
47 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
48 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
49 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
50 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
51 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
52 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
53 import static org.eclipse.jgit.lib.Constants.HEAD;
54 import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
55 import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
56 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
57 import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
58
59 import java.io.IOException;
60 import java.lang.ref.WeakReference;
61 import java.util.ArrayList;
62 import java.util.Collection;
63 import java.util.HashMap;
64 import java.util.Iterator;
65 import java.util.List;
66 import java.util.Map;
67 import java.util.concurrent.Callable;
68 import java.util.concurrent.Future;
69
70 import org.eclipse.jgit.annotations.NonNull;
71 import org.eclipse.jgit.annotations.Nullable;
72 import org.eclipse.jgit.internal.storage.reftree.RefTree;
73 import org.eclipse.jgit.lib.AnyObjectId;
74 import org.eclipse.jgit.lib.ObjectId;
75 import org.eclipse.jgit.lib.Ref;
76 import org.eclipse.jgit.lib.Repository;
77 import org.eclipse.jgit.revwalk.RevWalk;
78 import org.eclipse.jgit.transport.ReceiveCommand;
79 import org.eclipse.jgit.treewalk.TreeWalk;
80 import org.eclipse.jgit.util.SystemReader;
81 import org.slf4j.Logger;
82 import org.slf4j.LoggerFactory;
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 public abstract class KetchReplica {
107 static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
108 private static final byte[] PEEL = { ' ', '^' };
109
110
/**
 * Kind of participation a replica has with its leader.
 * <p>
 * The value for a replica is read from its {@code ReplicaConfig} when the
 * replica is constructed.
 */
public enum Participation {
    /** NOTE(review): presumably a replica taking full part in the system; confirm. */
    FULL,

    /** NOTE(review): presumably a replica that only follows the leader; confirm. */
    FOLLOWER_ONLY
}
118
119
/**
 * How a commit is applied on a replica.
 * <p>
 * The value for a replica is read from its {@code ReplicaConfig} when the
 * replica is constructed.
 */
public enum CommitMethod {
    /** NOTE(review): presumably every changed reference is pushed; confirm. */
    ALL_REFS,

    /** NOTE(review): presumably only the txnCommitted reference is pushed; confirm. */
    TXN_COMMITTED
}
127
128
/** How quickly a commit notification is delivered to a replica. */
public enum CommitSpeed {
    /**
     * The commit is pushed on its own as soon as the replica has accepted
     * the committed index, even while the leader is not idle.
     */
    FAST,

    /**
     * The commit is bundled into the next accept push; a standalone commit
     * push is only made when the leader is idle.
     */
    BATCHED
}
143
144
/** Current state of a replica as seen by the leader. */
public enum State {
    /** Nothing is known yet; this is the state of a newly created replica. */
    UNKNOWN,

    /** The last accept push succeeded but the replica is not at the leader's head. */
    LAGGING,

    /** The last accept push succeeded and the replica is at the leader's head. */
    CURRENT,

    /** NOTE(review): presumably the replica's log conflicts with the leader's; confirm. */
    DIVERGENT,

    /** NOTE(review): presumably the replica's log is past the leader's; confirm. */
    AHEAD,

    /** The last push to the replica failed with an exception. */
    OFFLINE
}
164
165 private final KetchLeader leader;
166 private final String replicaName;
167 private final Participation participation;
168 private final CommitMethod commitMethod;
169 private final CommitSpeed commitSpeed;
170 private final long minRetryMillis;
171 private final long maxRetryMillis;
172 private final Map<ObjectId, List<ReceiveCommand>> staged;
173 private final Map<String, ReceiveCommand> running;
174 private final Map<String, ReceiveCommand> waiting;
175 private final List<ReplicaPushRequest> queued;
176
177
178
179
180
181
182 private ObjectId txnAccepted;
183
184
185
186
187
188
189
190
191 private ObjectId txnCommitted;
192
193
194 private State state = UNKNOWN;
195 private String error;
196
197
198 private Future<?> retryFuture;
199 private long lastRetryMillis;
200 private long retryAtMillis;
201
202
203
204
205
206
207
208
209
210
211
/**
 * Set up the replica from its configuration.
 *
 * @param leader
 *            leader this replica belongs to.
 * @param name
 *            name of the replica, returned by {@link #getName()}.
 * @param cfg
 *            source of the participation mode, commit method, commit speed
 *            and retry bounds.
 */
protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
    // Bookkeeping collections start out empty.
    this.staged = new HashMap<>();
    this.running = new HashMap<>();
    this.waiting = new HashMap<>();
    this.queued = new ArrayList<>(4);

    this.leader = leader;
    this.replicaName = name;

    // Settings are captured once; later configuration changes are not seen.
    this.participation = cfg.getParticipation();
    this.commitMethod = cfg.getCommitMethod();
    this.commitSpeed = cfg.getCommitSpeed();
    this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
    this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
}
225
226
227 public KetchSystem getSystem() {
228 return getLeader().getSystem();
229 }
230
231
232 public KetchLeader getLeader() {
233 return leader;
234 }
235
236
237 public String getName() {
238 return replicaName;
239 }
240
241
242 protected String describeForLog() {
243 return getName();
244 }
245
246
247 public Participation getParticipation() {
248 return participation;
249 }
250
251
252 public CommitMethod getCommitMethod() {
253 return commitMethod;
254 }
255
256
257 public CommitSpeed getCommitSpeed() {
258 return commitSpeed;
259 }
260
261
262
263
264
265
266
267
268
269 protected void shutdown() {
270 Future<?> f = retryFuture;
271 if (f != null) {
272 retryFuture = null;
273 f.cancel(true);
274 }
275 }
276
277 ReplicaSnapshot snapshot() {
278 ReplicaSnapshot s = new ReplicaSnapshot(this);
279 s.accepted = txnAccepted;
280 s.committed = txnCommitted;
281 s.state = state;
282 s.error = error;
283 s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
284 return s;
285 }
286
287
288
289
290
291
292
293
294
295 void initialize(Map<String, Ref> refs) {
296 if (txnAccepted == null) {
297 txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
298 }
299 if (txnCommitted == null) {
300 txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
301 }
302 }
303
304 ObjectId getTxnAccepted() {
305 return txnAccepted;
306 }
307
308 boolean hasAccepted(LogIndex id) {
309 return equals(txnAccepted, id);
310 }
311
312 private static boolean equals(@Nullable ObjectId a, LogIndex b) {
313 return a != null && b != null && AnyObjectId.equals(a, b);
314 }
315
316
317
318
319
320
321
322
323
324 void pushTxnAcceptedAsync(Round round) {
325 List<ReceiveCommand> cmds = new ArrayList<>();
326 if (commitSpeed == BATCHED) {
327 LogIndex committedIndex = leader.getCommitted();
328 if (equals(txnAccepted, committedIndex)
329 && !equals(txnCommitted, committedIndex)) {
330 prepareTxnCommitted(cmds, committedIndex);
331 }
332 }
333
334
335 if (round.stageCommands != null) {
336 for (ReceiveCommand cmd : round.stageCommands) {
337
338 cmds.add(copy(cmd));
339 }
340 }
341 cmds.add(new ReceiveCommand(
342 round.acceptedOldIndex, round.acceptedNewIndex,
343 getSystem().getTxnAccepted()));
344 pushAsync(new ReplicaPushRequest(this, cmds));
345 }
346
347 private static ReceiveCommand copy(ReceiveCommand c) {
348 return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
349 }
350
351 boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
352 return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
353 }
354
355 void pushCommitAsync(LogIndex committed) {
356 List<ReceiveCommand> cmds = new ArrayList<>();
357 prepareTxnCommitted(cmds, committed);
358 pushAsync(new ReplicaPushRequest(this, cmds));
359 }
360
361 private void prepareTxnCommitted(List<ReceiveCommand> cmds,
362 ObjectId committed) {
363 removeStaged(cmds, committed);
364 cmds.add(new ReceiveCommand(
365 txnCommitted, committed,
366 getSystem().getTxnCommitted()));
367 }
368
369 private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
370 List<ReceiveCommand> a = staged.remove(committed);
371 if (a != null) {
372 delete(cmds, a);
373 }
374 if (staged.isEmpty() || !(committed instanceof LogIndex)) {
375 return;
376 }
377
378 LogIndex committedIndex = (LogIndex) committed;
379 Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
380 .entrySet().iterator();
381 while (itr.hasNext()) {
382 Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
383 if (e.getKey() instanceof LogIndex) {
384 LogIndex stagedIndex = (LogIndex) e.getKey();
385 if (stagedIndex.isBefore(committedIndex)) {
386 delete(cmds, e.getValue());
387 itr.remove();
388 }
389 }
390 }
391 }
392
393 private static void delete(List<ReceiveCommand> cmds,
394 List<ReceiveCommand> createCmds) {
395 for (ReceiveCommand cmd : createCmds) {
396 ObjectId id = cmd.getNewId();
397 String name = cmd.getRefName();
398 cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
399 }
400 }
401
402
403
404
405
406
407
408
409
410
411
412
413
414 private void runNextPushRequest() {
415 LogIndex committed = leader.getCommitted();
416 if (!equals(txnCommitted, committed)
417 && shouldPushUnbatchedCommit(committed, leader.isIdle())) {
418 pushCommitAsync(committed);
419 }
420
421 if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
422 return;
423 }
424
425
426 Map<String, ReceiveCommand> cmdMap = new HashMap<>();
427 for (ReplicaPushRequest req : queued) {
428 for (ReceiveCommand cmd : req.getCommands()) {
429 String name = cmd.getRefName();
430 ReceiveCommand old = cmdMap.remove(name);
431 if (old != null) {
432 cmd = new ReceiveCommand(
433 old.getOldId(), cmd.getNewId(),
434 name);
435 }
436 cmdMap.put(name, cmd);
437 }
438 }
439 queued.clear();
440 waiting.clear();
441
442 List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
443 for (ReceiveCommand cmd : next) {
444 running.put(cmd.getRefName(), cmd);
445 }
446 startPush(new ReplicaPushRequest(this, next));
447 }
448
449 private void pushAsync(ReplicaPushRequest req) {
450 if (defer(req)) {
451
452 for (ReceiveCommand cmd : req.getCommands()) {
453 waiting.put(cmd.getRefName(), cmd);
454 }
455 queued.add(req);
456 } else {
457 for (ReceiveCommand cmd : req.getCommands()) {
458 running.put(cmd.getRefName(), cmd);
459 }
460 startPush(req);
461 }
462 }
463
464 private boolean defer(ReplicaPushRequest req) {
465 if (waitingForRetry()) {
466
467 return true;
468 }
469
470 for (ReceiveCommand nextCmd : req.getCommands()) {
471 ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
472 if (priorCmd == null) {
473 priorCmd = running.get(nextCmd.getRefName());
474 }
475 if (priorCmd != null) {
476
477
478 return true;
479 }
480 }
481 return false;
482 }
483
484 private boolean waitingForRetry() {
485 Future<?> f = retryFuture;
486 return f != null && !f.isDone();
487 }
488
489 private void retryLater(ReplicaPushRequest req) {
490 Collection<ReceiveCommand> cmds = req.getCommands();
491 for (ReceiveCommand cmd : cmds) {
492 cmd.setResult(NOT_ATTEMPTED, null);
493 if (!waiting.containsKey(cmd.getRefName())) {
494 waiting.put(cmd.getRefName(), cmd);
495 }
496 }
497 queued.add(0, new ReplicaPushRequest(this, cmds));
498
499 if (!waitingForRetry()) {
500 long delay = KetchSystem.delay(
501 lastRetryMillis,
502 minRetryMillis, maxRetryMillis);
503 if (log.isDebugEnabled()) {
504 log.debug("Retrying {} after {} ms",
505 describeForLog(), Long.valueOf(delay));
506 }
507 lastRetryMillis = delay;
508 retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
509 retryFuture = getSystem().getExecutor()
510 .schedule(new WeakRetryPush(this), delay, MILLISECONDS);
511 }
512 }
513
514
515 static class WeakRetryPush extends WeakReference<KetchReplica>
516 implements Callable<Void> {
517 WeakRetryPush(KetchReplica r) {
518 super(r);
519 }
520
521 @Override
522 public Void call() throws Exception {
523 KetchReplica r = get();
524 if (r != null) {
525 r.doRetryPush();
526 }
527 return null;
528 }
529 }
530
531 private void doRetryPush() {
532 leader.lock.lock();
533 try {
534 retryFuture = null;
535 runNextPushRequest();
536 } finally {
537 leader.lock.unlock();
538 }
539 }
540
541
542
543
544
545
546
547
548
549
550 protected abstract void startPush(ReplicaPushRequest req);
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566 void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
567 ReceiveCommand acceptCmd = null;
568 ReceiveCommand commitCmd = null;
569 List<ReceiveCommand> stages = null;
570
571 for (ReceiveCommand cmd : req.getCommands()) {
572 String name = cmd.getRefName();
573 if (name.equals(getSystem().getTxnAccepted())) {
574 acceptCmd = cmd;
575 } else if (name.equals(getSystem().getTxnCommitted())) {
576 commitCmd = cmd;
577 } else if (cmd.getResult() == OK && cmd.getType() == CREATE
578 && name.startsWith(getSystem().getTxnStage())) {
579 if (stages == null) {
580 stages = new ArrayList<>();
581 }
582 stages.add(cmd);
583 }
584 }
585
586 State newState = null;
587 ObjectId acceptId = readId(req, acceptCmd);
588 if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
589 && req.getException() == null) {
590 try (LagCheck lag = new LagCheck(this, repo)) {
591 newState = lag.check(acceptId, acceptCmd);
592 acceptId = lag.getRemoteId();
593 }
594 }
595
596 leader.lock.lock();
597 try {
598 for (ReceiveCommand cmd : req.getCommands()) {
599 running.remove(cmd.getRefName());
600 }
601
602 Throwable err = req.getException();
603 if (err != null) {
604 state = OFFLINE;
605 error = err.toString();
606 retryLater(req);
607 leader.onReplicaUpdate(this);
608 return;
609 }
610
611 lastRetryMillis = 0;
612 error = null;
613 updateView(req, acceptId, commitCmd);
614
615 if (acceptCmd != null && acceptCmd.getResult() == OK) {
616 state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
617 if (stages != null) {
618 staged.put(acceptCmd.getNewId(), stages);
619 }
620 } else if (newState != null) {
621 state = newState;
622 }
623
624 leader.onReplicaUpdate(this);
625 runNextPushRequest();
626 } finally {
627 leader.lock.unlock();
628 }
629 }
630
631 private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
632 ReceiveCommand commitCmd) {
633 if (acceptId != null) {
634 txnAccepted = acceptId;
635 }
636
637 ObjectId committed = readId(req, commitCmd);
638 if (committed != null) {
639 txnCommitted = committed;
640 } else if (acceptId != null && txnCommitted == null) {
641
642 Map<String, Ref> adv = req.getRefs();
643 if (adv != null) {
644 Ref refs = adv.get(getSystem().getTxnCommitted());
645 txnCommitted = getId(refs);
646 }
647 }
648 }
649
650 @Nullable
651 private static ObjectId readId(ReplicaPushRequest req,
652 @Nullable ReceiveCommand cmd) {
653 if (cmd == null) {
654
655 return null;
656
657 } else if (cmd.getResult() == OK) {
658
659 return cmd.getNewId();
660 }
661
662 Map<String, Ref> refs = req.getRefs();
663 return refs != null ? getId(refs.get(cmd.getRefName())) : null;
664 }
665
666
667
668
669
670
671
672
673
674
675
676
677
678 protected abstract void blockingFetch(Repository repo,
679 ReplicaFetchRequest req) throws IOException;
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695 protected Collection<ReceiveCommand> prepareCommit(Repository git,
696 Map<String, Ref> current, ObjectId committed) throws IOException {
697 List<ReceiveCommand> delta = new ArrayList<>();
698 Map<String, Ref> remote = new HashMap<>(current);
699 try (RevWalk rw = new RevWalk(git);
700 TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
701 tw.setRecursive(true);
702 tw.addTree(rw.parseCommit(committed).getTree());
703 while (tw.next()) {
704 if (tw.getRawMode(0) != TYPE_GITLINK
705 || tw.isPathSuffix(PEEL, 2)) {
706
707
708 continue;
709 }
710
711
712 String name = RefTree.refName(tw.getPathString());
713 Ref oldRef = remote.remove(name);
714 ObjectId oldId = getId(oldRef);
715 ObjectId newId = tw.getObjectId(0);
716 if (!AnyObjectId.equals(oldId, newId)) {
717 delta.add(new ReceiveCommand(oldId, newId, name));
718 }
719 }
720 }
721
722
723 for (Ref ref : remote.values()) {
724 if (canDelete(ref)) {
725 delta.add(new ReceiveCommand(
726 ref.getObjectId(), ObjectId.zeroId(),
727 ref.getName()));
728 }
729 }
730 return delta;
731 }
732
733 boolean canDelete(Ref ref) {
734 String name = ref.getName();
735 if (HEAD.equals(name)) {
736 return false;
737 }
738 if (name.startsWith(getSystem().getTxnNamespace())) {
739 return false;
740 }
741
742 return true;
743 }
744
745 @NonNull
746 static ObjectId getId(@Nullable Ref ref) {
747 if (ref != null) {
748 ObjectId id = ref.getObjectId();
749 if (id != null) {
750 return id;
751 }
752 }
753 return ObjectId.zeroId();
754 }
755 }