1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.ALL_REFS;
47 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.TXN_COMMITTED;
48 import static org.eclipse.jgit.lib.RefDatabase.ALL;
49 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
50 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
51
52 import java.io.IOException;
53 import java.text.MessageFormat;
54 import java.util.ArrayList;
55 import java.util.Collection;
56 import java.util.List;
57 import java.util.Map;
58
59 import org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase;
60 import org.eclipse.jgit.lib.BatchRefUpdate;
61 import org.eclipse.jgit.lib.NullProgressMonitor;
62 import org.eclipse.jgit.lib.Ref;
63 import org.eclipse.jgit.lib.RefDatabase;
64 import org.eclipse.jgit.lib.Repository;
65 import org.eclipse.jgit.revwalk.RevWalk;
66 import org.eclipse.jgit.transport.ReceiveCommand;
67
68
69 public class LocalReplica extends KetchReplica {
70
71
72
73
74
75
76
77
78
79
80 public LocalReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
81 super(leader, name, cfg);
82 }
83
84 @Override
85 protected String describeForLog() {
86 return String.format("%s (leader)", getName());
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100 void initialize(Repository repo) throws IOException {
101 RefDatabase refdb = repo.getRefDatabase();
102 if (refdb instanceof RefTreeDatabase) {
103 RefTreeDatabase treeDb = (RefTreeDatabase) refdb;
104 String txnNamespace = getSystem().getTxnNamespace();
105 if (!txnNamespace.equals(treeDb.getTxnNamespace())) {
106 throw new IOException(MessageFormat.format(
107 KetchText.get().mismatchedTxnNamespace,
108 txnNamespace, treeDb.getTxnNamespace()));
109 }
110 refdb = treeDb.getBootstrap();
111 }
112 initialize(refdb.exactRef(
113 getSystem().getTxnAccepted(),
114 getSystem().getTxnCommitted()));
115 }
116
117 @Override
118 protected void startPush(final ReplicaPushRequest req) {
119 getSystem().getExecutor().execute(new Runnable() {
120 @Override
121 public void run() {
122 try (Repository git = getLeader().openRepository()) {
123 try {
124 update(git, req);
125 req.done(git);
126 } catch (Throwable err) {
127 req.setException(git, err);
128 }
129 } catch (IOException err) {
130 req.setException(null, err);
131 }
132 }
133 });
134 }
135
136 @Override
137 protected void blockingFetch(Repository repo, ReplicaFetchRequest req)
138 throws IOException {
139 throw new IOException(KetchText.get().cannotFetchFromLocalReplica);
140 }
141
142 private void update(Repository git, ReplicaPushRequest req)
143 throws IOException {
144 RefDatabase refdb = git.getRefDatabase();
145 CommitMethod method = getCommitMethod();
146
147
148
149 if (refdb instanceof RefTreeDatabase) {
150 if (!isOnlyTxnNamespace(req.getCommands())) {
151 return;
152 }
153
154 refdb = ((RefTreeDatabase) refdb).getBootstrap();
155 method = TXN_COMMITTED;
156 }
157
158 BatchRefUpdate batch = refdb.newBatchUpdate();
159 batch.setRefLogIdent(getSystem().newCommitter());
160 batch.setRefLogMessage("ketch", false);
161 batch.setAllowNonFastForwards(true);
162
163
164
165
166
167 ReceiveCommand accepted = null;
168 ReceiveCommand committed = null;
169 for (ReceiveCommand cmd : req.getCommands()) {
170 String name = cmd.getRefName();
171 if (name.equals(getSystem().getTxnAccepted())) {
172 accepted = cmd;
173 } else if (name.equals(getSystem().getTxnCommitted())) {
174 committed = cmd;
175 } else {
176 batch.addCommand(cmd);
177 }
178 }
179 if (committed != null && method == ALL_REFS) {
180 Map<String, Ref> refs = refdb.getRefs(ALL);
181 batch.addCommand(prepareCommit(git, refs, committed.getNewId()));
182 }
183 if (accepted != null) {
184 batch.addCommand(accepted);
185 }
186 if (committed != null) {
187 batch.addCommand(committed);
188 }
189
190 try (RevWalk rw = new RevWalk(git)) {
191 batch.execute(rw, NullProgressMonitor.INSTANCE);
192 }
193
194
195
196
197 List<String> failed = new ArrayList<>(2);
198 checkFailed(failed, accepted);
199 checkFailed(failed, committed);
200 if (!failed.isEmpty()) {
201 String[] arr = failed.toArray(new String[failed.size()]);
202 req.setRefs(refdb.exactRef(arr));
203 }
204 }
205
206 private static void checkFailed(List<String> failed, ReceiveCommand cmd) {
207 if (cmd != null && cmd.getResult() != OK) {
208 failed.add(cmd.getRefName());
209 }
210 }
211
212 private boolean isOnlyTxnNamespace(Collection<ReceiveCommand> cmdList) {
213
214
215
216 String txnNamespace = getSystem().getTxnNamespace();
217 for (ReceiveCommand cmd : cmdList) {
218 if (!cmd.getRefName().startsWith(txnNamespace)) {
219 cmd.setResult(REJECTED_OTHER_REASON,
220 MessageFormat.format(
221 KetchText.get().outsideTxnNamespace,
222 cmd.getRefName(), txnNamespace));
223 ReceiveCommand.abort(cmdList);
224 return false;
225 }
226 }
227 return true;
228 }
229 }