1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.util.io;
45
46 import java.io.IOException;
47 import java.io.InterruptedIOException;
48 import java.io.OutputStream;
49 import java.util.concurrent.ArrayBlockingQueue;
50 import java.util.concurrent.Callable;
51 import java.util.concurrent.ExecutionException;
52 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.RejectedExecutionException;
55 import java.util.concurrent.ThreadFactory;
56 import java.util.concurrent.ThreadPoolExecutor;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import java.util.concurrent.atomic.AtomicInteger;
60
61 import org.eclipse.jgit.internal.JGitText;
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public class IsolatedOutputStream extends OutputStream {
78 private final OutputStream dst;
79 private final ExecutorService copier;
80 private Future<Void> pending;
81
82
83
84
85
86
87
88 public IsolatedOutputStream(OutputStream out) {
89 dst = out;
90 copier = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
91 new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory());
92 }
93
94 @Override
95 public void write(int ch) throws IOException {
96 write(new byte[] { (byte) ch }, 0, 1);
97 }
98
99 @Override
100 public void write(final byte[] buf, final int pos, final int cnt)
101 throws IOException {
102 checkClosed();
103 execute(new Callable<Void>() {
104 @Override
105 public Void call() throws IOException {
106 dst.write(buf, pos, cnt);
107 return null;
108 }
109 });
110 }
111
112 @Override
113 public void flush() throws IOException {
114 checkClosed();
115 execute(new Callable<Void>() {
116 @Override
117 public Void call() throws IOException {
118 dst.flush();
119 return null;
120 }
121 });
122 }
123
124 @Override
125 public void close() throws IOException {
126 if (!copier.isShutdown()) {
127 try {
128 if (pending == null || tryCleanClose()) {
129 cleanClose();
130 } else {
131 dirtyClose();
132 }
133 } finally {
134 copier.shutdown();
135 }
136 }
137 }
138
139 private boolean tryCleanClose() {
140
141
142
143
144
145 try {
146 pending.get(0, TimeUnit.MILLISECONDS);
147 pending = null;
148 return true;
149 } catch (TimeoutException | InterruptedException e) {
150 return false;
151 } catch (ExecutionException e) {
152 pending = null;
153 return true;
154 }
155 }
156
157 private void cleanClose() throws IOException {
158 execute(new Callable<Void>() {
159 @Override
160 public Void call() throws IOException {
161 dst.close();
162 return null;
163 }
164 });
165 }
166
167 private void dirtyClose() throws IOException {
168
169
170
171
172
173 pending.cancel(true);
174
175 Future<Void> close;
176 try {
177 close = copier.submit(new Callable<Void>() {
178 @Override
179 public Void call() throws IOException {
180 dst.close();
181 return null;
182 }
183 });
184 } catch (RejectedExecutionException e) {
185 throw new IOException(e);
186 }
187 try {
188 close.get(200, TimeUnit.MILLISECONDS);
189 } catch (InterruptedException | TimeoutException e) {
190 close.cancel(true);
191 throw new IOException(e);
192 } catch (ExecutionException e) {
193 throw new IOException(e.getCause());
194 }
195 }
196
197 private void checkClosed() throws IOException {
198 if (copier.isShutdown()) {
199 throw new IOException(JGitText.get().closed);
200 }
201 }
202
203 private void execute(Callable<Void> task) throws IOException {
204 if (pending != null) {
205
206 checkedGet(pending);
207 }
208 try {
209 pending = copier.submit(task);
210 } catch (RejectedExecutionException e) {
211 throw new IOException(e);
212 }
213 checkedGet(pending);
214 pending = null;
215 }
216
217 private static void checkedGet(Future<Void> future) throws IOException {
218 try {
219 future.get();
220 } catch (InterruptedException e) {
221 throw interrupted(e);
222 } catch (ExecutionException e) {
223 throw new IOException(e.getCause());
224 }
225 }
226
227 private static InterruptedIOException interrupted(InterruptedException c) {
228 InterruptedIOException e = new InterruptedIOException();
229 e.initCause(c);
230 return e;
231 }
232
233 private static class NamedThreadFactory implements ThreadFactory {
234 private static final AtomicInteger cnt = new AtomicInteger();
235
236 @Override
237 public Thread newThread(Runnable r) {
238 int n = cnt.incrementAndGet();
239 String name = IsolatedOutputStream.class.getSimpleName() + '-' + n;
240 return new Thread(r, name);
241 }
242 }
243 }