1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
47 import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
48 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
49 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
50 import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
51 import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
52 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
53 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
54
55 import java.net.URISyntaxException;
56 import java.time.Duration;
57 import java.util.ArrayList;
58 import java.util.List;
59 import java.util.Random;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.ScheduledExecutorService;
62 import java.util.concurrent.ThreadFactory;
63 import java.util.concurrent.atomic.AtomicInteger;
64
65 import org.eclipse.jgit.annotations.Nullable;
66 import org.eclipse.jgit.lib.Config;
67 import org.eclipse.jgit.lib.PersonIdent;
68 import org.eclipse.jgit.lib.Repository;
69 import org.eclipse.jgit.transport.RemoteConfig;
70 import org.eclipse.jgit.transport.URIish;
71 import org.eclipse.jgit.util.time.MonotonicClock;
72 import org.eclipse.jgit.util.time.MonotonicSystemClock;
73 import org.eclipse.jgit.util.time.ProposedTimestamp;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77
78
79
80
81
82
83
84
85
86 public class KetchSystem {
87 private static final Random RNG = new Random();
88
89
90 public static ScheduledExecutorService defaultExecutor() {
91 return DefaultExecutorHolder.I;
92 }
93
94 private final ScheduledExecutorService executor;
95 private final MonotonicClock clock;
96 private final String txnNamespace;
97 private final String txnAccepted;
98 private final String txnCommitted;
99 private final String txnStage;
100
101
102 public KetchSystem() {
103 this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE);
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118 public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock,
119 String txnNamespace) {
120 this.executor = executor;
121 this.clock = clock;
122 this.txnNamespace = txnNamespace;
123 this.txnAccepted = txnNamespace + ACCEPTED;
124 this.txnCommitted = txnNamespace + COMMITTED;
125 this.txnStage = txnNamespace + STAGE;
126 }
127
128
129 public ScheduledExecutorService getExecutor() {
130 return executor;
131 }
132
133
134 public MonotonicClock getClock() {
135 return clock;
136 }
137
138
139
140
141
142
143 public Duration getMaxWaitForMonotonicClock() {
144 return Duration.ofSeconds(5);
145 }
146
147
148
149
150
151 public boolean requireMonotonicLeaderElections() {
152 return false;
153 }
154
155
156
157
158
159
160 public String getTxnNamespace() {
161 return txnNamespace;
162 }
163
164
165 public String getTxnAccepted() {
166 return txnAccepted;
167 }
168
169
170 public String getTxnCommitted() {
171 return txnCommitted;
172 }
173
174
175 public String getTxnStage() {
176 return txnStage;
177 }
178
179
180
181
182
183
184 public PersonIdent newCommitter(ProposedTimestamp time) {
185 String name = "ketch";
186 String email = "ketch@system";
187 return new PersonIdent(name, email, time);
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 @Nullable
212 public String newLeaderTag() {
213 int n = RNG.nextInt(1 << (6 * 4));
214 return String.format("%06x", Integer.valueOf(n));
215 }
216
217
218
219
220
221
222
223
224
225
226 public KetchLeader createLeader(final Repository repo)
227 throws URISyntaxException {
228 KetchLeader leader = new KetchLeader(this) {
229 @Override
230 protected Repository openRepository() {
231 repo.incrementOpen();
232 return repo;
233 }
234 };
235 leader.setReplicas(createReplicas(leader, repo));
236 return leader;
237 }
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 protected List<KetchReplica> createReplicas(KetchLeader leader,
253 Repository repo) throws URISyntaxException {
254 List<KetchReplica> replicas = new ArrayList<>();
255 Config cfg = repo.getConfig();
256 String localName = getLocalName(cfg);
257 for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
258 if (!hasParticipation(cfg, name)) {
259 continue;
260 }
261
262 ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
263 if (name.equals(localName)) {
264 replicas.add(new LocalReplica(leader, name, kc));
265 continue;
266 }
267
268 RemoteConfig rc = new RemoteConfig(cfg, name);
269 List<URIish> uris = rc.getPushURIs();
270 if (uris.isEmpty()) {
271 uris = rc.getURIs();
272 }
273 for (URIish uri : uris) {
274 String n = uris.size() == 1 ? name : uri.getHost();
275 replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
276 }
277 }
278 return replicas;
279 }
280
281 private static boolean hasParticipation(Config cfg, String name) {
282 return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
283 }
284
285 private static String getLocalName(Config cfg) {
286 return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
287 }
288
289 static class DefaultExecutorHolder {
290 private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
291 static final ScheduledExecutorService I = create();
292
293 private static ScheduledExecutorService create() {
294 int cores = Runtime.getRuntime().availableProcessors();
295 int threads = Math.max(5, cores);
296 log.info("Using {} threads", Integer.valueOf(threads));
297 return Executors.newScheduledThreadPool(
298 threads,
299 new ThreadFactory() {
300 private final AtomicInteger threadCnt = new AtomicInteger();
301
302 @Override
303 public Thread newThread(Runnable r) {
304 int id = threadCnt.incrementAndGet();
305 Thread thr = new Thread(r);
306 thr.setName("KetchExecutor-" + id);
307 return thr;
308 }
309 });
310 }
311
312 private DefaultExecutorHolder() {
313 }
314 }
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329 static long delay(long last, long min, long max) {
330 long r = Math.max(0, last * 3 - min);
331 if (r > 0) {
332 int c = (int) Math.min(r + 1, Integer.MAX_VALUE);
333 r = RNG.nextInt(c);
334 }
335 return Math.max(Math.min(min + r, max), min);
336 }
337 }