1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.pack;
45
46 import java.io.IOException;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.Comparator;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.concurrent.Callable;
54
55 import org.eclipse.jgit.lib.ObjectReader;
56 import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
57 import org.eclipse.jgit.storage.pack.PackConfig;
58
59 final class DeltaTask implements Callable<Object> {
/**
 * Upper bound used when scaling a block's total weight into units:
 * {@code Block.computeTopPaths()} keeps multiplying {@code bytesPerUnit}
 * by 1024 until {@code totalWeight / bytesPerUnit} is below this value.
 * NOTE(review): presumably these units are what gets reported to the
 * progress monitor -- confirm against DeltaWindow.
 */
static final long MAX_METER = 9 << 20;
61
62 static final class Block {
/**
 * A run of same-path-hash objects must weigh strictly more than this
 * (50 &lt;&lt; 20) before computeTopPaths() considers it a "top path".
 */
private static final int MIN_TOP_PATH = 50 << 20;

/** Tasks created by partitionTasks(); scanned by stealWork(). */
final List<DeltaTask> tasks;
/** Divisor for the per-task target weight and cap on the number of top paths. */
final int threads;
/** Configuration handed to every DeltaWindow created for this block. */
final PackConfig config;
/** Reader from which each task obtains its own reader via newReader(). */
final ObjectReader templateReader;
/** Delta cache handed to every DeltaWindow created for this block. */
final DeltaCache dc;
/** Progress monitor handed to every DeltaWindow; endWorker() is called when a task finishes. */
final ThreadSafeProgressMonitor pm;
/** Objects to process; only indexes in [beginIndex, endIndex) belong to this block. */
final ObjectToPack[] list;
/** First index (inclusive) of this block's range in {@link #list}. */
final int beginIndex;
/** End index (exclusive) of this block's range in {@link #list}. */
final int endIndex;

/** Sum of the adjusted weights of the range; computed by computeTopPaths(). */
private long totalWeight;
/** Scaling divisor chosen by computeTopPaths(); zero until that method has run. */
long bytesPerUnit;
77
/**
 * Create a block covering {@code list[begin..end)}.
 *
 * @param threads
 *            number of workers the range is partitioned for; also the
 *            initial capacity of the task list.
 * @param config
 *            configuration passed on to each delta window.
 * @param reader
 *            template reader; every task derives its own reader from it.
 * @param dc
 *            delta cache passed on to each delta window.
 * @param pm
 *            progress monitor passed on to each delta window.
 * @param list
 *            the objects to process.
 * @param begin
 *            first index of the range (inclusive).
 * @param end
 *            end index of the range (exclusive).
 */
Block(int threads, PackConfig config, ObjectReader reader,
        DeltaCache dc, ThreadSafeProgressMonitor pm,
        ObjectToPack[] list, int begin, int end) {
    // Range description first, collaborators afterwards.
    this.list = list;
    this.beginIndex = begin;
    this.endIndex = end;

    this.threads = threads;
    this.tasks = new ArrayList<>(threads);

    this.config = config;
    this.templateReader = reader;
    this.dc = dc;
    this.pm = pm;
}
91
92 int cost() {
93 int d = (int) (totalWeight / bytesPerUnit);
94 if (totalWeight % bytesPerUnit != 0)
95 d++;
96 return d;
97 }
98
99 synchronized DeltaWindow stealWork(DeltaTask forThread) {
100 for (;;) {
101 DeltaTask maxTask = null;
102 Slice maxSlice = null;
103 int maxWork = 0;
104
105 for (DeltaTask task : tasks) {
106 Slice s = task.remaining();
107 if (s != null && maxWork < s.size()) {
108 maxTask = task;
109 maxSlice = s;
110 maxWork = s.size();
111 }
112 }
113 if (maxTask == null) {
114 return null;
115 }
116 if (maxTask.tryStealWork(maxSlice)) {
117 return forThread.initWindow(maxSlice);
118 }
119 }
120 }
121
122 void partitionTasks() {
123 ArrayList<WeightedPath> topPaths = computeTopPaths();
124 Iterator<WeightedPath> topPathItr = topPaths.iterator();
125 int nextTop = 0;
126 long weightPerThread = Math.max(totalWeight / threads, 1);
127 for (int i = beginIndex; i < endIndex;) {
128 DeltaTask task = new DeltaTask(this);
129 long w = 0;
130
131
132 if (topPathItr.hasNext()) {
133 WeightedPath p = topPathItr.next();
134 w += p.weight;
135 task.add(p.slice);
136 }
137
138
139 int s = i;
140 for (; w < weightPerThread && i < endIndex;) {
141 if (nextTop < topPaths.size()
142 && i == topPaths.get(nextTop).slice.beginIndex) {
143 if (s < i) {
144 task.add(new Slice(s, i));
145 }
146 s = i = topPaths.get(nextTop++).slice.endIndex;
147 } else {
148 w += getAdjustedWeight(list[i++]);
149 }
150 }
151
152
153 if (s < i) {
154 int h = list[i - 1].getPathHash();
155 while (i < endIndex) {
156 if (h == list[i].getPathHash()) {
157 i++;
158 } else {
159 break;
160 }
161 }
162 task.add(new Slice(s, i));
163 }
164 if (!task.slices.isEmpty()) {
165 tasks.add(task);
166 }
167 }
168 while (topPathItr.hasNext()) {
169 WeightedPath p = topPathItr.next();
170 DeltaTask task = new DeltaTask(this);
171 task.add(p.slice);
172 tasks.add(task);
173 }
174
175 topPaths = null;
176 }
177
178 private ArrayList<WeightedPath> computeTopPaths() {
179 ArrayList<WeightedPath> topPaths = new ArrayList<>(
180 threads);
181 int cp = beginIndex;
182 int ch = list[cp].getPathHash();
183 long cw = getAdjustedWeight(list[cp]);
184 totalWeight = cw;
185
186 for (int i = cp + 1; i < endIndex; i++) {
187 ObjectToPack o = list[i];
188 if (ch != o.getPathHash()) {
189 if (MIN_TOP_PATH < cw) {
190 if (topPaths.size() < threads) {
191 Slice s = new Slice(cp, i);
192 topPaths.add(new WeightedPath(cw, s));
193 if (topPaths.size() == threads) {
194 Collections.sort(topPaths);
195 }
196 } else if (topPaths.get(0).weight < cw) {
197 Slice s = new Slice(cp, i);
198 WeightedPath p = new WeightedPath(cw, s);
199 topPaths.set(0, p);
200 if (p.compareTo(topPaths.get(1)) > 0) {
201 Collections.sort(topPaths);
202 }
203 }
204 }
205 cp = i;
206 ch = o.getPathHash();
207 cw = 0;
208 }
209 int weight = getAdjustedWeight(o);
210 cw += weight;
211 totalWeight += weight;
212 }
213
214
215 Collections.sort(topPaths, new Comparator<WeightedPath>() {
216 @Override
217 public int compare(WeightedPath a, WeightedPath b) {
218 return a.slice.beginIndex - b.slice.beginIndex;
219 }
220 });
221
222 bytesPerUnit = 1;
223 while (MAX_METER <= (totalWeight / bytesPerUnit)) {
224 bytesPerUnit <<= 10;
225 }
226 return topPaths;
227 }
228 }
229
230 static int getAdjustedWeight(ObjectToPack o) {
231
232
233 if (o.isEdge() || o.doNotAttemptDelta()) {
234 return 0;
235 }
236 return o.getWeight();
237 }
238
239 static final class WeightedPath implements Comparable<WeightedPath> {
240 final long weight;
241 final Slice slice;
242
243 WeightedPath(long weight, Slice s) {
244 this.weight = weight;
245 this.slice = s;
246 }
247
248 @Override
249 public int compareTo(WeightedPath o) {
250 int cmp = Long.signum(weight - o.weight);
251 if (cmp != 0) {
252 return cmp;
253 }
254 return slice.beginIndex - o.slice.beginIndex;
255 }
256 }
257
258 static final class Slice {
259 final int beginIndex;
260 final int endIndex;
261
262 Slice(int b, int e) {
263 beginIndex = b;
264 endIndex = e;
265 }
266
267 final int size() {
268 return endIndex - beginIndex;
269 }
270 }
271
/** The block this task belongs to; supplies the shared list, config and monitor. */
private final Block block;
/** Slices still waiting to be processed by this task; the last one may be taken by tryStealWork(). */
final LinkedList<Slice> slices;

/** Reader obtained from the block's template reader in call(); closed and reset to null when call() ends. */
private ObjectReader or;
/** Window currently being searched, or null; written under this task's monitor. */
private DeltaWindow dw;
277
/**
 * Create a task with no slices assigned yet.
 *
 * @param b
 *            the block that owns this task.
 */
DeltaTask(Block b) {
    slices = new LinkedList<>();
    block = b;
}
282
283 void add(Slice s) {
284 if (!slices.isEmpty()) {
285 Slice last = slices.getLast();
286 if (last.endIndex == s.beginIndex) {
287 slices.removeLast();
288 slices.add(new Slice(last.beginIndex, s.endIndex));
289 return;
290 }
291 }
292 slices.add(s);
293 }
294
295
296 @Override
297 public Object call() throws Exception {
298 or = block.templateReader.newReader();
299 try {
300 DeltaWindow w;
301 for (;;) {
302 synchronized (this) {
303 if (slices.isEmpty()) {
304 break;
305 }
306 w = initWindow(slices.removeFirst());
307 }
308 runWindow(w);
309 }
310 while ((w = block.stealWork(this)) != null) {
311 runWindow(w);
312 }
313 } finally {
314 block.pm.endWorker();
315 or.close();
316 or = null;
317 }
318 return null;
319 }
320
321 DeltaWindow initWindow(Slice s) {
322 DeltaWindow w = new DeltaWindow(block.config, block.dc,
323 or, block.pm, block.bytesPerUnit,
324 block.list, s.beginIndex, s.endIndex);
325 synchronized (this) {
326 dw = w;
327 }
328 return w;
329 }
330
331 private void runWindow(DeltaWindow w) throws IOException {
332 try {
333 w.search();
334 } finally {
335 synchronized (this) {
336 dw = null;
337 }
338 }
339 }
340
341 synchronized Slice remaining() {
342 if (!slices.isEmpty()) {
343 return slices.getLast();
344 }
345 DeltaWindow d = dw;
346 return d != null ? d.remaining() : null;
347 }
348
349 synchronized boolean tryStealWork(Slice s) {
350 if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
351 slices.removeLast();
352 return true;
353 }
354 DeltaWindow d = dw;
355 return d != null ? d.tryStealWork(s) : false;
356 }
357 }