1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.EnumMap;
26 import java.util.EnumSet;
27 import java.util.Set;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.eclipse.jetty.util.BufferUtil;
31 import org.eclipse.jetty.util.Callback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35
36
37
38
39
40
41
42
43
44 abstract public class WriteFlusher
45 {
46 private static final Logger LOG = Log.getLogger(WriteFlusher.class);
47 private static final boolean DEBUG = LOG.isDebugEnabled();
48 private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0];
49 private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
50 private static final State __IDLE = new IdleState();
51 private static final State __WRITING = new WritingState();
52 private static final State __COMPLETING = new CompletingState();
53 private final EndPoint _endPoint;
54 private final AtomicReference<State> _state = new AtomicReference<>();
55
56 static
57 {
58
59 __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
60 __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
61 __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
62 __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
63 __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
64 }
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
 * Creates a flusher bound to the given endpoint; the flusher starts in the idle state.
 *
 * @param endPoint the endpoint whose {@code flush} method is invoked by
 *                 {@code write} and {@code completeWrite}
 */
protected WriteFlusher(EndPoint endPoint)
{
    _endPoint = endPoint;
    _state.set(__IDLE);
}
93
94 private enum StateType
95 {
96 IDLE,
97 WRITING,
98 PENDING,
99 COMPLETING,
100 FAILED
101 }
102
103
104
105
106
107
108
109
110 private boolean updateState(State previous,State next)
111 {
112 if (!isTransitionAllowed(previous,next))
113 throw new IllegalStateException();
114
115 boolean updated = _state.compareAndSet(previous, next);
116 if (DEBUG)
117 LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
118 return updated;
119 }
120
121 private void fail(PendingState pending)
122 {
123 State current = _state.get();
124 if (current.getType()==StateType.FAILED)
125 {
126 FailedState failed=(FailedState)current;
127 if (updateState(failed,__IDLE))
128 {
129 pending.fail(failed.getCause());
130 return;
131 }
132 }
133 throw new IllegalStateException();
134 }
135
136 private void ignoreFail()
137 {
138 State current = _state.get();
139 while (current.getType()==StateType.FAILED)
140 {
141 if (updateState(current,__IDLE))
142 return;
143 current = _state.get();
144 }
145 }
146
147 private boolean isTransitionAllowed(State currentState, State newState)
148 {
149 Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
150 if (!allowedNewStateTypes.contains(newState.getType()))
151 {
152 LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
153 return false;
154 }
155 return true;
156 }
157
158
159
160
161 private static class State
162 {
163 private final StateType _type;
164
165 private State(StateType stateType)
166 {
167 _type = stateType;
168 }
169
170 public StateType getType()
171 {
172 return _type;
173 }
174
175 @Override
176 public String toString()
177 {
178 return String.format("%s", _type);
179 }
180 }
181
182
183
184
185 private static class IdleState extends State
186 {
187 private IdleState()
188 {
189 super(StateType.IDLE);
190 }
191 }
192
193
194
195
196 private static class WritingState extends State
197 {
198 private WritingState()
199 {
200 super(StateType.WRITING);
201 }
202 }
203
204
205
206
207 private static class FailedState extends State
208 {
209 private final Throwable _cause;
210 private FailedState(Throwable cause)
211 {
212 super(StateType.FAILED);
213 _cause=cause;
214 }
215
216 public Throwable getCause()
217 {
218 return _cause;
219 }
220 }
221
222
223
224
225
226
227 private static class CompletingState extends State
228 {
229 private CompletingState()
230 {
231 super(StateType.COMPLETING);
232 }
233 }
234
235
236
237
238
239 private class PendingState extends State
240 {
241 private final Callback _callback;
242 private final ByteBuffer[] _buffers;
243
244 private PendingState(ByteBuffer[] buffers, Callback callback)
245 {
246 super(StateType.PENDING);
247 _buffers = compact(buffers);
248 _callback = callback;
249 }
250
251 public ByteBuffer[] getBuffers()
252 {
253 return _buffers;
254 }
255
256 protected void fail(Throwable cause)
257 {
258 if (_callback!=null)
259 _callback.failed(cause);
260 }
261
262 protected void complete()
263 {
264 if (_callback!=null)
265 _callback.succeeded();
266 }
267
268
269
270
271
272
273
274
275
276
277
278
279 private ByteBuffer[] compact(ByteBuffer[] buffers)
280 {
281 int length = buffers.length;
282
283
284 if (length < 2)
285 return buffers;
286
287
288 int consumed = 0;
289 while (consumed < length && BufferUtil.isEmpty(buffers[consumed]))
290 ++consumed;
291
292
293 if (consumed == 0)
294 return buffers;
295
296
297 if (consumed == length)
298 return EMPTY_BUFFERS;
299
300 int newLength = length - consumed;
301 ByteBuffer[] result = new ByteBuffer[newLength];
302 System.arraycopy(buffers, consumed, result, 0, newLength);
303 return result;
304 }
305 }
306
307
308
309
310
311 abstract protected void onIncompleteFlushed();
312
313
314
315
316
317
318
319
320
321
322
323
324
325 public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
326 {
327 if (DEBUG)
328 LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
329
330 if (!updateState(__IDLE,__WRITING))
331 throw new WritePendingException();
332
333 try
334 {
335 boolean flushed=_endPoint.flush(buffers);
336 if (DEBUG)
337 LOG.debug("flushed {}", flushed);
338
339
340 for (ByteBuffer b : buffers)
341 {
342 if (!flushed||BufferUtil.hasContent(b))
343 {
344 PendingState pending=new PendingState(buffers, callback);
345 if (updateState(__WRITING,pending))
346 onIncompleteFlushed();
347 else
348 fail(pending);
349 return;
350 }
351 }
352
353
354 if (!updateState(__WRITING,__IDLE))
355 ignoreFail();
356 if (callback!=null)
357 callback.succeeded();
358 }
359 catch (IOException e)
360 {
361 if (DEBUG)
362 LOG.debug("write exception", e);
363 if (updateState(__WRITING,__IDLE))
364 {
365 if (callback!=null)
366 callback.failed(e);
367 }
368 else
369 fail(new PendingState(buffers, callback));
370 }
371 }
372
373
374
375
376
377
378
379
380
381
382 public void completeWrite()
383 {
384 if (DEBUG)
385 LOG.debug("completeWrite: {}", this);
386
387 State previous = _state.get();
388
389 if (previous.getType()!=StateType.PENDING)
390 return;
391
392 PendingState pending = (PendingState)previous;
393 if (!updateState(pending,__COMPLETING))
394 return;
395
396 try
397 {
398 ByteBuffer[] buffers = pending.getBuffers();
399
400 boolean flushed=_endPoint.flush(buffers);
401 if (DEBUG)
402 LOG.debug("flushed {}", flushed);
403
404
405 for (ByteBuffer b : buffers)
406 {
407 if (!flushed || BufferUtil.hasContent(b))
408 {
409 if (updateState(__COMPLETING,pending))
410 onIncompleteFlushed();
411 else
412 fail(pending);
413 return;
414 }
415 }
416
417
418 if (!updateState(__COMPLETING,__IDLE))
419 ignoreFail();
420 pending.complete();
421 }
422 catch (IOException e)
423 {
424 if (DEBUG)
425 LOG.debug("completeWrite exception", e);
426 if(updateState(__COMPLETING,__IDLE))
427 pending.fail(e);
428 else
429 fail(pending);
430 }
431 }
432
433 public void onFail(Throwable cause)
434 {
435 if (DEBUG)
436 LOG.debug("failed: {} {}", this, cause);
437
438
439 while(true)
440 {
441 State current=_state.get();
442 switch(current.getType())
443 {
444 case IDLE:
445 case FAILED:
446 return;
447
448 case PENDING:
449 PendingState pending = (PendingState)current;
450 if (updateState(pending,__IDLE))
451 {
452 pending.fail(cause);
453 return;
454 }
455 break;
456
457 default:
458 if (updateState(current,new FailedState(cause)))
459 return;
460 break;
461 }
462 }
463 }
464
465 public void onClose()
466 {
467 if (_state.get()==__IDLE)
468 return;
469 onFail(new ClosedChannelException());
470 }
471
472 boolean isIdle()
473 {
474 return _state.get().getType() == StateType.IDLE;
475 }
476
477 public boolean isInProgress()
478 {
479 switch(_state.get().getType())
480 {
481 case WRITING:
482 case PENDING:
483 case COMPLETING:
484 return true;
485 default:
486 return false;
487 }
488 }
489
490 @Override
491 public String toString()
492 {
493 return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
494 }
495 }