1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.ketch;
12
13 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
14 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
15 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
16 import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
17 import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
18
19 import java.io.IOException;
20 import java.text.MessageFormat;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.locks.Lock;
26 import java.util.concurrent.locks.ReentrantLock;
27
28 import org.eclipse.jgit.internal.storage.reftree.RefTree;
29 import org.eclipse.jgit.lib.ObjectId;
30 import org.eclipse.jgit.lib.Repository;
31 import org.eclipse.jgit.revwalk.RevCommit;
32 import org.eclipse.jgit.revwalk.RevWalk;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 public abstract class KetchLeader {
93 private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
94
95
96 public enum State {
97
98 CANDIDATE,
99
100
101 LEADER,
102
103
104 DEPOSED,
105
106
107 SHUTDOWN;
108 }
109
110 private final KetchSystem system;
111
112
113 private KetchReplica[] voters;
114 private KetchReplica[] followers;
115 private LocalReplica self;
116
117
118
119
120
121
122
123 final Lock lock;
124
125 private State state = CANDIDATE;
126
127
128 private long term;
129
130
131
132
133
134
135
136
137 private final List<Proposal> queued;
138
139
140
141
142
143
144
145
146
147 private RefTree refTree;
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 volatile boolean roundHoldsReferenceToRefTree;
167
168
169 private LogIndex headIndex;
170
171
172 private LogIndex committedIndex;
173
174
175
176
177
178
179
180 private boolean idle;
181
182
183 private Round runningRound;
184
185
186
187
188
189
190
191 protected KetchLeader(KetchSystem system) {
192 this.system = system;
193 this.lock = new ReentrantLock(true );
194 this.queued = new ArrayList<>(4);
195 this.idle = true;
196 }
197
198
199 KetchSystem getSystem() {
200 return system;
201 }
202
203
204
205
206
207
208
209
210
211
212
213 public void setReplicas(Collection<KetchReplica> replicas) {
214 List<KetchReplica> v = new ArrayList<>(5);
215 List<KetchReplica> f = new ArrayList<>(5);
216 for (KetchReplica r : replicas) {
217 switch (r.getParticipation()) {
218 case FULL:
219 v.add(r);
220 break;
221
222 case FOLLOWER_ONLY:
223 f.add(r);
224 break;
225 }
226 }
227
228 Collection<Integer> validVoters = validVoterCounts();
229 if (!validVoters.contains(Integer.valueOf(v.size()))) {
230 throw new IllegalArgumentException(MessageFormat.format(
231 KetchText.get().unsupportedVoterCount,
232 Integer.valueOf(v.size()),
233 validVoters));
234 }
235
236 LocalReplica me = findLocal(v);
237 if (me == null) {
238 throw new IllegalArgumentException(
239 KetchText.get().localReplicaRequired);
240 }
241
242 lock.lock();
243 try {
244 voters = v.toArray(new KetchReplica[0]);
245 followers = f.toArray(new KetchReplica[0]);
246 self = me;
247 } finally {
248 lock.unlock();
249 }
250 }
251
252 private static Collection<Integer> validVoterCounts() {
253 @SuppressWarnings("boxing")
254 Integer[] valid = {
255
256 1, 3, 5, 7, 9 };
257 return Arrays.asList(valid);
258 }
259
260 private static LocalReplica findLocal(Collection<KetchReplica> voters) {
261 for (KetchReplica r : voters) {
262 if (r instanceof LocalReplica) {
263 return (LocalReplica) r;
264 }
265 }
266 return null;
267 }
268
269
270
271
272
273
274
275
276
277
278 protected abstract Repository openRepository() throws IOException;
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301 public void queueProposal(Proposal proposal)
302 throws InterruptedException, IOException {
303 try {
304 lock.lockInterruptibly();
305 } catch (InterruptedException e) {
306 proposal.abort();
307 throw e;
308 }
309 try {
310 if (refTree == null) {
311 initialize();
312 for (Proposal p : queued) {
313 refTree.apply(p.getCommands());
314 }
315 } else if (roundHoldsReferenceToRefTree) {
316 refTree = refTree.copy();
317 roundHoldsReferenceToRefTree = false;
318 }
319
320 if (!refTree.apply(proposal.getCommands())) {
321
322 proposal.abort();
323 return;
324 }
325
326 queued.add(proposal);
327 proposal.notifyState(QUEUED);
328
329 if (idle) {
330 scheduleLeader();
331 }
332 } finally {
333 lock.unlock();
334 }
335 }
336
337 private void initialize() throws IOException {
338 try (Repository git = openRepository(); RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
339 self.initialize(git);
340
341 ObjectId accepted = self.getTxnAccepted();
342 if (!ObjectId.zeroId().equals(accepted)) {
343 RevCommit c = rw.parseCommit(accepted);
344 headIndex = LogIndex.unknown(accepted);
345 refTree = RefTree.read(rw.getObjectReader(), c.getTree());
346 } else {
347 headIndex = LogIndex.unknown(ObjectId.zeroId());
348 refTree = RefTree.newEmptyTree();
349 }
350 }
351 }
352
353 private void scheduleLeader() {
354 idle = false;
355 system.getExecutor().execute(this::runLeader);
356 }
357
358 private void runLeader() {
359 Round round;
360 lock.lock();
361 try {
362 switch (state) {
363 case CANDIDATE:
364 round = new ElectionRound(this, headIndex);
365 break;
366
367 case LEADER:
368 round = newProposalRound();
369 break;
370
371 case DEPOSED:
372 case SHUTDOWN:
373 default:
374 log.warn("Leader cannot run {}", state);
375
376 return;
377 }
378 } finally {
379 lock.unlock();
380 }
381
382 try {
383 round.start();
384 } catch (IOException e) {
385
386 log.error(KetchText.get().leaderFailedToStore, e);
387 lock.lock();
388 try {
389 nextRound();
390 } finally {
391 lock.unlock();
392 }
393 }
394 }
395
396 private ProposalRound newProposalRound() {
397 List<Proposal> todo = new ArrayList<>(queued);
398 queued.clear();
399 roundHoldsReferenceToRefTree = true;
400 return new ProposalRound(this, headIndex, todo, refTree);
401 }
402
403
404 long getTerm() {
405 return term;
406 }
407
408
409 LogIndex getHead() {
410 return headIndex;
411 }
412
413
414
415
416 LogIndex getCommitted() {
417 return committedIndex;
418 }
419
420 boolean isIdle() {
421 return idle;
422 }
423
424 void runAsync(Round round) {
425 lock.lock();
426 try {
427
428
429
430 headIndex = round.acceptedNewIndex;
431 runningRound = round;
432
433 for (KetchReplica replica : voters) {
434 replica.pushTxnAcceptedAsync(round);
435 }
436 for (KetchReplica replica : followers) {
437 replica.pushTxnAcceptedAsync(round);
438 }
439 } finally {
440 lock.unlock();
441 }
442 }
443
444
445
446
447
448
449
450
451
452 void onReplicaUpdate(KetchReplica replica) {
453 if (log.isDebugEnabled()) {
454 log.debug("Replica {} finished:\n{}",
455 replica.describeForLog(), snapshot());
456 }
457
458 if (replica.getParticipation() == FOLLOWER_ONLY) {
459
460 return;
461 } else if (runningRound == null) {
462
463 return;
464 }
465
466 assert headIndex.equals(runningRound.acceptedNewIndex);
467 int matching = 0;
468 for (KetchReplica r : voters) {
469 if (r.hasAccepted(headIndex)) {
470 matching++;
471 }
472 }
473
474 int quorum = voters.length / 2 + 1;
475 boolean success = matching >= quorum;
476 if (!success) {
477 return;
478 }
479
480 switch (state) {
481 case CANDIDATE:
482 term = ((ElectionRound) runningRound).getTerm();
483 state = LEADER;
484 if (log.isDebugEnabled()) {
485 log.debug("Won election, running term " + term);
486 }
487
488
489 case LEADER:
490 committedIndex = headIndex;
491 if (log.isDebugEnabled()) {
492 log.debug("Committed {} in term {}",
493 committedIndex.describeForLog(),
494 Long.valueOf(term));
495 }
496 nextRound();
497 commitAsync(replica);
498 notifySuccess(runningRound);
499 if (log.isDebugEnabled()) {
500 log.debug("Leader state:\n{}", snapshot());
501 }
502 break;
503
504 default:
505 log.debug("Leader ignoring replica while in {}", state);
506 break;
507 }
508 }
509
510 private void notifySuccess(Round round) {
511
512 lock.unlock();
513 try {
514 round.success();
515 } finally {
516 lock.lock();
517 }
518 }
519
520 private void commitAsync(KetchReplica caller) {
521 for (KetchReplica r : voters) {
522 if (r == caller) {
523 continue;
524 }
525 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
526 r.pushCommitAsync(committedIndex);
527 }
528 }
529 for (KetchReplica r : followers) {
530 if (r == caller) {
531 continue;
532 }
533 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
534 r.pushCommitAsync(committedIndex);
535 }
536 }
537 }
538
539
540 void nextRound() {
541 runningRound = null;
542
543 if (queued.isEmpty()) {
544 idle = true;
545 } else {
546
547
548
549 scheduleLeader();
550 }
551 }
552
553
554
555
556
557
558 public LeaderSnapshot snapshot() {
559 lock.lock();
560 try {
561 LeaderSnapshot s = new LeaderSnapshot();
562 s.state = state;
563 s.term = term;
564 s.headIndex = headIndex;
565 s.committedIndex = committedIndex;
566 s.idle = isIdle();
567 for (KetchReplica r : voters) {
568 s.replicas.add(r.snapshot());
569 }
570 for (KetchReplica r : followers) {
571 s.replicas.add(r.snapshot());
572 }
573 return s;
574 } finally {
575 lock.unlock();
576 }
577 }
578
579
580
581
582 public void shutdown() {
583 lock.lock();
584 try {
585 if (state != SHUTDOWN) {
586 state = SHUTDOWN;
587 for (KetchReplica r : voters) {
588 r.shutdown();
589 }
590 for (KetchReplica r : followers) {
591 r.shutdown();
592 }
593 }
594 } finally {
595 lock.unlock();
596 }
597 }
598
599
600 @Override
601 public String toString() {
602 return snapshot().toString();
603 }
604 }