1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 package org.eclipse.jgit.transport.sshd;
44
45 import static java.text.MessageFormat.format;
46
47 import java.io.IOException;
48 import java.io.InputStream;
49 import java.io.InterruptedIOException;
50 import java.io.OutputStream;
51 import java.time.Duration;
52 import java.util.ArrayList;
53 import java.util.Collection;
54 import java.util.EnumSet;
55 import java.util.List;
56 import java.util.concurrent.CopyOnWriteArrayList;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.atomic.AtomicReference;
59 import java.util.function.Supplier;
60
61 import org.apache.sshd.client.SshClient;
62 import org.apache.sshd.client.channel.ChannelExec;
63 import org.apache.sshd.client.channel.ClientChannelEvent;
64 import org.apache.sshd.client.session.ClientSession;
65 import org.apache.sshd.client.subsystem.sftp.SftpClient;
66 import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
67 import org.apache.sshd.client.subsystem.sftp.SftpClient.CopyMode;
68 import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
69 import org.apache.sshd.common.session.Session;
70 import org.apache.sshd.common.session.SessionListener;
71 import org.apache.sshd.common.subsystem.sftp.SftpException;
72 import org.eclipse.jgit.annotations.NonNull;
73 import org.eclipse.jgit.internal.transport.sshd.SshdText;
74 import org.eclipse.jgit.transport.FtpChannel;
75 import org.eclipse.jgit.transport.RemoteSession;
76 import org.eclipse.jgit.transport.URIish;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80
81
82
83
84
85 public class SshdSession implements RemoteSession {
86
87 private static final Logger LOG = LoggerFactory
88 .getLogger(SshdSession.class);
89
90 private final CopyOnWriteArrayList<SessionCloseListener> listeners = new CopyOnWriteArrayList<>();
91
92 private final URIish uri;
93
94 private SshClient client;
95
96 private ClientSession session;
97
98 SshdSession(URIish uri, Supplier<SshClient> clientFactory) {
99 this.uri = uri;
100 this.client = clientFactory.get();
101 }
102
103 void connect(Duration timeout) throws IOException {
104 if (!client.isStarted()) {
105 client.start();
106 }
107 try {
108 String username = uri.getUser();
109 String host = uri.getHost();
110 int port = uri.getPort();
111 long t = timeout.toMillis();
112 if (t <= 0) {
113 session = client.connect(username, host, port).verify()
114 .getSession();
115 } else {
116 session = client.connect(username, host, port)
117 .verify(timeout.toMillis()).getSession();
118 }
119 session.addSessionListener(new SessionListener() {
120
121 @Override
122 public void sessionClosed(Session s) {
123 notifyCloseListeners();
124 }
125 });
126
127 session.auth().verify(session.getAuthTimeout());
128 } catch (IOException e) {
129 disconnect(e);
130 throw e;
131 }
132 }
133
134
135
136
137
138
139
140
141 public void addCloseListener(@NonNull SessionCloseListener listener) {
142 listeners.addIfAbsent(listener);
143 }
144
145
146
147
148
149
150
151
152 public void removeCloseListener(@NonNull SessionCloseListener listener) {
153 listeners.remove(listener);
154 }
155
156 private void notifyCloseListeners() {
157 for (SessionCloseListener l : listeners) {
158 try {
159 l.sessionClosed(this);
160 } catch (RuntimeException e) {
161 LOG.warn(SshdText.get().closeListenerFailed, e);
162 }
163 }
164 }
165
166 @Override
167 public Process exec(String commandName, int timeout) throws IOException {
168 @SuppressWarnings("resource")
169 ChannelExec exec = session.createExecChannel(commandName);
170 long timeoutMillis = TimeUnit.SECONDS.toMillis(timeout);
171 try {
172 if (timeout <= 0) {
173 exec.open().verify();
174 } else {
175 long start = System.nanoTime();
176 exec.open().verify(timeoutMillis);
177 timeoutMillis -= TimeUnit.NANOSECONDS
178 .toMillis(System.nanoTime() - start);
179 }
180 } catch (IOException e) {
181 exec.close(true);
182 throw e;
183 } catch (RuntimeException e) {
184 exec.close(true);
185 throw e;
186 }
187 if (timeout > 0 && timeoutMillis <= 0) {
188
189 exec.close(true);
190 throw new InterruptedIOException(
191 format(SshdText.get().sshCommandTimeout, commandName,
192 Integer.valueOf(timeout)));
193 }
194 return new SshdExecProcess(exec, commandName, timeoutMillis);
195 }
196
197
198
199
200
201 @Override
202 @NonNull
203 public FtpChannel getFtpChannel() {
204 return new SshdFtpChannel();
205 }
206
207 @Override
208 public void disconnect() {
209 disconnect(null);
210 }
211
212 private void disconnect(Throwable reason) {
213 try {
214 if (session != null) {
215 session.close();
216 session = null;
217 }
218 } catch (IOException e) {
219 if (reason != null) {
220 reason.addSuppressed(e);
221 } else {
222 LOG.error(SshdText.get().sessionCloseFailed, e);
223 }
224 } finally {
225 client.stop();
226 client = null;
227 }
228 }
229
230 private static class SshdExecProcess extends Process {
231
232 private final ChannelExec channel;
233
234 private final long timeoutMillis;
235
236 private final String commandName;
237
238 public SshdExecProcess(ChannelExec channel, String commandName,
239 long timeoutMillis) {
240 this.channel = channel;
241 this.timeoutMillis = timeoutMillis > 0 ? timeoutMillis : -1L;
242 this.commandName = commandName;
243 }
244
245 @Override
246 public OutputStream getOutputStream() {
247 return channel.getInvertedIn();
248 }
249
250 @Override
251 public InputStream getInputStream() {
252 return channel.getInvertedOut();
253 }
254
255 @Override
256 public InputStream getErrorStream() {
257 return channel.getInvertedErr();
258 }
259
260 @Override
261 public int waitFor() throws InterruptedException {
262 if (waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
263 return exitValue();
264 }
265 return -1;
266 }
267
268 @Override
269 public boolean waitFor(long timeout, TimeUnit unit)
270 throws InterruptedException {
271 long millis = timeout >= 0 ? unit.toMillis(timeout) : -1L;
272 return channel
273 .waitFor(EnumSet.of(ClientChannelEvent.CLOSED), millis)
274 .contains(ClientChannelEvent.CLOSED);
275 }
276
277 @Override
278 public int exitValue() {
279 Integer exitCode = channel.getExitStatus();
280 if (exitCode == null) {
281 throw new IllegalThreadStateException(
282 format(SshdText.get().sshProcessStillRunning,
283 commandName));
284 }
285 return exitCode.intValue();
286 }
287
288 @Override
289 public void destroy() {
290 if (channel.isOpen()) {
291 channel.close(true);
292 }
293 }
294 }
295
296
297
298
299
300
301
302
303 @FunctionalInterface
304 private interface FtpOperation<T> {
305
306 T call() throws IOException;
307
308 }
309
310 private class SshdFtpChannel implements FtpChannel {
311
312 private SftpClient ftp;
313
314
315 private String cwd = "";
316
317 @Override
318 public void connect(int timeout, TimeUnit unit) throws IOException {
319 if (timeout <= 0) {
320 session.getProperties().put(
321 SftpClient.SFTP_CHANNEL_OPEN_TIMEOUT,
322 Long.valueOf(Long.MAX_VALUE));
323 } else {
324 session.getProperties().put(
325 SftpClient.SFTP_CHANNEL_OPEN_TIMEOUT,
326 Long.valueOf(unit.toMillis(timeout)));
327 }
328 ftp = SftpClientFactory.instance().createSftpClient(session);
329 try {
330 cd(cwd);
331 } catch (IOException e) {
332 ftp.close();
333 }
334 }
335
336 @Override
337 public void disconnect() {
338 try {
339 ftp.close();
340 } catch (IOException e) {
341 LOG.error(SshdText.get().ftpCloseFailed, e);
342 }
343 }
344
345 @Override
346 public boolean isConnected() {
347 return session.isAuthenticated() && ftp.isOpen();
348 }
349
350 private String absolute(String path) {
351 if (path.isEmpty()) {
352 return cwd;
353 }
354
355
356
357 if (path.charAt(0) != '/') {
358 if (cwd.charAt(cwd.length() - 1) == '/') {
359 return cwd + path;
360 } else {
361 return cwd + '/' + path;
362 }
363 }
364 return path;
365 }
366
367 private <T> T map(FtpOperation<T> op) throws IOException {
368 try {
369 return op.call();
370 } catch (IOException e) {
371 if (e instanceof SftpException) {
372 throw new FtpChannel.FtpException(e.getLocalizedMessage(),
373 ((SftpException) e).getStatus(), e);
374 }
375 throw e;
376 }
377 }
378
379 @Override
380 public void cd(String path) throws IOException {
381 cwd = map(() -> ftp.canonicalPath(absolute(path)));
382 if (cwd.isEmpty()) {
383 cwd += '/';
384 }
385 }
386
387 @Override
388 public String pwd() throws IOException {
389 return cwd;
390 }
391
392 @Override
393 public Collection<DirEntry> ls(String path) throws IOException {
394 return map(() -> {
395 List<DirEntry> result = new ArrayList<>();
396 try (CloseableHandle handle = ftp.openDir(absolute(path))) {
397 AtomicReference<Boolean> atEnd = new AtomicReference<>(
398 Boolean.FALSE);
399 while (!atEnd.get().booleanValue()) {
400 List<SftpClient.DirEntry> chunk = ftp.readDir(handle,
401 atEnd);
402 if (chunk == null) {
403 break;
404 }
405 for (SftpClient.DirEntry remote : chunk) {
406 result.add(new DirEntry() {
407
408 @Override
409 public String getFilename() {
410 return remote.getFilename();
411 }
412
413 @Override
414 public long getModifiedTime() {
415 return remote.getAttributes()
416 .getModifyTime().toMillis();
417 }
418
419 @Override
420 public boolean isDirectory() {
421 return remote.getAttributes().isDirectory();
422 }
423
424 });
425 }
426 }
427 }
428 return result;
429 });
430 }
431
432 @Override
433 public void rmdir(String path) throws IOException {
434 map(() -> {
435 ftp.rmdir(absolute(path));
436 return null;
437 });
438
439 }
440
441 @Override
442 public void mkdir(String path) throws IOException {
443 map(() -> {
444 ftp.mkdir(absolute(path));
445 return null;
446 });
447 }
448
449 @Override
450 public InputStream get(String path) throws IOException {
451 return map(() -> ftp.read(absolute(path)));
452 }
453
454 @Override
455 public OutputStream put(String path) throws IOException {
456 return map(() -> ftp.write(absolute(path)));
457 }
458
459 @Override
460 public void rm(String path) throws IOException {
461 map(() -> {
462 ftp.remove(absolute(path));
463 return null;
464 });
465 }
466
467 @Override
468 public void rename(String from, String to) throws IOException {
469 map(() -> {
470 String src = absolute(from);
471 String dest = absolute(to);
472 try {
473 ftp.rename(src, dest, CopyMode.Atomic, CopyMode.Overwrite);
474 } catch (UnsupportedOperationException e) {
475
476 if (!src.equals(dest)) {
477 delete(dest);
478 ftp.rename(src, dest);
479 }
480 }
481 return null;
482 });
483 }
484 }
485 }