1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.ALL_REFS;
47 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.TXN_COMMITTED;
48 import static org.eclipse.jgit.lib.RefDatabase.ALL;
49 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
50 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
51
52 import java.io.IOException;
53 import java.text.MessageFormat;
54 import java.util.ArrayList;
55 import java.util.Collection;
56 import java.util.List;
57 import java.util.Map;
58
59 import org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase;
60 import org.eclipse.jgit.lib.BatchRefUpdate;
61 import org.eclipse.jgit.lib.NullProgressMonitor;
62 import org.eclipse.jgit.lib.Ref;
63 import org.eclipse.jgit.lib.RefDatabase;
64 import org.eclipse.jgit.lib.Repository;
65 import org.eclipse.jgit.revwalk.RevWalk;
66 import org.eclipse.jgit.transport.ReceiveCommand;
67 import org.eclipse.jgit.util.time.MonotonicClock;
68 import org.eclipse.jgit.util.time.ProposedTimestamp;
69
70
71
72
73
74 public class LocalReplica extends KetchReplica {
75
76
77
78
79
80
81
82
83
84
85 public LocalReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
86 super(leader, name, cfg);
87 }
88
89
90 @Override
91 protected String describeForLog() {
92 return String.format("%s (leader)", getName());
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106 void initialize(Repository repo) throws IOException {
107 RefDatabase refdb = repo.getRefDatabase();
108 if (refdb instanceof RefTreeDatabase) {
109 RefTreeDatabase treeDb = (RefTreeDatabase) refdb;
110 String txnNamespace = getSystem().getTxnNamespace();
111 if (!txnNamespace.equals(treeDb.getTxnNamespace())) {
112 throw new IOException(MessageFormat.format(
113 KetchText.get().mismatchedTxnNamespace,
114 txnNamespace, treeDb.getTxnNamespace()));
115 }
116 refdb = treeDb.getBootstrap();
117 }
118 initialize(refdb.exactRef(
119 getSystem().getTxnAccepted(),
120 getSystem().getTxnCommitted()));
121 }
122
123
124 @Override
125 protected void startPush(ReplicaPushRequest req) {
126 getSystem().getExecutor().execute(new Runnable() {
127 @Override
128 public void run() {
129 MonotonicClock clk = getSystem().getClock();
130 try (Repository git = getLeader().openRepository();
131 ProposedTimestamp ts = clk.propose()) {
132 try {
133 update(git, req, ts);
134 req.done(git);
135 } catch (Throwable err) {
136 req.setException(git, err);
137 }
138 } catch (IOException err) {
139 req.setException(null, err);
140 }
141 }
142 });
143 }
144
145
146 @Override
147 protected void blockingFetch(Repository repo, ReplicaFetchRequest req)
148 throws IOException {
149 throw new IOException(KetchText.get().cannotFetchFromLocalReplica);
150 }
151
152 private void update(Repository git, ReplicaPushRequest req,
153 ProposedTimestamp ts) throws IOException {
154 RefDatabase refdb = git.getRefDatabase();
155 CommitMethod method = getCommitMethod();
156
157
158
159 if (refdb instanceof RefTreeDatabase) {
160 if (!isOnlyTxnNamespace(req.getCommands())) {
161 return;
162 }
163
164 refdb = ((RefTreeDatabase) refdb).getBootstrap();
165 method = TXN_COMMITTED;
166 }
167
168 BatchRefUpdate batch = refdb.newBatchUpdate();
169 batch.addProposedTimestamp(ts);
170 batch.setRefLogIdent(getSystem().newCommitter(ts));
171 batch.setRefLogMessage("ketch", false);
172 batch.setAllowNonFastForwards(true);
173
174
175
176
177
178 ReceiveCommand accepted = null;
179 ReceiveCommand committed = null;
180 for (ReceiveCommand cmd : req.getCommands()) {
181 String name = cmd.getRefName();
182 if (name.equals(getSystem().getTxnAccepted())) {
183 accepted = cmd;
184 } else if (name.equals(getSystem().getTxnCommitted())) {
185 committed = cmd;
186 } else {
187 batch.addCommand(cmd);
188 }
189 }
190 if (committed != null && method == ALL_REFS) {
191 Map<String, Ref> refs = refdb.getRefs(ALL);
192 batch.addCommand(prepareCommit(git, refs, committed.getNewId()));
193 }
194 if (accepted != null) {
195 batch.addCommand(accepted);
196 }
197 if (committed != null) {
198 batch.addCommand(committed);
199 }
200
201 try (RevWalk rw = new RevWalk(git)) {
202 batch.execute(rw, NullProgressMonitor.INSTANCE);
203 }
204
205
206
207
208 List<String> failed = new ArrayList<>(2);
209 checkFailed(failed, accepted);
210 checkFailed(failed, committed);
211 if (!failed.isEmpty()) {
212 String[] arr = failed.toArray(new String[0]);
213 req.setRefs(refdb.exactRef(arr));
214 }
215 }
216
217 private static void checkFailed(List<String> failed, ReceiveCommand cmd) {
218 if (cmd != null && cmd.getResult() != OK) {
219 failed.add(cmd.getRefName());
220 }
221 }
222
223 private boolean isOnlyTxnNamespace(Collection<ReceiveCommand> cmdList) {
224
225
226
227 String txnNamespace = getSystem().getTxnNamespace();
228 for (ReceiveCommand cmd : cmdList) {
229 if (!cmd.getRefName().startsWith(txnNamespace)) {
230 cmd.setResult(REJECTED_OTHER_REASON,
231 MessageFormat.format(
232 KetchText.get().outsideTxnNamespace,
233 cmd.getRefName(), txnNamespace));
234 ReceiveCommand.abort(cmdList);
235 return false;
236 }
237 }
238 return true;
239 }
240 }