1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.pack;
45
46 import java.io.IOException;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.Comparator;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.concurrent.Callable;
54
55 import org.eclipse.jgit.lib.ObjectReader;
56 import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
57 import org.eclipse.jgit.storage.pack.PackConfig;
58
59 final class DeltaTask implements Callable<Object> {
60 static final long MAX_METER = 9 << 20;
61
62 static final class Block {
63 private static final int MIN_TOP_PATH = 50 << 20;
64
65 final List<DeltaTask> tasks;
66 final int threads;
67 final PackConfig config;
68 final ObjectReader templateReader;
69 final DeltaCache dc;
70 final ThreadSafeProgressMonitor pm;
71 final ObjectToPack[] list;
72 final int beginIndex;
73 final int endIndex;
74
75 private long totalWeight;
76 long bytesPerUnit;
77
/**
 * Creates a block describing one multi-threaded delta search.
 *
 * @param threads
 *            number of workers; used to presize the task list and, in
 *            {@code partitionTasks()}, to split the total weight.
 * @param config
 *            pack configuration handed to every {@code DeltaWindow}.
 * @param reader
 *            template reader; each task derives its own reader from it
 *            via {@code newReader()}.
 * @param dc
 *            delta cache handed to every {@code DeltaWindow}.
 * @param pm
 *            progress monitor handed to every {@code DeltaWindow} and
 *            notified when a worker ends.
 * @param list
 *            objects to search.
 * @param begin
 *            first index of {@code list} covered by this block
 *            (inclusive).
 * @param end
 *            index one past the last entry of {@code list} covered by
 *            this block (exclusive).
 */
Block(int threads, PackConfig config, ObjectReader reader,
        DeltaCache dc, ThreadSafeProgressMonitor pm,
        ObjectToPack[] list, int begin, int end) {
    // Range of objects this block is responsible for.
    this.list = list;
    this.beginIndex = begin;
    this.endIndex = end;

    // Collaborators shared by every task created from this block.
    this.config = config;
    this.templateReader = reader;
    this.dc = dc;
    this.pm = pm;

    // Worker bookkeeping.
    this.threads = threads;
    this.tasks = new ArrayList<DeltaTask>(threads);
}
91
92 int cost() {
93 int d = (int) (totalWeight / bytesPerUnit);
94 if (totalWeight % bytesPerUnit != 0)
95 d++;
96 return d;
97 }
98
99 synchronized DeltaWindow stealWork(DeltaTask forThread) {
100 for (;;) {
101 DeltaTask maxTask = null;
102 Slice maxSlice = null;
103 int maxWork = 0;
104
105 for (DeltaTask task : tasks) {
106 Slice s = task.remaining();
107 if (s != null && maxWork < s.size()) {
108 maxTask = task;
109 maxSlice = s;
110 maxWork = s.size();
111 }
112 }
113 if (maxTask == null) {
114 return null;
115 }
116 if (maxTask.tryStealWork(maxSlice)) {
117 return forThread.initWindow(maxSlice);
118 }
119 }
120 }
121
122 void partitionTasks() {
123 ArrayList<WeightedPath> topPaths = computeTopPaths();
124 Iterator<WeightedPath> topPathItr = topPaths.iterator();
125 int nextTop = 0;
126 long weightPerThread = Math.max(totalWeight / threads, 1);
127 for (int i = beginIndex; i < endIndex;) {
128 DeltaTask task = new DeltaTask(this);
129 long w = 0;
130
131
132 if (topPathItr.hasNext()) {
133 WeightedPath p = topPathItr.next();
134 w += p.weight;
135 task.add(p.slice);
136 }
137
138
139 int s = i;
140 for (; w < weightPerThread && i < endIndex;) {
141 if (nextTop < topPaths.size()
142 && i == topPaths.get(nextTop).slice.beginIndex) {
143 if (s < i) {
144 task.add(new Slice(s, i));
145 }
146 s = i = topPaths.get(nextTop++).slice.endIndex;
147 } else {
148 w += getAdjustedWeight(list[i++]);
149 }
150 }
151
152
153 if (s < i) {
154 int h = list[i - 1].getPathHash();
155 while (i < endIndex) {
156 if (h == list[i].getPathHash()) {
157 i++;
158 } else {
159 break;
160 }
161 }
162 task.add(new Slice(s, i));
163 }
164 if (!task.slices.isEmpty()) {
165 tasks.add(task);
166 }
167 }
168 while (topPathItr.hasNext()) {
169 WeightedPath p = topPathItr.next();
170 DeltaTask task = new DeltaTask(this);
171 task.add(p.slice);
172 tasks.add(task);
173 }
174
175 topPaths = null;
176 }
177
178 private ArrayList<WeightedPath> computeTopPaths() {
179 ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
180 threads);
181 int cp = beginIndex;
182 int ch = list[cp].getPathHash();
183 long cw = getAdjustedWeight(list[cp]);
184 totalWeight = cw;
185
186 for (int i = cp + 1; i < endIndex; i++) {
187 ObjectToPack o = list[i];
188 if (ch != o.getPathHash()) {
189 if (MIN_TOP_PATH < cw) {
190 if (topPaths.size() < threads) {
191 Slice s = new Slice(cp, i);
192 topPaths.add(new WeightedPath(cw, s));
193 if (topPaths.size() == threads) {
194 Collections.sort(topPaths);
195 }
196 } else if (topPaths.get(0).weight < cw) {
197 Slice s = new Slice(cp, i);
198 WeightedPath p = new WeightedPath(cw, s);
199 topPaths.set(0, p);
200 if (p.compareTo(topPaths.get(1)) > 0) {
201 Collections.sort(topPaths);
202 }
203 }
204 }
205 cp = i;
206 ch = o.getPathHash();
207 cw = 0;
208 }
209 int weight = getAdjustedWeight(o);
210 cw += weight;
211 totalWeight += weight;
212 }
213
214
215 Collections.sort(topPaths, new Comparator<WeightedPath>() {
216 public int compare(WeightedPath a, WeightedPath b) {
217 return a.slice.beginIndex - b.slice.beginIndex;
218 }
219 });
220
221 bytesPerUnit = 1;
222 while (MAX_METER <= (totalWeight / bytesPerUnit)) {
223 bytesPerUnit <<= 10;
224 }
225 return topPaths;
226 }
227 }
228
229 static int getAdjustedWeight(ObjectToPack o) {
230
231
232 if (o.isEdge() || o.doNotAttemptDelta()) {
233 return 0;
234 }
235 return o.getWeight();
236 }
237
238 static final class WeightedPath implements Comparable<WeightedPath> {
239 final long weight;
240 final Slice slice;
241
242 WeightedPath(long weight, Slice s) {
243 this.weight = weight;
244 this.slice = s;
245 }
246
247 public int compareTo(WeightedPath o) {
248 int cmp = Long.signum(weight - o.weight);
249 if (cmp != 0) {
250 return cmp;
251 }
252 return slice.beginIndex - o.slice.beginIndex;
253 }
254 }
255
256 static final class Slice {
257 final int beginIndex;
258 final int endIndex;
259
260 Slice(int b, int e) {
261 beginIndex = b;
262 endIndex = e;
263 }
264
265 final int size() {
266 return endIndex - beginIndex;
267 }
268 }
269
270 private final Block block;
271 final LinkedList<Slice> slices;
272
273 private ObjectReader or;
274 private DeltaWindow dw;
275
/**
 * Creates a task with an empty slice queue.
 *
 * @param b
 *            the block this task belongs to; it supplies the shared
 *            configuration and is asked for more work once the task's
 *            own queue is empty.
 */
DeltaTask(Block b) {
    this.slices = new LinkedList<Slice>();
    this.block = b;
}
280
281 void add(Slice s) {
282 if (!slices.isEmpty()) {
283 Slice last = slices.getLast();
284 if (last.endIndex == s.beginIndex) {
285 slices.removeLast();
286 slices.add(new Slice(last.beginIndex, s.endIndex));
287 return;
288 }
289 }
290 slices.add(s);
291 }
292
293 public Object call() throws Exception {
294 or = block.templateReader.newReader();
295 try {
296 DeltaWindow w;
297 for (;;) {
298 synchronized (this) {
299 if (slices.isEmpty()) {
300 break;
301 }
302 w = initWindow(slices.removeFirst());
303 }
304 runWindow(w);
305 }
306 while ((w = block.stealWork(this)) != null) {
307 runWindow(w);
308 }
309 } finally {
310 block.pm.endWorker();
311 or.close();
312 or = null;
313 }
314 return null;
315 }
316
317 DeltaWindow initWindow(Slice s) {
318 DeltaWindow w = new DeltaWindow(block.config, block.dc,
319 or, block.pm, block.bytesPerUnit,
320 block.list, s.beginIndex, s.endIndex);
321 synchronized (this) {
322 dw = w;
323 }
324 return w;
325 }
326
327 private void runWindow(DeltaWindow w) throws IOException {
328 try {
329 w.search();
330 } finally {
331 synchronized (this) {
332 dw = null;
333 }
334 }
335 }
336
337 synchronized Slice remaining() {
338 if (!slices.isEmpty()) {
339 return slices.getLast();
340 }
341 DeltaWindow d = dw;
342 return d != null ? d.remaining() : null;
343 }
344
345 synchronized boolean tryStealWork(Slice s) {
346 if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
347 slices.removeLast();
348 return true;
349 }
350 DeltaWindow d = dw;
351 return d != null ? d.tryStealWork(s) : false;
352 }
353 }