View Javadoc
1   /*
2    * Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
3    * Copyright (C) 2008, 2022 Shawn O. Pearce <spearce@spearce.org> and others
4    *
5    * This program and the accompanying materials are made available under the
6    * terms of the Eclipse Distribution License v. 1.0 which is available at
7    * https://www.eclipse.org/org/documents/edl-v10.php.
8    *
9    * SPDX-License-Identifier: BSD-3-Clause
10   */
11  
12  package org.eclipse.jgit.transport;
13  
14  import static java.nio.charset.StandardCharsets.UTF_8;
15  import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE;
16  
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.OutputStream;
20  import java.io.Writer;
21  import java.text.MessageFormat;
22  import java.util.regex.Matcher;
23  import java.util.regex.Pattern;
24  
25  import org.eclipse.jgit.errors.PackProtocolException;
26  import org.eclipse.jgit.errors.TransportException;
27  import org.eclipse.jgit.internal.JGitText;
28  import org.eclipse.jgit.lib.ProgressMonitor;
29  import org.eclipse.jgit.util.IO;
30  import org.eclipse.jgit.util.RawParseUtils;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  /**
35   * Unmultiplexes the data portion of a side-band channel.
36   * <p>
37   * Reading from this input stream obtains data from channel 1, which is
38   * typically the bulk data stream.
39   * <p>
40   * Channel 2 is transparently unpacked and "scraped" to update a progress
41   * monitor. The scraping is performed behind the scenes as part of any of the
42   * read methods offered by this stream.
43   * <p>
44   * Channel 3 results in an exception being thrown, as the remote side has issued
45   * an unrecoverable error.
46   *
47   * @see SideBandOutputStream
48   * @since 4.11
49   */
50  public class SideBandInputStream extends InputStream {
51  
52  	private static final Logger LOG = LoggerFactory
53  			.getLogger(SideBandInputStream.class);
54  
55  	static final int CH_DATA = 1;
56  	static final int CH_PROGRESS = 2;
57  	static final int CH_ERROR = 3;
58  
59  	private static Pattern P_UNBOUNDED = Pattern
60  			.compile("^([\\w ]+): +(\\d+)(?:, done\\.)? *[\r\n]$"); //$NON-NLS-1$
61  
62  	private static Pattern P_BOUNDED = Pattern
63  			.compile("^([\\w ]+): +\\d+% +\\( *(\\d+)/ *(\\d+)\\)(?:, done\\.)? *[\r\n]$"); //$NON-NLS-1$
64  
65  	private final InputStream rawIn;
66  
67  	private final PacketLineIn pckIn;
68  
69  	private final ProgressMonitor monitor;
70  
71  	private final Writer messages;
72  
73  	private final OutputStream out;
74  
75  	private String progressBuffer = ""; //$NON-NLS-1$
76  
77  	private String currentTask;
78  
79  	private int lastCnt;
80  
81  	private boolean eof;
82  
83  	private int channel;
84  
85  	private int available;
86  
87  	SideBandInputStream(final InputStream in, final ProgressMonitor progress,
88  			final Writer messageStream, OutputStream outputStream) {
89  		rawIn = in;
90  		pckIn = new PacketLineIn(rawIn);
91  		monitor = progress;
92  		messages = messageStream;
93  		currentTask = ""; //$NON-NLS-1$
94  		out = outputStream;
95  	}
96  
97  	/** {@inheritDoc} */
98  	@Override
99  	public int read() throws IOException {
100 		needDataPacket();
101 		if (eof)
102 			return -1;
103 		available--;
104 		return rawIn.read();
105 	}
106 
107 	/** {@inheritDoc} */
108 	@Override
109 	public int read(byte[] b, int off, int len) throws IOException {
110 		int r = 0;
111 		while (len > 0) {
112 			needDataPacket();
113 			if (eof)
114 				break;
115 			final int n = rawIn.read(b, off, Math.min(len, available));
116 			if (n < 0)
117 				break;
118 			r += n;
119 			off += n;
120 			len -= n;
121 			available -= n;
122 		}
123 		return eof && r == 0 ? -1 : r;
124 	}
125 
126 	private void needDataPacket() throws IOException {
127 		if (eof || (channel == CH_DATA && available > 0))
128 			return;
129 		for (;;) {
130 			available = pckIn.readLength();
131 			if (available == 0) {
132 				eof = true;
133 				return;
134 			}
135 
136 			channel = rawIn.read() & 0xff;
137 			available -= HDR_SIZE; // length header plus channel indicator
138 			if (available == 0)
139 				continue;
140 
141 			switch (channel) {
142 			case CH_DATA:
143 				return;
144 			case CH_PROGRESS:
145 				progress(readString(available));
146 				continue;
147 			case CH_ERROR:
148 				eof = true;
149 				throw new TransportException(remote(readString(available)));
150 			default:
151 				throw new PackProtocolException(
152 						MessageFormat.format(JGitText.get().invalidChannel,
153 								Integer.valueOf(channel)));
154 			}
155 		}
156 	}
157 
158 	private void progress(String pkt) throws IOException {
159 		pkt = progressBuffer + pkt;
160 		for (;;) {
161 			final int lf = pkt.indexOf('\n');
162 			final int cr = pkt.indexOf('\r');
163 			final int s;
164 			if (0 <= lf && 0 <= cr)
165 				s = Math.min(lf, cr);
166 			else if (0 <= lf)
167 				s = lf;
168 			else if (0 <= cr)
169 				s = cr;
170 			else
171 				break;
172 
173 			doProgressLine(pkt.substring(0, s + 1));
174 			pkt = pkt.substring(s + 1);
175 		}
176 		progressBuffer = pkt;
177 	}
178 
179 	private void doProgressLine(String msg) throws IOException {
180 		Matcher matcher;
181 
182 		matcher = P_BOUNDED.matcher(msg);
183 		if (matcher.matches()) {
184 			final String taskname = matcher.group(1);
185 			if (!currentTask.equals(taskname)) {
186 				currentTask = taskname;
187 				lastCnt = 0;
188 				beginTask(Integer.parseInt(matcher.group(3)));
189 			}
190 			final int cnt = Integer.parseInt(matcher.group(2));
191 			monitor.update(cnt - lastCnt);
192 			lastCnt = cnt;
193 			return;
194 		}
195 
196 		matcher = P_UNBOUNDED.matcher(msg);
197 		if (matcher.matches()) {
198 			final String taskname = matcher.group(1);
199 			if (!currentTask.equals(taskname)) {
200 				currentTask = taskname;
201 				lastCnt = 0;
202 				beginTask(ProgressMonitor.UNKNOWN);
203 			}
204 			final int cnt = Integer.parseInt(matcher.group(2));
205 			monitor.update(cnt - lastCnt);
206 			lastCnt = cnt;
207 			return;
208 		}
209 
210 		messages.write(msg);
211 		if (out != null)
212 			out.write(msg.getBytes(UTF_8));
213 	}
214 
215 	private void beginTask(int totalWorkUnits) {
216 		monitor.beginTask(remote(currentTask), totalWorkUnits);
217 	}
218 
219 	/**
220 	 * Forces any buffered progress messages to be written.
221 	 */
222 	void drainMessages() {
223 		if (!progressBuffer.isEmpty()) {
224 			try {
225 				progress("\n"); //$NON-NLS-1$
226 			} catch (IOException e) {
227 				// Just log; otherwise this IOException might hide a real
228 				// TransportException
229 				LOG.error(e.getMessage(), e);
230 			}
231 		}
232 	}
233 
234 	private static String remote(String msg) {
235 		String prefix = JGitText.get().prefixRemote;
236 		StringBuilder r = new StringBuilder(prefix.length() + msg.length() + 1);
237 		r.append(prefix);
238 		if (prefix.length() > 0 && prefix.charAt(prefix.length() - 1) != ' ') {
239 			r.append(' ');
240 		}
241 		r.append(msg);
242 		return r.toString();
243 	}
244 
245 	private String readString(int len) throws IOException {
246 		final byte[] raw = new byte[len];
247 		IO.readFully(rawIn, raw, 0, len);
248 		return RawParseUtils.decode(UTF_8, raw, 0, len);
249 	}
250 }