1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.util.io;
12
13 import java.io.IOException;
14 import java.io.InterruptedIOException;
15 import java.io.OutputStream;
16 import java.util.concurrent.ArrayBlockingQueue;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Future;
21 import java.util.concurrent.RejectedExecutionException;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.ThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.eclipse.jgit.internal.JGitText;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public class IsolatedOutputStream extends OutputStream {
45 private final OutputStream dst;
46 private final ExecutorService copier;
47 private Future<Void> pending;
48
49
50
51
52
53
54
55 public IsolatedOutputStream(OutputStream out) {
56 dst = out;
57 copier = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
58 new ArrayBlockingQueue<>(1), new NamedThreadFactory());
59 }
60
61
62 @Override
63 public void write(int ch) throws IOException {
64 write(new byte[] { (byte) ch }, 0, 1);
65 }
66
67
68 @Override
69 public void write(byte[] buf, int pos, int cnt)
70 throws IOException {
71 checkClosed();
72 execute(() -> {
73 dst.write(buf, pos, cnt);
74 return null;
75 });
76 }
77
78
79 @Override
80 public void flush() throws IOException {
81 checkClosed();
82 execute(() -> {
83 dst.flush();
84 return null;
85 });
86 }
87
88
89 @Override
90 public void close() throws IOException {
91 if (!copier.isShutdown()) {
92 try {
93 if (pending == null || tryCleanClose()) {
94 cleanClose();
95 } else {
96 dirtyClose();
97 }
98 } finally {
99 copier.shutdown();
100 }
101 }
102 }
103
104 private boolean tryCleanClose() {
105
106
107
108
109
110 try {
111 pending.get(0, TimeUnit.MILLISECONDS);
112 pending = null;
113 return true;
114 } catch (TimeoutException | InterruptedException e) {
115 return false;
116 } catch (ExecutionException e) {
117 pending = null;
118 return true;
119 }
120 }
121
122 private void cleanClose() throws IOException {
123 execute(() -> {
124 dst.close();
125 return null;
126 });
127 }
128
129 private void dirtyClose() throws IOException {
130
131
132
133
134
135 pending.cancel(true);
136
137 Future<Void> close;
138 try {
139 close = copier.submit(() -> {
140 dst.close();
141 return null;
142 });
143 } catch (RejectedExecutionException e) {
144 throw new IOException(e);
145 }
146 try {
147 close.get(200, TimeUnit.MILLISECONDS);
148 } catch (InterruptedException | TimeoutException e) {
149 close.cancel(true);
150 throw new IOException(e);
151 } catch (ExecutionException e) {
152 throw new IOException(e.getCause());
153 }
154 }
155
156 private void checkClosed() throws IOException {
157 if (copier.isShutdown()) {
158 throw new IOException(JGitText.get().closed);
159 }
160 }
161
162 private void execute(Callable<Void> task) throws IOException {
163 if (pending != null) {
164
165 checkedGet(pending);
166 }
167 try {
168 pending = copier.submit(task);
169 } catch (RejectedExecutionException e) {
170 throw new IOException(e);
171 }
172 checkedGet(pending);
173 pending = null;
174 }
175
176 private static void checkedGet(Future<Void> future) throws IOException {
177 try {
178 future.get();
179 } catch (InterruptedException e) {
180 throw interrupted(e);
181 } catch (ExecutionException e) {
182 throw new IOException(e.getCause());
183 }
184 }
185
186 private static InterruptedIOException interrupted(InterruptedException c) {
187 InterruptedIOException e = new InterruptedIOException();
188 e.initCause(c);
189 return e;
190 }
191
192 private static class NamedThreadFactory implements ThreadFactory {
193 private static final AtomicInteger cnt = new AtomicInteger();
194
195 @Override
196 public Thread newThread(Runnable r) {
197 int n = cnt.incrementAndGet();
198 String name = IsolatedOutputStream.class.getSimpleName() + '-' + n;
199 return new Thread(r, name);
200 }
201 }
202 }