1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.ketch;
12
13 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.ALL_REFS;
14 import static org.eclipse.jgit.lib.Ref.Storage.NETWORK;
15 import static org.eclipse.jgit.transport.ReceiveCommand.Result.LOCK_FAILURE;
16 import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
17 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
18 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NODELETE;
19 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NONFASTFORWARD;
20 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
27 import java.util.List;
28 import java.util.Map;
29
30 import org.eclipse.jgit.annotations.Nullable;
31 import org.eclipse.jgit.errors.NotSupportedException;
32 import org.eclipse.jgit.errors.TransportException;
33 import org.eclipse.jgit.lib.AnyObjectId;
34 import org.eclipse.jgit.lib.NullProgressMonitor;
35 import org.eclipse.jgit.lib.ObjectId;
36 import org.eclipse.jgit.lib.ObjectIdRef;
37 import org.eclipse.jgit.lib.Ref;
38 import org.eclipse.jgit.lib.Repository;
39 import org.eclipse.jgit.transport.FetchConnection;
40 import org.eclipse.jgit.transport.PushConnection;
41 import org.eclipse.jgit.transport.ReceiveCommand;
42 import org.eclipse.jgit.transport.RemoteConfig;
43 import org.eclipse.jgit.transport.RemoteRefUpdate;
44 import org.eclipse.jgit.transport.Transport;
45 import org.eclipse.jgit.transport.URIish;
46
47
48
49
50
51
52
53
54
55 public class RemoteGitReplica extends KetchReplica {
56 private final URIish uri;
57 private final RemoteConfig remoteConfig;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public RemoteGitReplica(KetchLeader leader, String name, URIish uri,
75 ReplicaConfig cfg, @Nullable RemoteConfig rc) {
76 super(leader, name, cfg);
77 this.uri = uri;
78 this.remoteConfig = rc;
79 }
80
81
82
83
84
85
86 public URIish getURI() {
87 return uri;
88 }
89
90
91
92
93
94
95 @Nullable
96 protected RemoteConfig getRemoteConfig() {
97 return remoteConfig;
98 }
99
100
101 @Override
102 protected String describeForLog() {
103 return String.format("%s @ %s", getName(), getURI());
104 }
105
106
107 @Override
108 protected void startPush(ReplicaPushRequest req) {
109 getSystem().getExecutor().execute(() -> {
110 try (Repository git = getLeader().openRepository()) {
111 try {
112 push(git, req);
113 req.done(git);
114 } catch (Throwable err) {
115 req.setException(git, err);
116 }
117 } catch (IOException err) {
118 req.setException(null, err);
119 }
120 });
121 }
122
123 private void push(Repository repo, ReplicaPushRequest req)
124 throws NotSupportedException, TransportException, IOException {
125 Map<String, Ref> adv;
126 List<RemoteCommand> cmds = asUpdateList(req.getCommands());
127 try (Transport transport = Transport.open(repo, uri)) {
128 RemoteConfig rc = getRemoteConfig();
129 if (rc != null) {
130 transport.applyConfig(rc);
131 }
132 transport.setPushAtomic(true);
133 adv = push(repo, transport, cmds);
134 }
135 for (RemoteCommand c : cmds) {
136 c.copyStatusToResult();
137 }
138 req.setRefs(adv);
139 }
140
141 private Map<String, Ref> push(Repository git, Transport transport,
142 List<RemoteCommand> cmds) throws IOException {
143 Map<String, RemoteRefUpdate> updates = asUpdateMap(cmds);
144 try (PushConnection connection = transport.openPush()) {
145 Map<String, Ref> adv = connection.getRefsMap();
146 RemoteRefUpdate accepted = updates.get(getSystem().getTxnAccepted());
147 if (accepted != null && !isExpectedValue(adv, accepted)) {
148 abort(cmds);
149 return adv;
150 }
151
152 RemoteRefUpdate committed = updates.get(getSystem().getTxnCommitted());
153 if (committed != null && !isExpectedValue(adv, committed)) {
154 abort(cmds);
155 return adv;
156 }
157 if (committed != null && getCommitMethod() == ALL_REFS) {
158 prepareCommit(git, cmds, updates, adv,
159 committed.getNewObjectId());
160 }
161
162 connection.push(NullProgressMonitor.INSTANCE, updates);
163 return adv;
164 }
165 }
166
167 private static boolean isExpectedValue(Map<String, Ref> adv,
168 RemoteRefUpdate u) {
169 Ref r = adv.get(u.getRemoteName());
170 if (!AnyObjectId.isEqual(getId(r), u.getExpectedOldObjectId())) {
171 ((RemoteCommand) u).cmd.setResult(LOCK_FAILURE);
172 return false;
173 }
174 return true;
175 }
176
177 private void prepareCommit(Repository git, List<RemoteCommand> cmds,
178 Map<String, RemoteRefUpdate> updates, Map<String, Ref> adv,
179 ObjectId committed) throws IOException {
180 for (ReceiveCommand cmd : prepareCommit(git, adv, committed)) {
181 RemoteCommand c = new RemoteCommand(cmd);
182 cmds.add(c);
183 updates.put(c.getRemoteName(), c);
184 }
185 }
186
187 private static List<RemoteCommand> asUpdateList(
188 Collection<ReceiveCommand> cmds) {
189 try {
190 List<RemoteCommand> toPush = new ArrayList<>(cmds.size());
191 for (ReceiveCommand cmd : cmds) {
192 toPush.add(new RemoteCommand(cmd));
193 }
194 return toPush;
195 } catch (IOException e) {
196
197 throw new IllegalStateException(e);
198 }
199 }
200
201 private static Map<String, RemoteRefUpdate> asUpdateMap(
202 List<RemoteCommand> cmds) {
203 Map<String, RemoteRefUpdate> m = new LinkedHashMap<>();
204 for (RemoteCommand cmd : cmds) {
205 m.put(cmd.getRemoteName(), cmd);
206 }
207 return m;
208 }
209
210 private static void abort(List<RemoteCommand> cmds) {
211 List<ReceiveCommand> tmp = new ArrayList<>(cmds.size());
212 for (RemoteCommand cmd : cmds) {
213 tmp.add(cmd.cmd);
214 }
215 ReceiveCommand.abort(tmp);
216 }
217
218
219 @Override
220 protected void blockingFetch(Repository repo, ReplicaFetchRequest req)
221 throws NotSupportedException, TransportException {
222 try (Transport transport = Transport.open(repo, uri)) {
223 RemoteConfig rc = getRemoteConfig();
224 if (rc != null) {
225 transport.applyConfig(rc);
226 }
227 fetch(transport, req);
228 }
229 }
230
231 private void fetch(Transport transport, ReplicaFetchRequest req)
232 throws NotSupportedException, TransportException {
233 try (FetchConnection conn = transport.openFetch()) {
234 Map<String, Ref> remoteRefs = conn.getRefsMap();
235 req.setRefs(remoteRefs);
236
237 List<Ref> want = new ArrayList<>();
238 for (String name : req.getWantRefs()) {
239 Ref ref = remoteRefs.get(name);
240 if (ref != null && ref.getObjectId() != null) {
241 want.add(ref);
242 }
243 }
244 for (ObjectId id : req.getWantObjects()) {
245 want.add(new ObjectIdRef.Unpeeled(NETWORK, id.name(), id));
246 }
247
248 conn.fetch(NullProgressMonitor.INSTANCE, want,
249 Collections.<ObjectId> emptySet());
250 }
251 }
252
253 static class RemoteCommand extends RemoteRefUpdate {
254 final ReceiveCommand cmd;
255
256 RemoteCommand(ReceiveCommand cmd) throws IOException {
257 super(null, null,
258 cmd.getNewId(), cmd.getRefName(),
259 true ,
260 null ,
261 cmd.getOldId());
262 this.cmd = cmd;
263 }
264
265 void copyStatusToResult() {
266 if (cmd.getResult() == NOT_ATTEMPTED) {
267 switch (getStatus()) {
268 case OK:
269 case UP_TO_DATE:
270 case NON_EXISTING:
271 cmd.setResult(OK);
272 break;
273
274 case REJECTED_NODELETE:
275 cmd.setResult(REJECTED_NODELETE);
276 break;
277
278 case REJECTED_NONFASTFORWARD:
279 cmd.setResult(REJECTED_NONFASTFORWARD);
280 break;
281
282 case REJECTED_OTHER_REASON:
283 cmd.setResult(REJECTED_OTHER_REASON, getMessage());
284 break;
285
286 default:
287 cmd.setResult(REJECTED_OTHER_REASON, getStatus().name());
288 break;
289 }
290 }
291 }
292 }
293 }