1 /*
2 * Copyright (C) 2016, Google Inc.
3 * and other copyright owners as documented in the project's IP log.
4 *
5 * This program and the accompanying materials are made available
6 * under the terms of the Eclipse Distribution License v1.0 which
7 * accompanies this distribution, is reproduced below, and is
8 * available at http://www.eclipse.org/org/documents/edl-v10.php
9 *
10 * All rights reserved.
11 *
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
14 * conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials provided
22 * with the distribution.
23 *
24 * - Neither the name of the Eclipse Foundation, Inc. nor the
25 * names of its contributors may be used to endorse or promote
26 * products derived from this software without specific prior
27 * written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42 */
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
47 import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
48 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
49 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
50 import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
51 import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
52 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
53 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
54
55 import java.net.URISyntaxException;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Random;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.ThreadFactory;
62 import java.util.concurrent.atomic.AtomicInteger;
63
64 import org.eclipse.jgit.annotations.Nullable;
65 import org.eclipse.jgit.lib.Config;
66 import org.eclipse.jgit.lib.PersonIdent;
67 import org.eclipse.jgit.lib.Repository;
68 import org.eclipse.jgit.transport.RemoteConfig;
69 import org.eclipse.jgit.transport.URIish;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 /**
74 * Ketch system-wide configuration.
75 * <p>
76 * This class provides useful defaults for testing and small proof of concepts.
77 * Full scale installations are expected to subclass and override methods to
78 * provide consistent configuration across all managed repositories.
79 * <p>
80 * Servers should configure their own {@link ScheduledExecutorService}.
81 */
82 public class KetchSystem {
83 private static final Random RNG = new Random();
84
85 /** @return default executor, one thread per available processor. */
86 public static ScheduledExecutorService defaultExecutor() {
87 return DefaultExecutorHolder.I;
88 }
89
90 private final ScheduledExecutorService executor;
91 private final String txnNamespace;
92 private final String txnAccepted;
93 private final String txnCommitted;
94 private final String txnStage;
95
96 /** Create a default system with a thread pool of 1 thread per CPU. */
97 public KetchSystem() {
98 this(defaultExecutor(), DEFAULT_TXN_NAMESPACE);
99 }
100
101 /**
102 * Create a Ketch system with the provided executor service.
103 *
104 * @param executor
105 * thread pool to run background operations.
106 * @param txnNamespace
107 * reference namespace for the RefTree graph and associated
108 * transaction state. Must begin with {@code "refs/"} and end
109 * with {@code '/'}, for example {@code "refs/txn/"}.
110 */
111 public KetchSystem(ScheduledExecutorService executor, String txnNamespace) {
112 this.executor = executor;
113 this.txnNamespace = txnNamespace;
114 this.txnAccepted = txnNamespace + ACCEPTED;
115 this.txnCommitted = txnNamespace + COMMITTED;
116 this.txnStage = txnNamespace + STAGE;
117 }
118
119 /** @return executor to perform background operations. */
120 public ScheduledExecutorService getExecutor() {
121 return executor;
122 }
123
124 /**
125 * Get the namespace used for the RefTree graph and transaction management.
126 *
127 * @return reference namespace such as {@code "refs/txn/"}.
128 */
129 public String getTxnNamespace() {
130 return txnNamespace;
131 }
132
133 /** @return name of the accepted RefTree graph. */
134 public String getTxnAccepted() {
135 return txnAccepted;
136 }
137
138 /** @return name of the committed RefTree graph. */
139 public String getTxnCommitted() {
140 return txnCommitted;
141 }
142
143 /** @return prefix for staged objects, e.g. {@code "refs/txn/stage/"}. */
144 public String getTxnStage() {
145 return txnStage;
146 }
147
148 /** @return identity line for the committer header of a RefTreeGraph. */
149 public PersonIdent newCommitter() {
150 String name = "ketch"; //$NON-NLS-1$
151 String email = "ketch@system"; //$NON-NLS-1$
152 return new PersonIdent(name, email);
153 }
154
155 /**
156 * Construct a random tag to identify a candidate during leader election.
157 * <p>
158 * Multiple processes trying to elect themselves leaders at exactly the same
159 * time (rounded to seconds) using the same {@link #newCommitter()} identity
160 * strings, for the same term, may generate the same ObjectId for the
161 * election commit and falsely assume they have both won.
162 * <p>
163 * Candidates add this tag to their election ballot commit to disambiguate
164 * the election. The tag only needs to be unique for a given triplet of
165 * {@link #newCommitter()}, system time (rounded to seconds), and term. If
166 * every replica in the system uses a unique {@code newCommitter} (such as
167 * including the host name after the {@code "@"} in the email address) the
168 * tag could be the empty string.
169 * <p>
170 * The default implementation generates a few bytes of random data.
171 *
172 * @return unique tag; null or empty string if {@code newCommitter()} is
173 * sufficiently unique to identify the leader.
174 */
175 @Nullable
176 public String newLeaderTag() {
177 int n = RNG.nextInt(1 << (6 * 4));
178 return String.format("%06x", Integer.valueOf(n)); //$NON-NLS-1$
179 }
180
181 /**
182 * Construct the KetchLeader instance of a repository.
183 *
184 * @param repo
185 * local repository stored by the leader.
186 * @return leader instance.
187 * @throws URISyntaxException
188 * a follower configuration contains an unsupported URI.
189 */
190 public KetchLeader createLeader(final Repository repo)
191 throws URISyntaxException {
192 KetchLeader leader = new KetchLeader(this) {
193 @Override
194 protected Repository openRepository() {
195 repo.incrementOpen();
196 return repo;
197 }
198 };
199 leader.setReplicas(createReplicas(leader, repo));
200 return leader;
201 }
202
203 /**
204 * Get the collection of replicas for a repository.
205 * <p>
206 * The collection of replicas must include the local repository.
207 *
208 * @param leader
209 * the leader driving these replicas.
210 * @param repo
211 * repository to get the replicas of.
212 * @return collection of replicas for the specified repository.
213 * @throws URISyntaxException
214 * a configured URI is invalid.
215 */
216 protected List<KetchReplica> createReplicas(KetchLeader leader,
217 Repository repo) throws URISyntaxException {
218 List<KetchReplica> replicas = new ArrayList<>();
219 Config cfg = repo.getConfig();
220 String localName = getLocalName(cfg);
221 for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
222 if (!hasParticipation(cfg, name)) {
223 continue;
224 }
225
226 ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
227 if (name.equals(localName)) {
228 replicas.add(new LocalReplica(leader, name, kc));
229 continue;
230 }
231
232 RemoteConfig rc = new RemoteConfig(cfg, name);
233 List<URIish> uris = rc.getPushURIs();
234 if (uris.isEmpty()) {
235 uris = rc.getURIs();
236 }
237 for (URIish uri : uris) {
238 String n = uris.size() == 1 ? name : uri.getHost();
239 replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
240 }
241 }
242 return replicas;
243 }
244
245 private static boolean hasParticipation(Config cfg, String name) {
246 return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
247 }
248
249 private static String getLocalName(Config cfg) {
250 return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
251 }
252
253 static class DefaultExecutorHolder {
254 private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
255 static final ScheduledExecutorService I = create();
256
257 private static ScheduledExecutorService create() {
258 int cores = Runtime.getRuntime().availableProcessors();
259 int threads = Math.max(5, cores);
260 log.info("Using {} threads", Integer.valueOf(threads)); //$NON-NLS-1$
261 return Executors.newScheduledThreadPool(
262 threads,
263 new ThreadFactory() {
264 private final AtomicInteger threadCnt = new AtomicInteger();
265
266 @Override
267 public Thread newThread(Runnable r) {
268 int id = threadCnt.incrementAndGet();
269 Thread thr = new Thread(r);
270 thr.setName("KetchExecutor-" + id); //$NON-NLS-1$
271 return thr;
272 }
273 });
274 }
275
276 private DefaultExecutorHolder() {
277 }
278 }
279
280 /**
281 * Compute a delay in a {@code min..max} interval with random jitter.
282 *
283 * @param last
284 * amount of delay waited before the last attempt. This is used
285 * to seed the next delay interval. Should be 0 if there was no
286 * prior delay.
287 * @param min
288 * shortest amount of allowable delay between attempts.
289 * @param max
290 * longest amount of allowable delay between attempts.
291 * @return new amount of delay to wait before the next attempt.
292 */
293 static long delay(long last, long min, long max) {
294 long r = Math.max(0, last * 3 - min);
295 if (r > 0) {
296 int c = (int) Math.min(r + 1, Integer.MAX_VALUE);
297 r = RNG.nextInt(c);
298 }
299 return Math.max(Math.min(min + r, max), min);
300 }
301 }