1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.ketch;
12
13 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.ALL_REFS;
14 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.TXN_COMMITTED;
15 import static org.eclipse.jgit.lib.RefDatabase.ALL;
16 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
17 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
18
19 import java.io.IOException;
20 import java.text.MessageFormat;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Map;
25
26 import org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase;
27 import org.eclipse.jgit.lib.BatchRefUpdate;
28 import org.eclipse.jgit.lib.NullProgressMonitor;
29 import org.eclipse.jgit.lib.Ref;
30 import org.eclipse.jgit.lib.RefDatabase;
31 import org.eclipse.jgit.lib.Repository;
32 import org.eclipse.jgit.revwalk.RevWalk;
33 import org.eclipse.jgit.transport.ReceiveCommand;
34 import org.eclipse.jgit.util.time.MonotonicClock;
35 import org.eclipse.jgit.util.time.ProposedTimestamp;
36
37
38
39
40
41 public class LocalReplica extends KetchReplica {
42
43
44
45
46
47
48
49
50
51
52 public LocalReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
53 super(leader, name, cfg);
54 }
55
56
57 @Override
58 protected String describeForLog() {
59 return String.format("%s (leader)", getName());
60 }
61
62
63
64
65
66
67
68
69
70
71
72
73 void initialize(Repository repo) throws IOException {
74 RefDatabase refdb = repo.getRefDatabase();
75 if (refdb instanceof RefTreeDatabase) {
76 RefTreeDatabase treeDb = (RefTreeDatabase) refdb;
77 String txnNamespace = getSystem().getTxnNamespace();
78 if (!txnNamespace.equals(treeDb.getTxnNamespace())) {
79 throw new IOException(MessageFormat.format(
80 KetchText.get().mismatchedTxnNamespace,
81 txnNamespace, treeDb.getTxnNamespace()));
82 }
83 refdb = treeDb.getBootstrap();
84 }
85 initialize(refdb.exactRef(
86 getSystem().getTxnAccepted(),
87 getSystem().getTxnCommitted()));
88 }
89
90
91 @Override
92 protected void startPush(ReplicaPushRequest req) {
93 getSystem().getExecutor().execute(() -> {
94 MonotonicClock clk = getSystem().getClock();
95 try (Repository git = getLeader().openRepository();
96 ProposedTimestamp ts = clk.propose()) {
97 try {
98 update(git, req, ts);
99 req.done(git);
100 } catch (Throwable err) {
101 req.setException(git, err);
102 }
103 } catch (IOException err) {
104 req.setException(null, err);
105 }
106 });
107 }
108
109
110 @Override
111 protected void blockingFetch(Repository repo, ReplicaFetchRequest req)
112 throws IOException {
113 throw new IOException(KetchText.get().cannotFetchFromLocalReplica);
114 }
115
116 private void update(Repository git, ReplicaPushRequest req,
117 ProposedTimestamp ts) throws IOException {
118 RefDatabase refdb = git.getRefDatabase();
119 CommitMethod method = getCommitMethod();
120
121
122
123 if (refdb instanceof RefTreeDatabase) {
124 if (!isOnlyTxnNamespace(req.getCommands())) {
125 return;
126 }
127
128 refdb = ((RefTreeDatabase) refdb).getBootstrap();
129 method = TXN_COMMITTED;
130 }
131
132 BatchRefUpdate batch = refdb.newBatchUpdate();
133 batch.addProposedTimestamp(ts);
134 batch.setRefLogIdent(getSystem().newCommitter(ts));
135 batch.setRefLogMessage("ketch", false);
136 batch.setAllowNonFastForwards(true);
137
138
139
140
141
142 ReceiveCommand accepted = null;
143 ReceiveCommand committed = null;
144 for (ReceiveCommand cmd : req.getCommands()) {
145 String name = cmd.getRefName();
146 if (name.equals(getSystem().getTxnAccepted())) {
147 accepted = cmd;
148 } else if (name.equals(getSystem().getTxnCommitted())) {
149 committed = cmd;
150 } else {
151 batch.addCommand(cmd);
152 }
153 }
154 if (committed != null && method == ALL_REFS) {
155 Map<String, Ref> refs = refdb.getRefs(ALL);
156 batch.addCommand(prepareCommit(git, refs, committed.getNewId()));
157 }
158 if (accepted != null) {
159 batch.addCommand(accepted);
160 }
161 if (committed != null) {
162 batch.addCommand(committed);
163 }
164
165 try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
166 batch.execute(rw, NullProgressMonitor.INSTANCE);
167 }
168
169
170
171
172 List<String> failed = new ArrayList<>(2);
173 checkFailed(failed, accepted);
174 checkFailed(failed, committed);
175 if (!failed.isEmpty()) {
176 String[] arr = failed.toArray(new String[0]);
177 req.setRefs(refdb.exactRef(arr));
178 }
179 }
180
181 private static void checkFailed(List<String> failed, ReceiveCommand cmd) {
182 if (cmd != null && cmd.getResult() != OK) {
183 failed.add(cmd.getRefName());
184 }
185 }
186
187 private boolean isOnlyTxnNamespace(Collection<ReceiveCommand> cmdList) {
188
189
190
191 String txnNamespace = getSystem().getTxnNamespace();
192 for (ReceiveCommand cmd : cmdList) {
193 if (!cmd.getRefName().startsWith(txnNamespace)) {
194 cmd.setResult(REJECTED_OTHER_REASON,
195 MessageFormat.format(
196 KetchText.get().outsideTxnNamespace,
197 cmd.getRefName(), txnNamespace));
198 ReceiveCommand.abort(cmdList);
199 return false;
200 }
201 }
202 return true;
203 }
204 }