1 /*
2 * Copyright (C) 2016, Google Inc.
3 * and other copyright owners as documented in the project's IP log.
4 *
5 * This program and the accompanying materials are made available
6 * under the terms of the Eclipse Distribution License v1.0 which
7 * accompanies this distribution, is reproduced below, and is
8 * available at http://www.eclipse.org/org/documents/edl-v10.php
9 *
10 * All rights reserved.
11 *
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
14 * conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials provided
22 * with the distribution.
23 *
24 * - Neither the name of the Eclipse Foundation, Inc. nor the
25 * names of its contributors may be used to endorse or promote
26 * products derived from this software without specific prior
27 * written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42 */
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
47 import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
48 import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
49 import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
50 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
51
52 import java.io.IOException;
53 import java.util.ArrayList;
54 import java.util.Collection;
55 import java.util.Collections;
56 import java.util.List;
57 import java.util.concurrent.CopyOnWriteArrayList;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicReference;
60
61 import org.eclipse.jgit.annotations.Nullable;
62 import org.eclipse.jgit.errors.MissingObjectException;
63 import org.eclipse.jgit.internal.storage.reftree.Command;
64 import org.eclipse.jgit.lib.ObjectId;
65 import org.eclipse.jgit.lib.PersonIdent;
66 import org.eclipse.jgit.lib.Ref;
67 import org.eclipse.jgit.revwalk.RevWalk;
68 import org.eclipse.jgit.transport.PushCertificate;
69 import org.eclipse.jgit.transport.ReceiveCommand;
70 import org.eclipse.jgit.util.time.ProposedTimestamp;
71
72 /**
73 * A proposal to be applied in a Ketch system.
74 * <p>
75 * Pushing to a Ketch leader results in the leader making a proposal. The
76 * proposal includes the list of reference updates. The leader attempts to send
77 * the proposal to a quorum of replicas by pushing the proposal to a "staging"
78 * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds
79 * then the changes are durable and the leader can commit the proposal.
80 * <p>
81 * Proposals are executed by
82 * {@link org.eclipse.jgit.internal.ketch.KetchLeader#queueProposal(Proposal)},
83 * which runs them asynchronously in the background. Proposals are thread-safe
84 * futures allowing callers to {@link #await()} for results or be notified by
85 * callback using {@link #addListener(Runnable)}.
86 */
87 public class Proposal {
88 /** Current state of the proposal. */
89 public enum State {
90 /** Proposal has not yet been given to a {@link KetchLeader}. */
91 NEW(false),
92
93 /**
94 * Proposal was validated and has entered the queue, but a round
95 * containing this proposal has not started yet.
96 */
97 QUEUED(false),
98
99 /** Round containing the proposal has begun and is in progress. */
100 RUNNING(false),
101
102 /**
103 * Proposal was executed through a round. Individual results from
104 * {@link Proposal#getCommands()}, {@link Command#getResult()} explain
105 * the success or failure outcome.
106 */
107 EXECUTED(true),
108
109 /** Proposal was aborted and did not reach consensus. */
110 ABORTED(true);
111
112 private final boolean done;
113
114 private State(boolean done) {
115 this.done = done;
116 }
117
118 /** @return true if this is a terminal state. */
119 public boolean isDone() {
120 return done;
121 }
122 }
123
124 private final List<Command> commands;
125 private PersonIdent author;
126 private String message;
127 private PushCertificate pushCert;
128
129 private List<ProposedTimestamp> timestamps;
130 private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
131 private final AtomicReference<State> state = new AtomicReference<>(NEW);
132
133 /**
134 * Create a proposal from a list of Ketch commands.
135 *
136 * @param cmds
137 * prepared list of commands.
138 */
139 public Proposal(List<Command> cmds) {
140 commands = Collections.unmodifiableList(new ArrayList<>(cmds));
141 }
142
143 /**
144 * Create a proposal from a collection of received commands.
145 *
146 * @param rw
147 * walker to assist in preparing commands.
148 * @param cmds
149 * list of pending commands.
150 * @throws org.eclipse.jgit.errors.MissingObjectException
151 * newId of a command is not found locally.
152 * @throws java.io.IOException
153 * local objects cannot be accessed.
154 */
155 public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds)
156 throws MissingObjectException, IOException {
157 commands = asCommandList(rw, cmds);
158 }
159
160 private static List<Command> asCommandList(RevWalk rw,
161 Collection<ReceiveCommand> cmds)
162 throws MissingObjectException, IOException {
163 List<Command> commands = new ArrayList<>(cmds.size());
164 for (ReceiveCommand cmd : cmds) {
165 commands.add(new Command(rw, cmd));
166 }
167 return Collections.unmodifiableList(commands);
168 }
169
170 /**
171 * Get commands from this proposal.
172 *
173 * @return commands from this proposal.
174 */
175 public Collection<Command> getCommands() {
176 return commands;
177 }
178
179 /**
180 * Get optional author of the proposal.
181 *
182 * @return optional author of the proposal.
183 */
184 @Nullable
185 public PersonIdent getAuthor() {
186 return author;
187 }
188
189 /**
190 * Set the author for the proposal.
191 *
192 * @param who
193 * optional identity of the author of the proposal.
194 * @return {@code this}
195 */
196 public Proposal setAuthor(@Nullable PersonIdent who) {
197 author = who;
198 return this;
199 }
200
201 /**
202 * Get optional message for the commit log of the RefTree.
203 *
204 * @return optional message for the commit log of the RefTree.
205 */
206 @Nullable
207 public String getMessage() {
208 return message;
209 }
210
211 /**
212 * Set the message to appear in the commit log of the RefTree.
213 *
214 * @param msg
215 * message text for the commit.
216 * @return {@code this}
217 */
218 public Proposal setMessage(@Nullable String msg) {
219 message = msg != null && !msg.isEmpty() ? msg : null;
220 return this;
221 }
222
223 /**
224 * Get optional certificate signing the references.
225 *
226 * @return optional certificate signing the references.
227 */
228 @Nullable
229 public PushCertificate getPushCertificate() {
230 return pushCert;
231 }
232
233 /**
234 * Set the push certificate signing the references.
235 *
236 * @param cert
237 * certificate, may be null.
238 * @return {@code this}
239 */
240 public Proposal setPushCertificate(@Nullable PushCertificate cert) {
241 pushCert = cert;
242 return this;
243 }
244
245 /**
246 * Get timestamps that Ketch must block for.
247 *
248 * @return timestamps that Ketch must block for. These may have been used as
249 * commit times inside the objects involved in the proposal.
250 */
251 public List<ProposedTimestamp> getProposedTimestamps() {
252 if (timestamps != null) {
253 return timestamps;
254 }
255 return Collections.emptyList();
256 }
257
258 /**
259 * Request the proposal to wait for the affected timestamps to resolve.
260 *
261 * @param ts
262 * a {@link org.eclipse.jgit.util.time.ProposedTimestamp} object.
263 * @return {@code this}.
264 */
265 public Proposal addProposedTimestamp(ProposedTimestamp ts) {
266 if (timestamps == null) {
267 timestamps = new ArrayList<>(4);
268 }
269 timestamps.add(ts);
270 return this;
271 }
272
273 /**
274 * Add a callback to be invoked when the proposal is done.
275 * <p>
276 * A proposal is done when it has entered either
277 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#EXECUTED} or
278 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#ABORTED} state. If
279 * the proposal is already done {@code callback.run()} is immediately
280 * invoked on the caller's thread.
281 *
282 * @param callback
283 * method to run after the proposal is done. The callback may be
284 * run on a Ketch system thread and should be completed quickly.
285 */
286 public void addListener(Runnable callback) {
287 boolean runNow = false;
288 synchronized (state) {
289 if (state.get().isDone()) {
290 runNow = true;
291 } else {
292 listeners.add(callback);
293 }
294 }
295 if (runNow) {
296 callback.run();
297 }
298 }
299
300 /** Set command result as OK. */
301 void success() {
302 for (Command c : commands) {
303 if (c.getResult() == NOT_ATTEMPTED) {
304 c.setResult(OK);
305 }
306 }
307 notifyState(EXECUTED);
308 }
309
310 /** Mark commands as "transaction aborted". */
311 void abort() {
312 Command.abort(commands, null);
313 notifyState(ABORTED);
314 }
315
316 /**
317 * Read the current state of the proposal.
318 *
319 * @return read the current state of the proposal.
320 */
321 public State getState() {
322 return state.get();
323 }
324
325 /**
326 * Whether the proposal was attempted
327 *
328 * @return {@code true} if the proposal was attempted. A true value does not
329 * mean consensus was reached, only that the proposal was considered
330 * and will not be making any more progress beyond its current
331 * state.
332 */
333 public boolean isDone() {
334 return state.get().isDone();
335 }
336
337 /**
338 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
339 *
340 * @throws java.lang.InterruptedException
341 * caller was interrupted before proposal executed.
342 */
343 public void await() throws InterruptedException {
344 synchronized (state) {
345 while (!state.get().isDone()) {
346 state.wait();
347 }
348 }
349 }
350
351 /**
352 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
353 *
354 * @param wait
355 * how long to wait.
356 * @param unit
357 * unit describing the wait time.
358 * @return true if the proposal is done; false if the method timed out.
359 * @throws java.lang.InterruptedException
360 * caller was interrupted before proposal executed.
361 */
362 public boolean await(long wait, TimeUnit unit) throws InterruptedException {
363 synchronized (state) {
364 if (state.get().isDone()) {
365 return true;
366 }
367 state.wait(unit.toMillis(wait));
368 return state.get().isDone();
369 }
370 }
371
372 /**
373 * Wait for the proposal to exit a state.
374 *
375 * @param notIn
376 * state the proposal should not be in to return.
377 * @param wait
378 * how long to wait.
379 * @param unit
380 * unit describing the wait time.
381 * @return true if the proposal exited the state; false on time out.
382 * @throws java.lang.InterruptedException
383 * caller was interrupted before proposal executed.
384 */
385 public boolean awaitStateChange(State notIn, long wait, TimeUnit unit)
386 throws InterruptedException {
387 synchronized (state) {
388 if (state.get() != notIn) {
389 return true;
390 }
391 state.wait(unit.toMillis(wait));
392 return state.get() != notIn;
393 }
394 }
395
396 void notifyState(State s) {
397 synchronized (state) {
398 state.set(s);
399 state.notifyAll();
400 }
401 if (s.isDone()) {
402 for (Runnable callback : listeners) {
403 callback.run();
404 }
405 listeners.clear();
406 }
407 }
408
409 /** {@inheritDoc} */
410 @Override
411 public String toString() {
412 StringBuilder s = new StringBuilder();
413 s.append("Ketch Proposal {\n"); //$NON-NLS-1$
414 s.append(" ").append(state.get()).append('\n'); //$NON-NLS-1$
415 if (author != null) {
416 s.append(" author ").append(author).append('\n'); //$NON-NLS-1$
417 }
418 if (message != null) {
419 s.append(" message ").append(message).append('\n'); //$NON-NLS-1$
420 }
421 for (Command c : commands) {
422 s.append(" "); //$NON-NLS-1$
423 format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$
424 s.append(' ');
425 format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$
426 s.append(' ').append(c.getRefName());
427 if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) {
428 s.append(' ').append(c.getResult()); // $NON-NLS-1$
429 }
430 s.append('\n');
431 }
432 s.append('}');
433 return s.toString();
434 }
435
436 private static void format(StringBuilder s, @Nullable Ref r, String n) {
437 if (r == null) {
438 s.append(n);
439 } else if (r.isSymbolic()) {
440 s.append(r.getTarget().getName());
441 } else {
442 ObjectId id = r.getObjectId();
443 if (id != null) {
444 s.append(id.abbreviate(8).name());
445 }
446 }
447 }
448 }