1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.Arrays;
26 import java.util.EnumMap;
27 import java.util.EnumSet;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35
36
37
38
39
40
41
42
43
44
45 abstract public class WriteFlusher
46 {
47 private static final Logger LOG = Log.getLogger(WriteFlusher.class);
48 private static final boolean DEBUG = LOG.isDebugEnabled();
49 private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0];
50 private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
51 private static final State __IDLE = new IdleState();
52 private static final State __WRITING = new WritingState();
53 private static final State __COMPLETING = new CompletingState();
54 private final EndPoint _endPoint;
55 private final AtomicReference<State> _state = new AtomicReference<>();
56
57 static
58 {
59
60 __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
61 __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
62 __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
63 __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
64 __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
65 }
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 protected WriteFlusher(EndPoint endPoint)
90 {
91 _state.set(__IDLE);
92 _endPoint = endPoint;
93 }
94
95 private enum StateType
96 {
97 IDLE,
98 WRITING,
99 PENDING,
100 COMPLETING,
101 FAILED
102 }
103
104
105
106
107
108
109
110
111 private boolean updateState(State previous,State next)
112 {
113 if (!isTransitionAllowed(previous,next))
114 throw new IllegalStateException();
115
116 boolean updated = _state.compareAndSet(previous, next);
117 if (DEBUG)
118 LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
119 return updated;
120 }
121
122 private void fail(PendingState pending)
123 {
124 State current = _state.get();
125 if (current.getType()==StateType.FAILED)
126 {
127 FailedState failed=(FailedState)current;
128 if (updateState(failed,__IDLE))
129 {
130 pending.fail(failed.getCause());
131 return;
132 }
133 }
134 throw new IllegalStateException();
135 }
136
137 private void ignoreFail()
138 {
139 State current = _state.get();
140 while (current.getType()==StateType.FAILED)
141 {
142 if (updateState(current,__IDLE))
143 return;
144 current = _state.get();
145 }
146 }
147
148 private boolean isTransitionAllowed(State currentState, State newState)
149 {
150 Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
151 if (!allowedNewStateTypes.contains(newState.getType()))
152 {
153 LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
154 return false;
155 }
156 return true;
157 }
158
159
160
161
162 private static class State
163 {
164 private final StateType _type;
165
166 private State(StateType stateType)
167 {
168 _type = stateType;
169 }
170
171 public StateType getType()
172 {
173 return _type;
174 }
175
176 @Override
177 public String toString()
178 {
179 return String.format("%s", _type);
180 }
181 }
182
183
184
185
186 private static class IdleState extends State
187 {
188 private IdleState()
189 {
190 super(StateType.IDLE);
191 }
192 }
193
194
195
196
197 private static class WritingState extends State
198 {
199 private WritingState()
200 {
201 super(StateType.WRITING);
202 }
203 }
204
205
206
207
208 private static class FailedState extends State
209 {
210 private final Throwable _cause;
211 private FailedState(Throwable cause)
212 {
213 super(StateType.FAILED);
214 _cause=cause;
215 }
216
217 public Throwable getCause()
218 {
219 return _cause;
220 }
221 }
222
223
224
225
226
227
228 private static class CompletingState extends State
229 {
230 private CompletingState()
231 {
232 super(StateType.COMPLETING);
233 }
234 }
235
236
237
238
239
240 private class PendingState extends State
241 {
242 private final Callback _callback;
243 private final ByteBuffer[] _buffers;
244
245 private PendingState(ByteBuffer[] buffers, Callback callback)
246 {
247 super(StateType.PENDING);
248 _buffers = compact(buffers);
249 _callback = callback;
250 }
251
252 public ByteBuffer[] getBuffers()
253 {
254 return _buffers;
255 }
256
257 protected boolean fail(Throwable cause)
258 {
259 if (_callback!=null)
260 {
261 _callback.failed(cause);
262 return true;
263 }
264 return false;
265 }
266
267 protected void complete()
268 {
269 if (_callback!=null)
270 _callback.succeeded();
271 }
272
273
274
275
276
277
278
279
280
281
282
283
284 private ByteBuffer[] compact(ByteBuffer[] buffers)
285 {
286 int length = buffers.length;
287
288
289 if (length < 2)
290 return buffers;
291
292
293 int consumed = 0;
294 while (consumed < length && BufferUtil.isEmpty(buffers[consumed]))
295 ++consumed;
296
297
298 if (consumed == 0)
299 return buffers;
300
301
302 if (consumed == length)
303 return EMPTY_BUFFERS;
304
305 return Arrays.copyOfRange(buffers,consumed,length);
306 }
307 }
308
309
310
311
312
313 abstract protected void onIncompleteFlushed();
314
315
316
317
318
319
320
321
322
323
324
325
326
327 public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
328 {
329 if (DEBUG)
330 LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
331
332 if (!updateState(__IDLE,__WRITING))
333 throw new WritePendingException();
334
335 try
336 {
337 buffers=flush(buffers);
338
339
340 if (buffers!=null)
341 {
342 if (DEBUG)
343 LOG.debug("flushed incomplete");
344 PendingState pending=new PendingState(buffers, callback);
345 if (updateState(__WRITING,pending))
346 onIncompleteFlushed();
347 else
348 fail(pending);
349 return;
350 }
351
352
353 if (!updateState(__WRITING,__IDLE))
354 ignoreFail();
355 if (callback!=null)
356 callback.succeeded();
357 }
358 catch (IOException e)
359 {
360 if (DEBUG)
361 LOG.debug("write exception", e);
362 if (updateState(__WRITING,__IDLE))
363 {
364 if (callback!=null)
365 callback.failed(e);
366 }
367 else
368 fail(new PendingState(buffers, callback));
369 }
370 }
371
372
373
374
375
376
377
378
379
380
381 public void completeWrite()
382 {
383 if (DEBUG)
384 LOG.debug("completeWrite: {}", this);
385
386 State previous = _state.get();
387
388 if (previous.getType()!=StateType.PENDING)
389 return;
390
391 PendingState pending = (PendingState)previous;
392 if (!updateState(pending,__COMPLETING))
393 return;
394
395 try
396 {
397 ByteBuffer[] buffers = pending.getBuffers();
398
399 buffers=flush(buffers);
400
401
402 if (buffers!=null)
403 {
404 if (DEBUG)
405 LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
406 if (buffers!=pending.getBuffers())
407 pending=new PendingState(buffers, pending._callback);
408 if (updateState(__COMPLETING,pending))
409 onIncompleteFlushed();
410 else
411 fail(pending);
412 return;
413 }
414
415
416 if (!updateState(__COMPLETING,__IDLE))
417 ignoreFail();
418 pending.complete();
419 }
420 catch (IOException e)
421 {
422 if (DEBUG)
423 LOG.debug("completeWrite exception", e);
424 if(updateState(__COMPLETING,__IDLE))
425 pending.fail(e);
426 else
427 fail(pending);
428 }
429 }
430
431
432
433
434
435
436
437 protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
438 {
439
440
441 if (_endPoint.flush(buffers))
442 return null;
443
444
445
446 boolean progress=true;
447 while(true)
448 {
449
450 int not_empty=0;
451 while(not_empty<buffers.length && BufferUtil.isEmpty(buffers[not_empty]))
452 not_empty++;
453 if (not_empty==buffers.length)
454 return null;
455 if (not_empty>0)
456 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
457
458 if (!progress)
459 break;
460
461
462 int r=buffers[0].remaining();
463 if (_endPoint.flush(buffers))
464 return null;
465 progress=r!=buffers[0].remaining();
466 }
467
468 return buffers;
469 }
470
471
472
473
474
475
476 public boolean onFail(Throwable cause)
477 {
478
479 while(true)
480 {
481 State current=_state.get();
482 switch(current.getType())
483 {
484 case IDLE:
485 case FAILED:
486 if (DEBUG)
487 LOG.debug("ignored: {} {}", this, cause);
488 return false;
489
490 case PENDING:
491 if (DEBUG)
492 LOG.debug("failed: {} {}", this, cause);
493
494 PendingState pending = (PendingState)current;
495 if (updateState(pending,__IDLE))
496 return pending.fail(cause);
497 break;
498
499 default:
500 if (DEBUG)
501 LOG.debug("failed: {} {}", this, cause);
502
503 if (updateState(current,new FailedState(cause)))
504 return false;
505 break;
506 }
507 }
508 }
509
510 public void onClose()
511 {
512 if (_state.get()==__IDLE)
513 return;
514 onFail(new ClosedChannelException());
515 }
516
517 boolean isIdle()
518 {
519 return _state.get().getType() == StateType.IDLE;
520 }
521
522 public boolean isInProgress()
523 {
524 switch(_state.get().getType())
525 {
526 case WRITING:
527 case PENDING:
528 case COMPLETING:
529 return true;
530 default:
531 return false;
532 }
533 }
534
535 @Override
536 public String toString()
537 {
538 return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
539 }
540 }