1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
47 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
48 import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
49 import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
50 import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
51
52 import java.io.IOException;
53 import java.text.MessageFormat;
54 import java.util.ArrayList;
55 import java.util.Arrays;
56 import java.util.Collection;
57 import java.util.List;
58 import java.util.concurrent.locks.Lock;
59 import java.util.concurrent.locks.ReentrantLock;
60
61 import org.eclipse.jgit.internal.storage.reftree.RefTree;
62 import org.eclipse.jgit.lib.ObjectId;
63 import org.eclipse.jgit.lib.Repository;
64 import org.eclipse.jgit.revwalk.RevCommit;
65 import org.eclipse.jgit.revwalk.RevWalk;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public abstract class KetchLeader {
116 private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
117
118
/**
 * Lifecycle states of a {@link KetchLeader}.
 * <p>
 * A nested enum is implicitly static, so no explicit modifier is written.
 */
public enum State {
	/** Initial state; the next round run by the leader is an election. */
	CANDIDATE,

	/** A quorum of voters accepted the election; proposal rounds may run. */
	LEADER,

	/**
	 * The leader no longer runs rounds.
	 * <p>
	 * NOTE(review): nothing in this class assigns this state; presumably it
	 * is set when a competing leader takes over. Confirm against callers.
	 */
	DEPOSED,

	/** Entered through {@link KetchLeader#shutdown()}; no more rounds run. */
	SHUTDOWN
}
132
133 private final KetchSystem system;
134
135
136 private KetchReplica[] voters;
137 private KetchReplica[] followers;
138 private LocalReplica self;
139
140
141
142
143
144
145
146 final Lock lock;
147
148 private State state = CANDIDATE;
149
150
151 private long term;
152
153
154
155
156
157
158
159
160 private final List<Proposal> queued;
161
162
163
164
165
166
167
168
169
170 private RefTree refTree;
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 volatile boolean roundHoldsReferenceToRefTree;
190
191
192 private LogIndex headIndex;
193
194
195 private LogIndex committedIndex;
196
197
198
199
200
201
202
203 private boolean idle;
204
205
206 private Round runningRound;
207
208
209
210
211
212
213
/**
 * Construct a leader that starts idle with an empty proposal queue.
 *
 * @param system
 *            Ketch system configuration the leader runs within.
 */
protected KetchLeader(KetchSystem system) {
	this.system = system;
	this.idle = true;
	this.queued = new ArrayList<>(4);
	// Fair lock: waiting threads acquire it in arrival order.
	this.lock = new ReentrantLock(true);
}
220
221
222 KetchSystem getSystem() {
223 return system;
224 }
225
226
227
228
229
230
231
232
233
234
235
236 public void setReplicas(Collection<KetchReplica> replicas) {
237 List<KetchReplica> v = new ArrayList<>(5);
238 List<KetchReplica> f = new ArrayList<>(5);
239 for (KetchReplica r : replicas) {
240 switch (r.getParticipation()) {
241 case FULL:
242 v.add(r);
243 break;
244
245 case FOLLOWER_ONLY:
246 f.add(r);
247 break;
248 }
249 }
250
251 Collection<Integer> validVoters = validVoterCounts();
252 if (!validVoters.contains(Integer.valueOf(v.size()))) {
253 throw new IllegalArgumentException(MessageFormat.format(
254 KetchText.get().unsupportedVoterCount,
255 Integer.valueOf(v.size()),
256 validVoters));
257 }
258
259 LocalReplica me = findLocal(v);
260 if (me == null) {
261 throw new IllegalArgumentException(
262 KetchText.get().localReplicaRequired);
263 }
264
265 lock.lock();
266 try {
267 voters = v.toArray(new KetchReplica[v.size()]);
268 followers = f.toArray(new KetchReplica[f.size()]);
269 self = me;
270 } finally {
271 lock.unlock();
272 }
273 }
274
275 private static Collection<Integer> validVoterCounts() {
276 @SuppressWarnings("boxing")
277 Integer[] valid = {
278
279 1, 3, 5, 7, 9 };
280 return Arrays.asList(valid);
281 }
282
283 private static LocalReplica findLocal(Collection<KetchReplica> voters) {
284 for (KetchReplica r : voters) {
285 if (r instanceof LocalReplica) {
286 return (LocalReplica) r;
287 }
288 }
289 return null;
290 }
291
292
293
294
295
296
297
298
299
300
301 protected abstract Repository openRepository() throws IOException;
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323 public void queueProposal(Proposal proposal)
324 throws InterruptedException, IOException {
325 try {
326 lock.lockInterruptibly();
327 } catch (InterruptedException e) {
328 proposal.abort();
329 throw e;
330 }
331 try {
332 if (refTree == null) {
333 initialize();
334 for (Proposal p : queued) {
335 refTree.apply(p.getCommands());
336 }
337 } else if (roundHoldsReferenceToRefTree) {
338 refTree = refTree.copy();
339 roundHoldsReferenceToRefTree = false;
340 }
341
342 if (!refTree.apply(proposal.getCommands())) {
343
344 proposal.abort();
345 return;
346 }
347
348 queued.add(proposal);
349 proposal.notifyState(QUEUED);
350
351 if (idle) {
352 scheduleLeader();
353 }
354 } finally {
355 lock.unlock();
356 }
357 }
358
359 private void initialize() throws IOException {
360 try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
361 self.initialize(git);
362
363 ObjectId accepted = self.getTxnAccepted();
364 if (!ObjectId.zeroId().equals(accepted)) {
365 RevCommit c = rw.parseCommit(accepted);
366 headIndex = LogIndex.unknown(accepted);
367 refTree = RefTree.read(rw.getObjectReader(), c.getTree());
368 } else {
369 headIndex = LogIndex.unknown(ObjectId.zeroId());
370 refTree = RefTree.newEmptyTree();
371 }
372 }
373 }
374
375 private void scheduleLeader() {
376 idle = false;
377 system.getExecutor().execute(new Runnable() {
378 @Override
379 public void run() {
380 runLeader();
381 }
382 });
383 }
384
385 private void runLeader() {
386 Round round;
387 lock.lock();
388 try {
389 switch (state) {
390 case CANDIDATE:
391 round = new ElectionRound(this, headIndex);
392 break;
393
394 case LEADER:
395 round = newProposalRound();
396 break;
397
398 case DEPOSED:
399 case SHUTDOWN:
400 default:
401 log.warn("Leader cannot run {}", state);
402
403 return;
404 }
405 } finally {
406 lock.unlock();
407 }
408
409 try {
410 round.start();
411 } catch (IOException e) {
412
413 log.error(KetchText.get().leaderFailedToStore, e);
414 lock.lock();
415 try {
416 nextRound();
417 } finally {
418 lock.unlock();
419 }
420 }
421 }
422
423 private ProposalRound newProposalRound() {
424 List<Proposal> todo = new ArrayList<>(queued);
425 queued.clear();
426 roundHoldsReferenceToRefTree = true;
427 return new ProposalRound(this, headIndex, todo, refTree);
428 }
429
430
431 long getTerm() {
432 return term;
433 }
434
435
436 LogIndex getHead() {
437 return headIndex;
438 }
439
440
441
442
443 LogIndex getCommitted() {
444 return committedIndex;
445 }
446
447 boolean isIdle() {
448 return idle;
449 }
450
451 void runAsync(Round round) {
452 lock.lock();
453 try {
454
455
456
457 headIndex = round.acceptedNewIndex;
458 runningRound = round;
459
460 for (KetchReplica replica : voters) {
461 replica.pushTxnAcceptedAsync(round);
462 }
463 for (KetchReplica replica : followers) {
464 replica.pushTxnAcceptedAsync(round);
465 }
466 } finally {
467 lock.unlock();
468 }
469 }
470
471
472
473
474
475
476
477
478
479 void onReplicaUpdate(KetchReplica replica) {
480 if (log.isDebugEnabled()) {
481 log.debug("Replica {} finished:\n{}",
482 replica.describeForLog(), snapshot());
483 }
484
485 if (replica.getParticipation() == FOLLOWER_ONLY) {
486
487 return;
488 } else if (runningRound == null) {
489
490 return;
491 }
492
493 assert headIndex.equals(runningRound.acceptedNewIndex);
494 int matching = 0;
495 for (KetchReplica r : voters) {
496 if (r.hasAccepted(headIndex)) {
497 matching++;
498 }
499 }
500
501 int quorum = voters.length / 2 + 1;
502 boolean success = matching >= quorum;
503 if (!success) {
504 return;
505 }
506
507 switch (state) {
508 case CANDIDATE:
509 term = ((ElectionRound) runningRound).getTerm();
510 state = LEADER;
511 if (log.isDebugEnabled()) {
512 log.debug("Won election, running term " + term);
513 }
514
515
516 case LEADER:
517 committedIndex = headIndex;
518 if (log.isDebugEnabled()) {
519 log.debug("Committed {} in term {}",
520 committedIndex.describeForLog(),
521 Long.valueOf(term));
522 }
523 nextRound();
524 commitAsync(replica);
525 notifySuccess(runningRound);
526 if (log.isDebugEnabled()) {
527 log.debug("Leader state:\n{}", snapshot());
528 }
529 break;
530
531 default:
532 log.debug("Leader ignoring replica while in {}", state);
533 break;
534 }
535 }
536
537 private void notifySuccess(Round round) {
538
539 lock.unlock();
540 try {
541 round.success();
542 } finally {
543 lock.lock();
544 }
545 }
546
547 private void commitAsync(KetchReplica caller) {
548 for (KetchReplica r : voters) {
549 if (r == caller) {
550 continue;
551 }
552 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
553 r.pushCommitAsync(committedIndex);
554 }
555 }
556 for (KetchReplica r : followers) {
557 if (r == caller) {
558 continue;
559 }
560 if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
561 r.pushCommitAsync(committedIndex);
562 }
563 }
564 }
565
566
567 void nextRound() {
568 runningRound = null;
569
570 if (queued.isEmpty()) {
571 idle = true;
572 } else {
573
574
575
576 scheduleLeader();
577 }
578 }
579
580
581 public LeaderSnapshot snapshot() {
582 lock.lock();
583 try {
584 LeaderSnapshot s = new LeaderSnapshot();
585 s.state = state;
586 s.term = term;
587 s.headIndex = headIndex;
588 s.committedIndex = committedIndex;
589 s.idle = isIdle();
590 for (KetchReplica r : voters) {
591 s.replicas.add(r.snapshot());
592 }
593 for (KetchReplica r : followers) {
594 s.replicas.add(r.snapshot());
595 }
596 return s;
597 } finally {
598 lock.unlock();
599 }
600 }
601
602
603 public void shutdown() {
604 lock.lock();
605 try {
606 if (state != SHUTDOWN) {
607 state = SHUTDOWN;
608 for (KetchReplica r : voters) {
609 r.shutdown();
610 }
611 for (KetchReplica r : followers) {
612 r.shutdown();
613 }
614 }
615 } finally {
616 lock.unlock();
617 }
618 }
619
620 @Override
621 public String toString() {
622 return snapshot().toString();
623 }
624 }