1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 package org.eclipse.jgit.transport.sshd;
44
45 import static java.text.MessageFormat.format;
46
47 import java.io.IOException;
48 import java.io.InputStream;
49 import java.io.InterruptedIOException;
50 import java.io.OutputStream;
51 import java.time.Duration;
52 import java.util.ArrayList;
53 import java.util.Collection;
54 import java.util.EnumSet;
55 import java.util.List;
56 import java.util.concurrent.CopyOnWriteArrayList;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.atomic.AtomicReference;
59 import java.util.function.Supplier;
60
61 import org.apache.sshd.client.SshClient;
62 import org.apache.sshd.client.channel.ChannelExec;
63 import org.apache.sshd.client.channel.ClientChannelEvent;
64 import org.apache.sshd.client.session.ClientSession;
65 import org.apache.sshd.client.subsystem.sftp.SftpClient;
66 import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
67 import org.apache.sshd.client.subsystem.sftp.SftpClient.CopyMode;
68 import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
69 import org.apache.sshd.common.session.Session;
70 import org.apache.sshd.common.session.SessionListener;
71 import org.apache.sshd.common.subsystem.sftp.SftpException;
72 import org.eclipse.jgit.annotations.NonNull;
73 import org.eclipse.jgit.internal.transport.sshd.SshdText;
74 import org.eclipse.jgit.transport.FtpChannel;
75 import org.eclipse.jgit.transport.RemoteSession;
76 import org.eclipse.jgit.transport.URIish;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80
81
82
83
84
85 public class SshdSession implements RemoteSession {
86
87 private static final Logger LOG = LoggerFactory
88 .getLogger(SshdSession.class);
89
90 private final CopyOnWriteArrayList<SessionCloseListener> listeners = new CopyOnWriteArrayList<>();
91
92 private final URIish uri;
93
94 private SshClient client;
95
96 private ClientSession session;
97
98 SshdSession(URIish uri, Supplier<SshClient> clientFactory) {
99 this.uri = uri;
100 this.client = clientFactory.get();
101 }
102
103 void connect(Duration timeout) throws IOException {
104 if (!client.isStarted()) {
105 client.start();
106 }
107 try {
108 String username = uri.getUser();
109 String host = uri.getHost();
110 int port = uri.getPort();
111 long t = timeout.toMillis();
112 if (t <= 0) {
113 session = client.connect(username, host, port).verify()
114 .getSession();
115 } else {
116 session = client.connect(username, host, port)
117 .verify(timeout.toMillis()).getSession();
118 }
119 session.addSessionListener(new SessionListener() {
120
121 @Override
122 public void sessionClosed(Session s) {
123 notifyCloseListeners();
124 }
125 });
126
127 session.auth().verify(session.getAuthTimeout());
128 } catch (IOException e) {
129 disconnect(e);
130 throw e;
131 }
132 }
133
134
135
136
137
138
139
140
141 public void addCloseListener(@NonNull SessionCloseListener listener) {
142 listeners.addIfAbsent(listener);
143 }
144
145
146
147
148
149
150
151
152 public void removeCloseListener(@NonNull SessionCloseListener listener) {
153 listeners.remove(listener);
154 }
155
156 private void notifyCloseListeners() {
157 for (SessionCloseListener l : listeners) {
158 try {
159 l.sessionClosed(this);
160 } catch (RuntimeException e) {
161 LOG.warn(SshdText.get().closeListenerFailed, e);
162 }
163 }
164 }
165
166 @Override
167 public Process exec(String commandName, int timeout) throws IOException {
168 @SuppressWarnings("resource")
169 ChannelExec exec = session.createExecChannel(commandName);
170 long timeoutMillis = TimeUnit.SECONDS.toMillis(timeout);
171 try {
172 if (timeout <= 0) {
173 exec.open().verify();
174 } else {
175 long start = System.nanoTime();
176 exec.open().verify(timeoutMillis);
177 timeoutMillis -= TimeUnit.NANOSECONDS
178 .toMillis(System.nanoTime() - start);
179 }
180 } catch (IOException | RuntimeException e) {
181 exec.close(true);
182 throw e;
183 }
184 if (timeout > 0 && timeoutMillis <= 0) {
185
186 exec.close(true);
187 throw new InterruptedIOException(
188 format(SshdText.get().sshCommandTimeout, commandName,
189 Integer.valueOf(timeout)));
190 }
191 return new SshdExecProcess(exec, commandName, timeoutMillis);
192 }
193
194
195
196
197
198 @Override
199 @NonNull
200 public FtpChannel getFtpChannel() {
201 return new SshdFtpChannel();
202 }
203
204 @Override
205 public void disconnect() {
206 disconnect(null);
207 }
208
209 private void disconnect(Throwable reason) {
210 try {
211 if (session != null) {
212 session.close();
213 session = null;
214 }
215 } catch (IOException e) {
216 if (reason != null) {
217 reason.addSuppressed(e);
218 } else {
219 LOG.error(SshdText.get().sessionCloseFailed, e);
220 }
221 } finally {
222 client.stop();
223 client = null;
224 }
225 }
226
227 private static class SshdExecProcess extends Process {
228
229 private final ChannelExec channel;
230
231 private final long timeoutMillis;
232
233 private final String commandName;
234
235 public SshdExecProcess(ChannelExec channel, String commandName,
236 long timeoutMillis) {
237 this.channel = channel;
238 this.timeoutMillis = timeoutMillis > 0 ? timeoutMillis : -1L;
239 this.commandName = commandName;
240 }
241
242 @Override
243 public OutputStream getOutputStream() {
244 return channel.getInvertedIn();
245 }
246
247 @Override
248 public InputStream getInputStream() {
249 return channel.getInvertedOut();
250 }
251
252 @Override
253 public InputStream getErrorStream() {
254 return channel.getInvertedErr();
255 }
256
257 @Override
258 public int waitFor() throws InterruptedException {
259 if (waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
260 return exitValue();
261 }
262 return -1;
263 }
264
265 @Override
266 public boolean waitFor(long timeout, TimeUnit unit)
267 throws InterruptedException {
268 long millis = timeout >= 0 ? unit.toMillis(timeout) : -1L;
269 return channel
270 .waitFor(EnumSet.of(ClientChannelEvent.CLOSED), millis)
271 .contains(ClientChannelEvent.CLOSED);
272 }
273
274 @Override
275 public int exitValue() {
276 Integer exitCode = channel.getExitStatus();
277 if (exitCode == null) {
278 throw new IllegalThreadStateException(
279 format(SshdText.get().sshProcessStillRunning,
280 commandName));
281 }
282 return exitCode.intValue();
283 }
284
285 @Override
286 public void destroy() {
287 if (channel.isOpen()) {
288 channel.close(true);
289 }
290 }
291 }
292
293
294
295
296
297
298
299
300 @FunctionalInterface
301 private interface FtpOperation<T> {
302
303 T call() throws IOException;
304
305 }
306
307 private class SshdFtpChannel implements FtpChannel {
308
309 private SftpClient ftp;
310
311
312 private String cwd = "";
313
314 @Override
315 public void connect(int timeout, TimeUnit unit) throws IOException {
316 if (timeout <= 0) {
317 session.getProperties().put(
318 SftpClient.SFTP_CHANNEL_OPEN_TIMEOUT,
319 Long.valueOf(Long.MAX_VALUE));
320 } else {
321 session.getProperties().put(
322 SftpClient.SFTP_CHANNEL_OPEN_TIMEOUT,
323 Long.valueOf(unit.toMillis(timeout)));
324 }
325 ftp = SftpClientFactory.instance().createSftpClient(session);
326 try {
327 cd(cwd);
328 } catch (IOException e) {
329 ftp.close();
330 }
331 }
332
333 @Override
334 public void disconnect() {
335 try {
336 ftp.close();
337 } catch (IOException e) {
338 LOG.error(SshdText.get().ftpCloseFailed, e);
339 }
340 }
341
342 @Override
343 public boolean isConnected() {
344 return session.isAuthenticated() && ftp.isOpen();
345 }
346
347 private String absolute(String path) {
348 if (path.isEmpty()) {
349 return cwd;
350 }
351
352
353
354 if (path.charAt(0) != '/') {
355 if (cwd.charAt(cwd.length() - 1) == '/') {
356 return cwd + path;
357 }
358 return cwd + '/' + path;
359 }
360 return path;
361 }
362
363 private <T> T map(FtpOperation<T> op) throws IOException {
364 try {
365 return op.call();
366 } catch (IOException e) {
367 if (e instanceof SftpException) {
368 throw new FtpChannel.FtpException(e.getLocalizedMessage(),
369 ((SftpException) e).getStatus(), e);
370 }
371 throw e;
372 }
373 }
374
375 @Override
376 public void cd(String path) throws IOException {
377 cwd = map(() -> ftp.canonicalPath(absolute(path)));
378 if (cwd.isEmpty()) {
379 cwd += '/';
380 }
381 }
382
383 @Override
384 public String pwd() throws IOException {
385 return cwd;
386 }
387
388 @Override
389 public Collection<DirEntry> ls(String path) throws IOException {
390 return map(() -> {
391 List<DirEntry> result = new ArrayList<>();
392 try (CloseableHandle handle = ftp.openDir(absolute(path))) {
393 AtomicReference<Boolean> atEnd = new AtomicReference<>(
394 Boolean.FALSE);
395 while (!atEnd.get().booleanValue()) {
396 List<SftpClient.DirEntry> chunk = ftp.readDir(handle,
397 atEnd);
398 if (chunk == null) {
399 break;
400 }
401 for (SftpClient.DirEntry remote : chunk) {
402 result.add(new DirEntry() {
403
404 @Override
405 public String getFilename() {
406 return remote.getFilename();
407 }
408
409 @Override
410 public long getModifiedTime() {
411 return remote.getAttributes()
412 .getModifyTime().toMillis();
413 }
414
415 @Override
416 public boolean isDirectory() {
417 return remote.getAttributes().isDirectory();
418 }
419
420 });
421 }
422 }
423 }
424 return result;
425 });
426 }
427
428 @Override
429 public void rmdir(String path) throws IOException {
430 map(() -> {
431 ftp.rmdir(absolute(path));
432 return null;
433 });
434
435 }
436
437 @Override
438 public void mkdir(String path) throws IOException {
439 map(() -> {
440 ftp.mkdir(absolute(path));
441 return null;
442 });
443 }
444
445 @Override
446 public InputStream get(String path) throws IOException {
447 return map(() -> ftp.read(absolute(path)));
448 }
449
450 @Override
451 public OutputStream put(String path) throws IOException {
452 return map(() -> ftp.write(absolute(path)));
453 }
454
455 @Override
456 public void rm(String path) throws IOException {
457 map(() -> {
458 ftp.remove(absolute(path));
459 return null;
460 });
461 }
462
463 @Override
464 public void rename(String from, String to) throws IOException {
465 map(() -> {
466 String src = absolute(from);
467 String dest = absolute(to);
468 try {
469 ftp.rename(src, dest, CopyMode.Atomic, CopyMode.Overwrite);
470 } catch (UnsupportedOperationException e) {
471
472 if (!src.equals(dest)) {
473 delete(dest);
474 ftp.rename(src, dest);
475 }
476 }
477 return null;
478 });
479 }
480 }
481 }