1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.util;
20
21 import java.nio.channels.ClosedChannelException;
22 import java.util.concurrent.atomic.AtomicReference;
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public abstract class IteratingCallback implements Callback
53 {
54
55
56
57 private enum State
58 {
59
60
61
62 IDLE,
63
64
65
66
67
68
69 PROCESSING,
70
71
72
73
74 PENDING,
75
76
77
78
79 CALLED,
80
81
82
83
84
85 SUCCEEDED,
86
87
88
89
90 FAILED,
91
92
93
94
95 CLOSED,
96
97
98
99
100 LOCKED
101 }
102
103
104
105
106
107 protected enum Action
108 {
109
110
111
112
113
114 IDLE,
115
116
117
118
119
120 SCHEDULED,
121
122
123
124
125 SUCCEEDED
126 }
127
128 private final AtomicReference<State> _state;
129 private boolean _iterate;
130
131
132 protected IteratingCallback()
133 {
134 _state = new AtomicReference<>(State.IDLE);
135 }
136
137 protected IteratingCallback(boolean needReset)
138 {
139 _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE);
140 }
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 protected abstract Action process() throws Exception;
158
159
160
161
162 @Deprecated
163 protected void completed()
164 {
165 }
166
167
168
169
170
171
172 protected void onCompleteSuccess()
173 {
174 completed();
175 }
176
177
178
179
180
181
182 protected void onCompleteFailure(Throwable x)
183 {
184 }
185
186
187
188
189
190
191
192
193 public void iterate()
194 {
195 loop: while (true)
196 {
197 State state=_state.get();
198 switch (state)
199 {
200 case PENDING:
201 case CALLED:
202
203 break loop;
204
205 case IDLE:
206 if (!_state.compareAndSet(state,State.PROCESSING))
207 continue;
208 processing();
209 break loop;
210
211 case PROCESSING:
212 if (!_state.compareAndSet(state,State.LOCKED))
213 continue;
214
215 _iterate=true;
216 _state.set(State.PROCESSING);
217 break loop;
218
219 case LOCKED:
220 Thread.yield();
221 continue loop;
222
223 case FAILED:
224 case SUCCEEDED:
225 break loop;
226
227 case CLOSED:
228 default:
229 throw new IllegalStateException("state="+state);
230 }
231 }
232 }
233
234 private void processing()
235 {
236
237
238
239
240 processing: while (true)
241 {
242
243 Action action;
244 try
245 {
246 action = process();
247 }
248 catch (Throwable x)
249 {
250 failed(x);
251 break processing;
252 }
253
254
255 acting: while(true)
256 {
257
258 State state=_state.get();
259
260 switch (state)
261 {
262 case PROCESSING:
263 {
264 switch (action)
265 {
266 case IDLE:
267 {
268
269 if (!_state.compareAndSet(state,State.LOCKED))
270 continue acting;
271
272
273 if (_iterate)
274 {
275
276 _iterate=false;
277 _state.set(State.PROCESSING);
278 continue processing;
279 }
280
281
282 _state.set(State.IDLE);
283 break processing;
284 }
285
286 case SCHEDULED:
287 {
288 if (!_state.compareAndSet(state, State.PENDING))
289 continue acting;
290
291 break processing;
292 }
293
294 case SUCCEEDED:
295 {
296 if (!_state.compareAndSet(state, State.LOCKED))
297 continue acting;
298 _iterate=false;
299 _state.set(State.SUCCEEDED);
300 onCompleteSuccess();
301 break processing;
302 }
303
304 default:
305 throw new IllegalStateException("state="+state+" action="+action);
306 }
307 }
308
309 case CALLED:
310 {
311 switch (action)
312 {
313 case SCHEDULED:
314 {
315 if (!_state.compareAndSet(state, State.PROCESSING))
316 continue acting;
317
318 continue processing;
319 }
320
321 default:
322 throw new IllegalStateException("state="+state+" action="+action);
323 }
324 }
325
326 case LOCKED:
327 Thread.yield();
328 continue acting;
329
330 case SUCCEEDED:
331 case FAILED:
332 case CLOSED:
333 break processing;
334
335 case IDLE:
336 case PENDING:
337 default:
338 throw new IllegalStateException("state="+state+" action="+action);
339 }
340 }
341 }
342 }
343
344
345
346
347
348
349 @Override
350 public void succeeded()
351 {
352 loop: while (true)
353 {
354 State state = _state.get();
355 switch (state)
356 {
357 case PROCESSING:
358 {
359 if (!_state.compareAndSet(state, State.CALLED))
360 continue loop;
361 break loop;
362 }
363 case PENDING:
364 {
365 if (!_state.compareAndSet(state, State.PROCESSING))
366 continue loop;
367 processing();
368 break loop;
369 }
370 case CLOSED:
371 case FAILED:
372 {
373
374 break loop;
375 }
376 case LOCKED:
377 {
378 Thread.yield();
379 continue loop;
380 }
381 default:
382 {
383 throw new IllegalStateException("state="+state);
384 }
385 }
386 }
387 }
388
389
390
391
392
393
394 @Override
395 public void failed(Throwable x)
396 {
397 loop: while (true)
398 {
399 State state = _state.get();
400 switch (state)
401 {
402 case SUCCEEDED:
403 case FAILED:
404 case IDLE:
405 case CLOSED:
406 case CALLED:
407 {
408
409 break loop;
410 }
411 case LOCKED:
412 {
413 Thread.yield();
414 continue loop;
415 }
416 case PENDING:
417 case PROCESSING:
418 {
419 if (!_state.compareAndSet(state, State.FAILED))
420 continue loop;
421
422 onCompleteFailure(x);
423 break loop;
424 }
425 default:
426 throw new IllegalStateException("state="+state);
427 }
428 }
429 }
430
431 public void close()
432 {
433 loop: while (true)
434 {
435 State state = _state.get();
436 switch (state)
437 {
438 case IDLE:
439 case SUCCEEDED:
440 case FAILED:
441 {
442 if (!_state.compareAndSet(state, State.CLOSED))
443 continue loop;
444 break loop;
445 }
446 case CLOSED:
447 {
448 break loop;
449 }
450 case LOCKED:
451 {
452 Thread.yield();
453 continue loop;
454 }
455 default:
456 {
457 if (!_state.compareAndSet(state, State.CLOSED))
458 continue loop;
459 onCompleteFailure(new ClosedChannelException());
460 break loop;
461 }
462 }
463 }
464 }
465
466
467
468
469
470 boolean isIdle()
471 {
472 return _state.get() == State.IDLE;
473 }
474
475 public boolean isClosed()
476 {
477 return _state.get() == State.CLOSED;
478 }
479
480
481
482
483 public boolean isFailed()
484 {
485 return _state.get() == State.FAILED;
486 }
487
488
489
490
491 public boolean isSucceeded()
492 {
493 return _state.get() == State.SUCCEEDED;
494 }
495
496
497
498
499
500
501
502
503
504 public boolean reset()
505 {
506 while (true)
507 {
508 State state=_state.get();
509 switch(state)
510 {
511 case IDLE:
512 return true;
513
514 case SUCCEEDED:
515 if (!_state.compareAndSet(state, State.LOCKED))
516 continue;
517 _iterate=false;
518 _state.set(State.IDLE);
519 return true;
520
521 case FAILED:
522 if (!_state.compareAndSet(state, State.LOCKED))
523 continue;
524 _iterate=false;
525 _state.set(State.IDLE);
526 return true;
527
528 case LOCKED:
529 Thread.yield();
530 continue;
531
532 default:
533 return false;
534 }
535 }
536 }
537
538 @Override
539 public String toString()
540 {
541 return String.format("%s[%s]", super.toString(), _state);
542 }
543 }