1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
47 import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
48 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
49 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
50 import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
51 import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
52 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
53 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
54
55 import java.net.URISyntaxException;
56 import java.time.Duration;
57 import java.util.ArrayList;
58 import java.util.List;
59 import java.util.Random;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.ScheduledExecutorService;
62 import java.util.concurrent.ThreadFactory;
63 import java.util.concurrent.atomic.AtomicInteger;
64
65 import org.eclipse.jgit.annotations.Nullable;
66 import org.eclipse.jgit.lib.Config;
67 import org.eclipse.jgit.lib.PersonIdent;
68 import org.eclipse.jgit.lib.Repository;
69 import org.eclipse.jgit.transport.RemoteConfig;
70 import org.eclipse.jgit.transport.URIish;
71 import org.eclipse.jgit.util.time.MonotonicClock;
72 import org.eclipse.jgit.util.time.MonotonicSystemClock;
73 import org.eclipse.jgit.util.time.ProposedTimestamp;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77
78
79
80
81
82
83
84
85
86
87 public class KetchSystem {
88 private static final Random RNG = new Random();
89
90
91
92
93
94
95 public static ScheduledExecutorService defaultExecutor() {
96 return DefaultExecutorHolder.I;
97 }
98
99 private final ScheduledExecutorService executor;
100 private final MonotonicClock clock;
101 private final String txnNamespace;
102 private final String txnAccepted;
103 private final String txnCommitted;
104 private final String txnStage;
105
106
107
108
109 public KetchSystem() {
110 this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE);
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124
125 public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock,
126 String txnNamespace) {
127 this.executor = executor;
128 this.clock = clock;
129 this.txnNamespace = txnNamespace;
130 this.txnAccepted = txnNamespace + ACCEPTED;
131 this.txnCommitted = txnNamespace + COMMITTED;
132 this.txnStage = txnNamespace + STAGE;
133 }
134
135
136
137
138
139
140 public ScheduledExecutorService getExecutor() {
141 return executor;
142 }
143
144
145
146
147
148
149 public MonotonicClock getClock() {
150 return clock;
151 }
152
153
154
155
156
157
158
159
160
161
162 public Duration getMaxWaitForMonotonicClock() {
163 return Duration.ofSeconds(5);
164 }
165
166
167
168
169
170
171
172
173
174 public boolean requireMonotonicLeaderElections() {
175 return false;
176 }
177
178
179
180
181
182
183 public String getTxnNamespace() {
184 return txnNamespace;
185 }
186
187
188
189
190
191
192 public String getTxnAccepted() {
193 return txnAccepted;
194 }
195
196
197
198
199
200
201 public String getTxnCommitted() {
202 return txnCommitted;
203 }
204
205
206
207
208
209
210 public String getTxnStage() {
211 return txnStage;
212 }
213
214
215
216
217
218
219
220
221 public PersonIdent newCommitter(ProposedTimestamp time) {
222 String name = "ketch";
223 String email = "ketch@system";
224 return new PersonIdent(name, email, time);
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 @Nullable
249 public String newLeaderTag() {
250 int n = RNG.nextInt(1 << (6 * 4));
251 return String.format("%06x", Integer.valueOf(n));
252 }
253
254
255
256
257
258
259
260
261
262
263 public KetchLeader createLeader(Repository repo)
264 throws URISyntaxException {
265 KetchLeader leader = new KetchLeader(this) {
266 @Override
267 protected Repository openRepository() {
268 repo.incrementOpen();
269 return repo;
270 }
271 };
272 leader.setReplicas(createReplicas(leader, repo));
273 return leader;
274 }
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289 protected List<KetchReplica> createReplicas(KetchLeader leader,
290 Repository repo) throws URISyntaxException {
291 List<KetchReplica> replicas = new ArrayList<>();
292 Config cfg = repo.getConfig();
293 String localName = getLocalName(cfg);
294 for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
295 if (!hasParticipation(cfg, name)) {
296 continue;
297 }
298
299 ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
300 if (name.equals(localName)) {
301 replicas.add(new LocalReplica(leader, name, kc));
302 continue;
303 }
304
305 RemoteConfig rc = new RemoteConfig(cfg, name);
306 List<URIish> uris = rc.getPushURIs();
307 if (uris.isEmpty()) {
308 uris = rc.getURIs();
309 }
310 for (URIish uri : uris) {
311 String n = uris.size() == 1 ? name : uri.getHost();
312 replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
313 }
314 }
315 return replicas;
316 }
317
318 private static boolean hasParticipation(Config cfg, String name) {
319 return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
320 }
321
322 private static String getLocalName(Config cfg) {
323 return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
324 }
325
326 static class DefaultExecutorHolder {
327 private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
328 static final ScheduledExecutorService I = create();
329
330 private static ScheduledExecutorService create() {
331 int cores = Runtime.getRuntime().availableProcessors();
332 int threads = Math.max(5, cores);
333 log.info("Using {} threads", Integer.valueOf(threads));
334 return Executors.newScheduledThreadPool(
335 threads,
336 new ThreadFactory() {
337 private final AtomicInteger threadCnt = new AtomicInteger();
338
339 @Override
340 public Thread newThread(Runnable r) {
341 int id = threadCnt.incrementAndGet();
342 Thread thr = new Thread(r);
343 thr.setName("KetchExecutor-" + id);
344 return thr;
345 }
346 });
347 }
348
349 private DefaultExecutorHolder() {
350 }
351 }
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366 static long delay(long last, long min, long max) {
367 long r = Math.max(0, last * 3 - min);
368 if (r > 0) {
369 int c = (int) Math.min(r + 1, Integer.MAX_VALUE);
370 r = RNG.nextInt(c);
371 }
372 return Math.max(Math.min(min + r, max), min);
373 }
374 }