1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.ALL_REFS;
47 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.TXN_COMMITTED;
48 import static org.eclipse.jgit.lib.RefDatabase.ALL;
49 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
50 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
51
52 import java.io.IOException;
53 import java.text.MessageFormat;
54 import java.util.ArrayList;
55 import java.util.Collection;
56 import java.util.List;
57 import java.util.Map;
58
59 import org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase;
60 import org.eclipse.jgit.lib.BatchRefUpdate;
61 import org.eclipse.jgit.lib.NullProgressMonitor;
62 import org.eclipse.jgit.lib.Ref;
63 import org.eclipse.jgit.lib.RefDatabase;
64 import org.eclipse.jgit.lib.Repository;
65 import org.eclipse.jgit.revwalk.RevWalk;
66 import org.eclipse.jgit.transport.ReceiveCommand;
67 import org.eclipse.jgit.util.time.MonotonicClock;
68 import org.eclipse.jgit.util.time.ProposedTimestamp;
69
70
71 public class LocalReplica extends KetchReplica {
72
73
74
75
76
77
78
79
80
81
82 public LocalReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
83 super(leader, name, cfg);
84 }
85
86 @Override
87 protected String describeForLog() {
88 return String.format("%s (leader)", getName());
89 }
90
91
92
93
94
95
96
97
98
99
100
101
102 void initialize(Repository repo) throws IOException {
103 RefDatabase refdb = repo.getRefDatabase();
104 if (refdb instanceof RefTreeDatabase) {
105 RefTreeDatabase treeDb = (RefTreeDatabase) refdb;
106 String txnNamespace = getSystem().getTxnNamespace();
107 if (!txnNamespace.equals(treeDb.getTxnNamespace())) {
108 throw new IOException(MessageFormat.format(
109 KetchText.get().mismatchedTxnNamespace,
110 txnNamespace, treeDb.getTxnNamespace()));
111 }
112 refdb = treeDb.getBootstrap();
113 }
114 initialize(refdb.exactRef(
115 getSystem().getTxnAccepted(),
116 getSystem().getTxnCommitted()));
117 }
118
119 @Override
120 protected void startPush(final ReplicaPushRequest req) {
121 getSystem().getExecutor().execute(new Runnable() {
122 @Override
123 public void run() {
124 MonotonicClock clk = getSystem().getClock();
125 try (Repository git = getLeader().openRepository();
126 ProposedTimestamp ts = clk.propose()) {
127 try {
128 update(git, req, ts);
129 req.done(git);
130 } catch (Throwable err) {
131 req.setException(git, err);
132 }
133 } catch (IOException err) {
134 req.setException(null, err);
135 }
136 }
137 });
138 }
139
140 @Override
141 protected void blockingFetch(Repository repo, ReplicaFetchRequest req)
142 throws IOException {
143 throw new IOException(KetchText.get().cannotFetchFromLocalReplica);
144 }
145
146 private void update(Repository git, ReplicaPushRequest req,
147 ProposedTimestamp ts) throws IOException {
148 RefDatabase refdb = git.getRefDatabase();
149 CommitMethod method = getCommitMethod();
150
151
152
153 if (refdb instanceof RefTreeDatabase) {
154 if (!isOnlyTxnNamespace(req.getCommands())) {
155 return;
156 }
157
158 refdb = ((RefTreeDatabase) refdb).getBootstrap();
159 method = TXN_COMMITTED;
160 }
161
162 BatchRefUpdate batch = refdb.newBatchUpdate();
163 batch.addProposedTimestamp(ts);
164 batch.setRefLogIdent(getSystem().newCommitter(ts));
165 batch.setRefLogMessage("ketch", false);
166 batch.setAllowNonFastForwards(true);
167
168
169
170
171
172 ReceiveCommand accepted = null;
173 ReceiveCommand committed = null;
174 for (ReceiveCommand cmd : req.getCommands()) {
175 String name = cmd.getRefName();
176 if (name.equals(getSystem().getTxnAccepted())) {
177 accepted = cmd;
178 } else if (name.equals(getSystem().getTxnCommitted())) {
179 committed = cmd;
180 } else {
181 batch.addCommand(cmd);
182 }
183 }
184 if (committed != null && method == ALL_REFS) {
185 Map<String, Ref> refs = refdb.getRefs(ALL);
186 batch.addCommand(prepareCommit(git, refs, committed.getNewId()));
187 }
188 if (accepted != null) {
189 batch.addCommand(accepted);
190 }
191 if (committed != null) {
192 batch.addCommand(committed);
193 }
194
195 try (RevWalk rw = new RevWalk(git)) {
196 batch.execute(rw, NullProgressMonitor.INSTANCE);
197 }
198
199
200
201
202 List<String> failed = new ArrayList<>(2);
203 checkFailed(failed, accepted);
204 checkFailed(failed, committed);
205 if (!failed.isEmpty()) {
206 String[] arr = failed.toArray(new String[failed.size()]);
207 req.setRefs(refdb.exactRef(arr));
208 }
209 }
210
211 private static void checkFailed(List<String> failed, ReceiveCommand cmd) {
212 if (cmd != null && cmd.getResult() != OK) {
213 failed.add(cmd.getRefName());
214 }
215 }
216
217 private boolean isOnlyTxnNamespace(Collection<ReceiveCommand> cmdList) {
218
219
220
221 String txnNamespace = getSystem().getTxnNamespace();
222 for (ReceiveCommand cmd : cmdList) {
223 if (!cmd.getRefName().startsWith(txnNamespace)) {
224 cmd.setResult(REJECTED_OTHER_REASON,
225 MessageFormat.format(
226 KetchText.get().outsideTxnNamespace,
227 cmd.getRefName(), txnNamespace));
228 ReceiveCommand.abort(cmdList);
229 return false;
230 }
231 }
232 return true;
233 }
234 }