View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
47  import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
48  import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
49  import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
50  import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
51  import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
52  import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
53  import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
54  
55  import java.net.URISyntaxException;
56  import java.util.ArrayList;
57  import java.util.List;
58  import java.util.Random;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.ScheduledExecutorService;
61  import java.util.concurrent.ThreadFactory;
62  import java.util.concurrent.atomic.AtomicInteger;
63  
64  import org.eclipse.jgit.annotations.Nullable;
65  import org.eclipse.jgit.lib.Config;
66  import org.eclipse.jgit.lib.PersonIdent;
67  import org.eclipse.jgit.lib.Repository;
68  import org.eclipse.jgit.transport.RemoteConfig;
69  import org.eclipse.jgit.transport.URIish;
70  import org.slf4j.Logger;
71  import org.slf4j.LoggerFactory;
72  
73  /**
74   * Ketch system-wide configuration.
75   * <p>
76   * This class provides useful defaults for testing and small proof of concepts.
77   * Full scale installations are expected to subclass and override methods to
78   * provide consistent configuration across all managed repositories.
79   * <p>
80   * Servers should configure their own {@link ScheduledExecutorService}.
81   */
82  public class KetchSystem {
83  	private static final Random RNG = new Random();
84  
85  	/** @return default executor, one thread per available processor. */
86  	public static ScheduledExecutorService defaultExecutor() {
87  		return DefaultExecutorHolder.I;
88  	}
89  
90  	private final ScheduledExecutorService executor;
91  	private final String txnNamespace;
92  	private final String txnAccepted;
93  	private final String txnCommitted;
94  	private final String txnStage;
95  
96  	/** Create a default system with a thread pool of 1 thread per CPU. */
97  	public KetchSystem() {
98  		this(defaultExecutor(), DEFAULT_TXN_NAMESPACE);
99  	}
100 
101 	/**
102 	 * Create a Ketch system with the provided executor service.
103 	 *
104 	 * @param executor
105 	 *            thread pool to run background operations.
106 	 * @param txnNamespace
107 	 *            reference namespace for the RefTree graph and associated
108 	 *            transaction state. Must begin with {@code "refs/"} and end
109 	 *            with {@code '/'}, for example {@code "refs/txn/"}.
110 	 */
111 	public KetchSystem(ScheduledExecutorService executor, String txnNamespace) {
112 		this.executor = executor;
113 		this.txnNamespace = txnNamespace;
114 		this.txnAccepted = txnNamespace + ACCEPTED;
115 		this.txnCommitted = txnNamespace + COMMITTED;
116 		this.txnStage = txnNamespace + STAGE;
117 	}
118 
119 	/** @return executor to perform background operations. */
120 	public ScheduledExecutorService getExecutor() {
121 		return executor;
122 	}
123 
124 	/**
125 	 * Get the namespace used for the RefTree graph and transaction management.
126 	 *
127 	 * @return reference namespace such as {@code "refs/txn/"}.
128 	 */
129 	public String getTxnNamespace() {
130 		return txnNamespace;
131 	}
132 
133 	/** @return name of the accepted RefTree graph. */
134 	public String getTxnAccepted() {
135 		return txnAccepted;
136 	}
137 
138 	/** @return name of the committed RefTree graph. */
139 	public String getTxnCommitted() {
140 		return txnCommitted;
141 	}
142 
143 	/** @return prefix for staged objects, e.g. {@code "refs/txn/stage/"}. */
144 	public String getTxnStage() {
145 		return txnStage;
146 	}
147 
148 	/** @return identity line for the committer header of a RefTreeGraph. */
149 	public PersonIdent newCommitter() {
150 		String name = "ketch"; //$NON-NLS-1$
151 		String email = "ketch@system"; //$NON-NLS-1$
152 		return new PersonIdent(name, email);
153 	}
154 
155 	/**
156 	 * Construct a random tag to identify a candidate during leader election.
157 	 * <p>
158 	 * Multiple processes trying to elect themselves leaders at exactly the same
159 	 * time (rounded to seconds) using the same {@link #newCommitter()} identity
160 	 * strings, for the same term, may generate the same ObjectId for the
161 	 * election commit and falsely assume they have both won.
162 	 * <p>
163 	 * Candidates add this tag to their election ballot commit to disambiguate
164 	 * the election. The tag only needs to be unique for a given triplet of
165 	 * {@link #newCommitter()}, system time (rounded to seconds), and term. If
166 	 * every replica in the system uses a unique {@code newCommitter} (such as
167 	 * including the host name after the {@code "@"} in the email address) the
168 	 * tag could be the empty string.
169 	 * <p>
170 	 * The default implementation generates a few bytes of random data.
171 	 *
172 	 * @return unique tag; null or empty string if {@code newCommitter()} is
173 	 *         sufficiently unique to identify the leader.
174 	 */
175 	@Nullable
176 	public String newLeaderTag() {
177 		int n = RNG.nextInt(1 << (6 * 4));
178 		return String.format("%06x", Integer.valueOf(n)); //$NON-NLS-1$
179 	}
180 
181 	/**
182 	 * Construct the KetchLeader instance of a repository.
183 	 *
184 	 * @param repo
185 	 *            local repository stored by the leader.
186 	 * @return leader instance.
187 	 * @throws URISyntaxException
188 	 *             a follower configuration contains an unsupported URI.
189 	 */
190 	public KetchLeader createLeader(final Repository repo)
191 			throws URISyntaxException {
192 		KetchLeader leader = new KetchLeader(this) {
193 			@Override
194 			protected Repository openRepository() {
195 				repo.incrementOpen();
196 				return repo;
197 			}
198 		};
199 		leader.setReplicas(createReplicas(leader, repo));
200 		return leader;
201 	}
202 
203 	/**
204 	 * Get the collection of replicas for a repository.
205 	 * <p>
206 	 * The collection of replicas must include the local repository.
207 	 *
208 	 * @param leader
209 	 *            the leader driving these replicas.
210 	 * @param repo
211 	 *            repository to get the replicas of.
212 	 * @return collection of replicas for the specified repository.
213 	 * @throws URISyntaxException
214 	 *             a configured URI is invalid.
215 	 */
216 	protected List<KetchReplica> createReplicas(KetchLeader leader,
217 			Repository repo) throws URISyntaxException {
218 		List<KetchReplica> replicas = new ArrayList<>();
219 		Config cfg = repo.getConfig();
220 		String localName = getLocalName(cfg);
221 		for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
222 			if (!hasParticipation(cfg, name)) {
223 				continue;
224 			}
225 
226 			ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
227 			if (name.equals(localName)) {
228 				replicas.add(new LocalReplica(leader, name, kc));
229 				continue;
230 			}
231 
232 			RemoteConfig rc = new RemoteConfig(cfg, name);
233 			List<URIish> uris = rc.getPushURIs();
234 			if (uris.isEmpty()) {
235 				uris = rc.getURIs();
236 			}
237 			for (URIish uri : uris) {
238 				String n = uris.size() == 1 ? name : uri.getHost();
239 				replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
240 			}
241 		}
242 		return replicas;
243 	}
244 
245 	private static boolean hasParticipation(Config cfg, String name) {
246 		return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
247 	}
248 
249 	private static String getLocalName(Config cfg) {
250 		return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
251 	}
252 
253 	static class DefaultExecutorHolder {
254 		private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
255 		static final ScheduledExecutorService I = create();
256 
257 		private static ScheduledExecutorService create() {
258 			int cores = Runtime.getRuntime().availableProcessors();
259 			int threads = Math.max(5, cores);
260 			log.info("Using {} threads", Integer.valueOf(threads)); //$NON-NLS-1$
261 			return Executors.newScheduledThreadPool(
262 				threads,
263 				new ThreadFactory() {
264 					private final AtomicInteger threadCnt = new AtomicInteger();
265 
266 					@Override
267 					public Thread newThread(Runnable r) {
268 						int id = threadCnt.incrementAndGet();
269 						Thread thr = new Thread(r);
270 						thr.setName("KetchExecutor-" + id); //$NON-NLS-1$
271 						return thr;
272 					}
273 				});
274 		}
275 
276 		private DefaultExecutorHolder() {
277 		}
278 	}
279 
280 	/**
281 	 * Compute a delay in a {@code min..max} interval with random jitter.
282 	 *
283 	 * @param last
284 	 *            amount of delay waited before the last attempt. This is used
285 	 *            to seed the next delay interval. Should be 0 if there was no
286 	 *            prior delay.
287 	 * @param min
288 	 *            shortest amount of allowable delay between attempts.
289 	 * @param max
290 	 *            longest amount of allowable delay between attempts.
291 	 * @return new amount of delay to wait before the next attempt.
292 	 */
293 	static long delay(long last, long min, long max) {
294 		long r = Math.max(0, last * 3 - min);
295 		if (r > 0) {
296 			int c = (int) Math.min(r + 1, Integer.MAX_VALUE);
297 			r = RNG.nextInt(c);
298 		}
299 		return Math.max(Math.min(min + r, max), min);
300 	}
301 }