View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
47  import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
48  import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
49  import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
50  import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
51  
52  import java.io.IOException;
53  import java.util.ArrayList;
54  import java.util.Collection;
55  import java.util.Collections;
56  import java.util.List;
57  import java.util.concurrent.CopyOnWriteArrayList;
58  import java.util.concurrent.TimeUnit;
59  import java.util.concurrent.atomic.AtomicReference;
60  
61  import org.eclipse.jgit.annotations.Nullable;
62  import org.eclipse.jgit.errors.MissingObjectException;
63  import org.eclipse.jgit.internal.storage.reftree.Command;
64  import org.eclipse.jgit.lib.ObjectId;
65  import org.eclipse.jgit.lib.PersonIdent;
66  import org.eclipse.jgit.lib.Ref;
67  import org.eclipse.jgit.revwalk.RevWalk;
68  import org.eclipse.jgit.transport.PushCertificate;
69  import org.eclipse.jgit.transport.ReceiveCommand;
70  import org.eclipse.jgit.util.time.ProposedTimestamp;
71  
72  /**
73   * A proposal to be applied in a Ketch system.
74   * <p>
75   * Pushing to a Ketch leader results in the leader making a proposal. The
76   * proposal includes the list of reference updates. The leader attempts to send
77   * the proposal to a quorum of replicas by pushing the proposal to a "staging"
78   * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds
79   * then the changes are durable and the leader can commit the proposal.
80   * <p>
81   * Proposals are executed by {@link KetchLeader#queueProposal(Proposal)}, which
82   * runs them asynchronously in the background. Proposals are thread-safe futures
83   * allowing callers to {@link #await()} for results or be notified by callback
84   * using {@link #addListener(Runnable)}.
85   */
86  public class Proposal {
87  	/** Current state of the proposal. */
88  	public enum State {
89  		/** Proposal has not yet been given to a {@link KetchLeader}. */
90  		NEW(false),
91  
92  		/**
93  		 * Proposal was validated and has entered the queue, but a round
94  		 * containing this proposal has not started yet.
95  		 */
96  		QUEUED(false),
97  
98  		/** Round containing the proposal has begun and is in progress. */
99  		RUNNING(false),
100 
101 		/**
102 		 * Proposal was executed through a round. Individual results from
103 		 * {@link Proposal#getCommands()}, {@link Command#getResult()} explain
104 		 * the success or failure outcome.
105 		 */
106 		EXECUTED(true),
107 
108 		/** Proposal was aborted and did not reach consensus. */
109 		ABORTED(true);
110 
111 		private final boolean done;
112 
113 		private State(boolean done) {
114 			this.done = done;
115 		}
116 
117 		/** @return true if this is a terminal state. */
118 		public boolean isDone() {
119 			return done;
120 		}
121 	}
122 
123 	private final List<Command> commands;
124 	private PersonIdent author;
125 	private String message;
126 	private PushCertificate pushCert;
127 
128 	private List<ProposedTimestamp> timestamps;
129 	private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
130 	private final AtomicReference<State> state = new AtomicReference<>(NEW);
131 
132 	/**
133 	 * Create a proposal from a list of Ketch commands.
134 	 *
135 	 * @param cmds
136 	 *            prepared list of commands.
137 	 */
138 	public Proposal(List<Command> cmds) {
139 		commands = Collections.unmodifiableList(new ArrayList<>(cmds));
140 	}
141 
142 	/**
143 	 * Create a proposal from a collection of received commands.
144 	 *
145 	 * @param rw
146 	 *            walker to assist in preparing commands.
147 	 * @param cmds
148 	 *            list of pending commands.
149 	 * @throws MissingObjectException
150 	 *             newId of a command is not found locally.
151 	 * @throws IOException
152 	 *             local objects cannot be accessed.
153 	 */
154 	public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds)
155 			throws MissingObjectException, IOException {
156 		commands = asCommandList(rw, cmds);
157 	}
158 
159 	private static List<Command> asCommandList(RevWalk rw,
160 			Collection<ReceiveCommand> cmds)
161 					throws MissingObjectException, IOException {
162 		List<Command> commands = new ArrayList<>(cmds.size());
163 		for (ReceiveCommand cmd : cmds) {
164 			commands.add(new Command(rw, cmd));
165 		}
166 		return Collections.unmodifiableList(commands);
167 	}
168 
169 	/** @return commands from this proposal. */
170 	public Collection<Command> getCommands() {
171 		return commands;
172 	}
173 
174 	/** @return optional author of the proposal. */
175 	@Nullable
176 	public PersonIdent getAuthor() {
177 		return author;
178 	}
179 
180 	/**
181 	 * Set the author for the proposal.
182 	 *
183 	 * @param who
184 	 *            optional identity of the author of the proposal.
185 	 * @return {@code this}
186 	 */
187 	public Proposal setAuthor(@Nullable PersonIdent who) {
188 		author = who;
189 		return this;
190 	}
191 
192 	/** @return optional message for the commit log of the RefTree. */
193 	@Nullable
194 	public String getMessage() {
195 		return message;
196 	}
197 
198 	/**
199 	 * Set the message to appear in the commit log of the RefTree.
200 	 *
201 	 * @param msg
202 	 *            message text for the commit.
203 	 * @return {@code this}
204 	 */
205 	public Proposal setMessage(@Nullable String msg) {
206 		message = msg != null && !msg.isEmpty() ? msg : null;
207 		return this;
208 	}
209 
210 	/** @return optional certificate signing the references. */
211 	@Nullable
212 	public PushCertificate getPushCertificate() {
213 		return pushCert;
214 	}
215 
216 	/**
217 	 * Set the push certificate signing the references.
218 	 *
219 	 * @param cert
220 	 *            certificate, may be null.
221 	 * @return {@code this}
222 	 */
223 	public Proposal setPushCertificate(@Nullable PushCertificate cert) {
224 		pushCert = cert;
225 		return this;
226 	}
227 
228 	/**
229 	 * @return timestamps that Ketch must block for. These may have been used as
230 	 *         commit times inside the objects involved in the proposal.
231 	 */
232 	public List<ProposedTimestamp> getProposedTimestamps() {
233 		if (timestamps != null) {
234 			return timestamps;
235 		}
236 		return Collections.emptyList();
237 	}
238 
239 	/**
240 	 * Request the proposal to wait for the affected timestamps to resolve.
241 	 *
242 	 * @param ts
243 	 * @return {@code this}.
244 	 */
245 	public Proposal addProposedTimestamp(ProposedTimestamp ts) {
246 		if (timestamps == null) {
247 			timestamps = new ArrayList<>(4);
248 		}
249 		timestamps.add(ts);
250 		return this;
251 	}
252 
253 	/**
254 	 * Add a callback to be invoked when the proposal is done.
255 	 * <p>
256 	 * A proposal is done when it has entered either {@link State#EXECUTED} or
257 	 * {@link State#ABORTED} state. If the proposal is already done
258 	 * {@code callback.run()} is immediately invoked on the caller's thread.
259 	 *
260 	 * @param callback
261 	 *            method to run after the proposal is done. The callback may be
262 	 *            run on a Ketch system thread and should be completed quickly.
263 	 */
264 	public void addListener(Runnable callback) {
265 		boolean runNow = false;
266 		synchronized (state) {
267 			if (state.get().isDone()) {
268 				runNow = true;
269 			} else {
270 				listeners.add(callback);
271 			}
272 		}
273 		if (runNow) {
274 			callback.run();
275 		}
276 	}
277 
278 	/** Set command result as OK. */
279 	void success() {
280 		for (Command c : commands) {
281 			if (c.getResult() == NOT_ATTEMPTED) {
282 				c.setResult(OK);
283 			}
284 		}
285 		notifyState(EXECUTED);
286 	}
287 
288 	/** Mark commands as "transaction aborted". */
289 	void abort() {
290 		Command.abort(commands, null);
291 		notifyState(ABORTED);
292 	}
293 
294 	/** @return read the current state of the proposal. */
295 	public State getState() {
296 		return state.get();
297 	}
298 
299 	/**
300 	 * @return {@code true} if the proposal was attempted. A true value does not
301 	 *         mean consensus was reached, only that the proposal was considered
302 	 *         and will not be making any more progress beyond its current
303 	 *         state.
304 	 */
305 	public boolean isDone() {
306 		return state.get().isDone();
307 	}
308 
309 	/**
310 	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
311 	 *
312 	 * @throws InterruptedException
313 	 *             caller was interrupted before proposal executed.
314 	 */
315 	public void await() throws InterruptedException {
316 		synchronized (state) {
317 			while (!state.get().isDone()) {
318 				state.wait();
319 			}
320 		}
321 	}
322 
323 	/**
324 	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
325 	 *
326 	 * @param wait
327 	 *            how long to wait.
328 	 * @param unit
329 	 *            unit describing the wait time.
330 	 * @return true if the proposal is done; false if the method timed out.
331 	 * @throws InterruptedException
332 	 *             caller was interrupted before proposal executed.
333 	 */
334 	public boolean await(long wait, TimeUnit unit) throws InterruptedException {
335 		synchronized (state) {
336 			if (state.get().isDone()) {
337 				return true;
338 			}
339 			state.wait(unit.toMillis(wait));
340 			return state.get().isDone();
341 		}
342 	}
343 
344 	/**
345 	 * Wait for the proposal to exit a state.
346 	 *
347 	 * @param notIn
348 	 *            state the proposal should not be in to return.
349 	 * @param wait
350 	 *            how long to wait.
351 	 * @param unit
352 	 *            unit describing the wait time.
353 	 * @return true if the proposal exited the state; false on time out.
354 	 * @throws InterruptedException
355 	 *             caller was interrupted before proposal executed.
356 	 */
357 	public boolean awaitStateChange(State notIn, long wait, TimeUnit unit)
358 			throws InterruptedException {
359 		synchronized (state) {
360 			if (state.get() != notIn) {
361 				return true;
362 			}
363 			state.wait(unit.toMillis(wait));
364 			return state.get() != notIn;
365 		}
366 	}
367 
368 	void notifyState(State s) {
369 		synchronized (state) {
370 			state.set(s);
371 			state.notifyAll();
372 		}
373 		if (s.isDone()) {
374 			for (Runnable callback : listeners) {
375 				callback.run();
376 			}
377 			listeners.clear();
378 		}
379 	}
380 
381 	@Override
382 	public String toString() {
383 		StringBuilder s = new StringBuilder();
384 		s.append("Ketch Proposal {\n"); //$NON-NLS-1$
385 		s.append("  ").append(state.get()).append('\n'); //$NON-NLS-1$
386 		if (author != null) {
387 			s.append("  author ").append(author).append('\n'); //$NON-NLS-1$
388 		}
389 		if (message != null) {
390 			s.append("  message ").append(message).append('\n'); //$NON-NLS-1$
391 		}
392 		for (Command c : commands) {
393 			s.append("  "); //$NON-NLS-1$
394 			format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$
395 			s.append(' ');
396 			format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$
397 			s.append(' ').append(c.getRefName());
398 			if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) {
399 				s.append(' ').append(c.getResult()); // $NON-NLS-1$
400 			}
401 			s.append('\n');
402 		}
403 		s.append('}');
404 		return s.toString();
405 	}
406 
407 	private static void format(StringBuilder s, @Nullable Ref r, String n) {
408 		if (r == null) {
409 			s.append(n);
410 		} else if (r.isSymbolic()) {
411 			s.append(r.getTarget().getName());
412 		} else {
413 			ObjectId id = r.getObjectId();
414 			if (id != null) {
415 				s.append(id.abbreviate(8).name());
416 			}
417 		}
418 	}
419 }