1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.Proposal.State.RUNNING;
47
48 import java.io.IOException;
49 import java.time.Duration;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.HashSet;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Set;
57 import java.util.concurrent.TimeoutException;
58 import java.util.stream.Collectors;
59
60 import org.eclipse.jgit.annotations.Nullable;
61 import org.eclipse.jgit.internal.storage.reftree.Command;
62 import org.eclipse.jgit.internal.storage.reftree.RefTree;
63 import org.eclipse.jgit.lib.CommitBuilder;
64 import org.eclipse.jgit.lib.ObjectId;
65 import org.eclipse.jgit.lib.ObjectInserter;
66 import org.eclipse.jgit.lib.PersonIdent;
67 import org.eclipse.jgit.lib.Ref;
68 import org.eclipse.jgit.lib.Repository;
69 import org.eclipse.jgit.revwalk.RevCommit;
70 import org.eclipse.jgit.revwalk.RevWalk;
71 import org.eclipse.jgit.transport.ReceiveCommand;
72 import org.eclipse.jgit.util.time.ProposedTimestamp;
73
74
75 class ProposalRound extends Round {
76 private final List<Proposal> todo;
77 private RefTree queuedTree;
78
79 ProposalRound(KetchLeader leader, LogIndex head, List<Proposal> todo,
80 @Nullable RefTree tree) {
81 super(leader, head);
82 this.todo = todo;
83
84 if (tree != null && canCombine(todo)) {
85 this.queuedTree = tree;
86 } else {
87 leader.roundHoldsReferenceToRefTree = false;
88 }
89 }
90
91 private static boolean canCombine(List<Proposal> todo) {
92 Proposal first = todo.get(0);
93 for (int i = 1; i < todo.size(); i++) {
94 if (!canCombine(first, todo.get(i))) {
95 return false;
96 }
97 }
98 return true;
99 }
100
101 private static boolean canCombine(Proposal a, Proposal b) {
102 String aMsg = nullToEmpty(a.getMessage());
103 String bMsg = nullToEmpty(b.getMessage());
104 return aMsg.equals(bMsg) && canCombine(a.getAuthor(), b.getAuthor());
105 }
106
107 private static String nullToEmpty(@Nullable String str) {
108 return str != null ? str : "";
109 }
110
111 private static boolean canCombine(@Nullable PersonIdent a,
112 @Nullable PersonIdent b) {
113 if (a != null && b != null) {
114
115
116
117 return a.getName().equals(b.getName())
118 && a.getEmailAddress().equals(b.getEmailAddress());
119 }
120
121
122 return a == null && b == null;
123 }
124
125 @Override
126 void start() throws IOException {
127 for (Proposal p : todo) {
128 p.notifyState(RUNNING);
129 }
130 try {
131 ObjectId id;
132 try (Repository git = leader.openRepository();
133 ProposedTimestamp ts = getSystem().getClock().propose()) {
134 id = insertProposals(git, ts);
135 blockUntil(ts);
136 }
137 runAsync(id);
138 } catch (NoOp e) {
139 for (Proposal p : todo) {
140 p.success();
141 }
142 leader.lock.lock();
143 try {
144 leader.nextRound();
145 } finally {
146 leader.lock.unlock();
147 }
148 } catch (IOException e) {
149 abort();
150 throw e;
151 }
152 }
153
154 private ObjectId insertProposals(Repository git, ProposedTimestamp ts)
155 throws IOException, NoOp {
156 ObjectId id;
157 try (ObjectInserter inserter = git.newObjectInserter()) {
158
159
160 if (queuedTree != null) {
161 id = insertSingleProposal(git, ts, inserter);
162 } else {
163 id = insertMultiProposal(git, ts, inserter);
164 }
165
166 stageCommands = makeStageList(git, inserter);
167 inserter.flush();
168 }
169 return id;
170 }
171
172 private ObjectId insertSingleProposal(Repository git, ProposedTimestamp ts,
173 ObjectInserter inserter) throws IOException, NoOp {
174
175 ObjectId treeId = queuedTree.writeTree(inserter);
176 queuedTree = null;
177 leader.roundHoldsReferenceToRefTree = false;
178
179 if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
180 try (RevWalk rw = new RevWalk(git)) {
181 RevCommit c = rw.parseCommit(acceptedOldIndex);
182 if (treeId.equals(c.getTree())) {
183 throw new NoOp();
184 }
185 }
186 }
187
188 Proposal p = todo.get(0);
189 CommitBuilder b = new CommitBuilder();
190 b.setTreeId(treeId);
191 if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
192 b.setParentId(acceptedOldIndex);
193 }
194 b.setCommitter(leader.getSystem().newCommitter(ts));
195 b.setAuthor(p.getAuthor() != null ? p.getAuthor() : b.getCommitter());
196 b.setMessage(message(p));
197 return inserter.insert(b);
198 }
199
200 private ObjectId insertMultiProposal(Repository git, ProposedTimestamp ts,
201 ObjectInserter inserter) throws IOException, NoOp {
202
203
204
205 ObjectId lastIndex = acceptedOldIndex;
206 ObjectId oldTreeId;
207 RefTree tree;
208 if (ObjectId.zeroId().equals(lastIndex)) {
209 oldTreeId = ObjectId.zeroId();
210 tree = RefTree.newEmptyTree();
211 } else {
212 try (RevWalk rw = new RevWalk(git)) {
213 RevCommit c = rw.parseCommit(lastIndex);
214 oldTreeId = c.getTree();
215 tree = RefTree.read(rw.getObjectReader(), c.getTree());
216 }
217 }
218
219 PersonIdent committer = leader.getSystem().newCommitter(ts);
220 for (Proposal p : todo) {
221 if (!tree.apply(p.getCommands())) {
222
223
224
225 throw new IOException(
226 KetchText.get().queuedProposalFailedToApply);
227 }
228
229 ObjectId treeId = tree.writeTree(inserter);
230 if (treeId.equals(oldTreeId)) {
231 continue;
232 }
233
234 CommitBuilder b = new CommitBuilder();
235 b.setTreeId(treeId);
236 if (!ObjectId.zeroId().equals(lastIndex)) {
237 b.setParentId(lastIndex);
238 }
239 b.setAuthor(p.getAuthor() != null ? p.getAuthor() : committer);
240 b.setCommitter(committer);
241 b.setMessage(message(p));
242 lastIndex = inserter.insert(b);
243 }
244 if (lastIndex.equals(acceptedOldIndex)) {
245 throw new NoOp();
246 }
247 return lastIndex;
248 }
249
250 private String message(Proposal p) {
251 StringBuilder m = new StringBuilder();
252 String msg = p.getMessage();
253 if (msg != null && !msg.isEmpty()) {
254 m.append(msg);
255 while (m.length() < 2 || m.charAt(m.length() - 2) != '\n'
256 || m.charAt(m.length() - 1) != '\n') {
257 m.append('\n');
258 }
259 }
260 m.append(KetchConstants.TERM.getName())
261 .append(": ")
262 .append(leader.getTerm());
263 return m.toString();
264 }
265
266 void abort() {
267 for (Proposal p : todo) {
268 p.abort();
269 }
270 }
271
272 @Override
273 void success() {
274 for (Proposal p : todo) {
275 p.success();
276 }
277 }
278
279 private List<ReceiveCommand> makeStageList(Repository git,
280 ObjectInserter inserter) throws IOException {
281
282
283
284 Map<String, ObjectId> byRef = new HashMap<>();
285 for (Proposal p : todo) {
286 for (Command c : p.getCommands()) {
287 Ref n = c.getNewRef();
288 if (n != null && !n.isSymbolic()) {
289 byRef.put(n.getName(), n.getObjectId());
290 }
291 }
292 }
293 if (byRef.isEmpty()) {
294 return Collections.emptyList();
295 }
296
297 Set<ObjectId> newObjs = new HashSet<>(byRef.values());
298 StageBuilder b = new StageBuilder(
299 leader.getSystem().getTxnStage(),
300 acceptedNewIndex);
301 return b.makeStageList(newObjs, git, inserter);
302 }
303
304 private void blockUntil(ProposedTimestamp ts)
305 throws TimeIsUncertainException {
306 List<ProposedTimestamp> times = todo.stream()
307 .flatMap(p -> p.getProposedTimestamps().stream())
308 .collect(Collectors.toCollection(ArrayList::new));
309 times.add(ts);
310
311 try {
312 Duration maxWait = getSystem().getMaxWaitForMonotonicClock();
313 ProposedTimestamp.blockUntil(times, maxWait);
314 } catch (InterruptedException | TimeoutException e) {
315 throw new TimeIsUncertainException(e);
316 }
317 }
318
319 private static class NoOp extends Exception {
320 private static final long serialVersionUID = 1L;
321 }
322 }