1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.Arrays;
26 import java.util.EnumMap;
27 import java.util.EnumSet;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35
36
37
38
39
40
41
42
43
44 abstract public class WriteFlusher
45 {
/** Logger shared by all WriteFlusher instances. */
private static final Logger LOG = Log.getLogger(WriteFlusher.class);
/** Debug flag read from {@link #LOG} once, when this class is initialised. */
private static final boolean DEBUG = LOG.isDebugEnabled();
/**
 * One-element array holding the shared empty buffer. flush(ByteBuffer[]) returns it
 * when no data is left in the caller's buffers but the endpoint did not report a
 * complete flush.
 */
private static final ByteBuffer[] EMPTY_BUFFERS = {BufferUtil.EMPTY_BUFFER};
/** Maps each state type to the set of state types that may directly follow it. */
private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
/** Shared, data-free state instances; PENDING and FAILED states are created per use. */
private static final State __IDLE = new IdleState();
private static final State __WRITING = new WritingState();
private static final State __COMPLETING = new CompletingState();
/** Endpoint that flush(ByteBuffer[]) writes to. */
private final EndPoint _endPoint;
/** Current state; changed only through compare-and-set in updateState(). */
private final AtomicReference<State> _state = new AtomicReference<>();

static
{
    // Table of legal transitions, consulted by isTransitionAllowed().
    __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
    __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
    __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE));
    __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
    __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
}
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
 * Creates a flusher that writes to the given endpoint and starts in the IDLE state.
 *
 * @param endPoint the endpoint whose flush method is used to write buffers
 */
protected WriteFlusher(EndPoint endPoint)
{
    _endPoint = endPoint;
    _state.set(__IDLE);
}
93
/**
 * The kinds of state a WriteFlusher can be in. The transitions permitted between
 * them are registered in the static __stateTransitions table.
 */
private enum StateType
{
    /** No write is in progress; write() may claim the flusher. */
    IDLE,
    /** write() is flushing buffers to the endpoint. */
    WRITING,
    /** A flush left data unwritten; the remaining buffers and the callback are held in the state. */
    PENDING,
    /** completeWrite() is flushing the buffers held by a pending state. */
    COMPLETING,
    /** onFail() was called while WRITING or COMPLETING; the cause is held in the state. */
    FAILED
}
102
103
104
105
106
107
108
109
110 private boolean updateState(State previous,State next)
111 {
112 if (!isTransitionAllowed(previous,next))
113 throw new IllegalStateException();
114
115 boolean updated = _state.compareAndSet(previous, next);
116 if (DEBUG)
117 LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
118 return updated;
119 }
120
121 private void fail(PendingState pending)
122 {
123 State current = _state.get();
124 if (current.getType()==StateType.FAILED)
125 {
126 FailedState failed=(FailedState)current;
127 if (updateState(failed,__IDLE))
128 {
129 pending.fail(failed.getCause());
130 return;
131 }
132 }
133 throw new IllegalStateException();
134 }
135
136 private void ignoreFail()
137 {
138 State current = _state.get();
139 while (current.getType()==StateType.FAILED)
140 {
141 if (updateState(current,__IDLE))
142 return;
143 current = _state.get();
144 }
145 }
146
147 private boolean isTransitionAllowed(State currentState, State newState)
148 {
149 Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
150 if (!allowedNewStateTypes.contains(newState.getType()))
151 {
152 LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
153 return false;
154 }
155 return true;
156 }
157
158
159
160
161 private static class State
162 {
163 private final StateType _type;
164
165 private State(StateType stateType)
166 {
167 _type = stateType;
168 }
169
170 public StateType getType()
171 {
172 return _type;
173 }
174
175 @Override
176 public String toString()
177 {
178 return String.format("%s", _type);
179 }
180 }
181
182
183
184
/**
 * State of type IDLE. It carries no data; the flusher uses the single shared
 * instance __IDLE.
 */
private static class IdleState extends State
{
    private IdleState()
    {
        super(StateType.IDLE);
    }
}
192
193
194
195
/**
 * State of type WRITING. It carries no data; the flusher uses the single shared
 * instance __WRITING.
 */
private static class WritingState extends State
{
    private WritingState()
    {
        super(StateType.WRITING);
    }
}
203
204
205
206
207 private static class FailedState extends State
208 {
209 private final Throwable _cause;
210 private FailedState(Throwable cause)
211 {
212 super(StateType.FAILED);
213 _cause=cause;
214 }
215
216 public Throwable getCause()
217 {
218 return _cause;
219 }
220 }
221
222
223
224
225
226
/**
 * State of type COMPLETING. It carries no data; the flusher uses the single
 * shared instance __COMPLETING.
 */
private static class CompletingState extends State
{
    private CompletingState()
    {
        super(StateType.COMPLETING);
    }
}
234
235
236
237
238
239 private class PendingState extends State
240 {
241 private final Callback _callback;
242 private final ByteBuffer[] _buffers;
243
244 private PendingState(ByteBuffer[] buffers, Callback callback)
245 {
246 super(StateType.PENDING);
247 _buffers = buffers;
248 _callback = callback;
249 }
250
251 public ByteBuffer[] getBuffers()
252 {
253 return _buffers;
254 }
255
256 protected boolean fail(Throwable cause)
257 {
258 if (_callback!=null)
259 {
260 _callback.failed(cause);
261 return true;
262 }
263 return false;
264 }
265
266 protected void complete()
267 {
268 if (_callback!=null)
269 _callback.succeeded();
270 }
271
272 boolean isCallbackNonBlocking()
273 {
274 return _callback!=null && _callback.isNonBlocking();
275 }
276 }
277
278 public boolean isCallbackNonBlocking()
279 {
280 State s = _state.get();
281 return (s instanceof PendingState) && ((PendingState)s).isCallbackNonBlocking();
282 }
283
284
285
286
287
/**
 * Hook invoked by write() and completeWrite() immediately after the flusher has
 * moved to the PENDING state because a flush left data unwritten.
 * NOTE(review): presumably implementations arrange for completeWrite() (or
 * onFail()) to be called once the endpoint can accept more data; confirm
 * against the concrete subclasses.
 */
abstract protected void onIncompleteFlush();
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303 public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
304 {
305 if (DEBUG)
306 LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
307
308 if (!updateState(__IDLE,__WRITING))
309 throw new WritePendingException();
310
311 try
312 {
313 buffers=flush(buffers);
314
315
316 if (buffers!=null)
317 {
318 if (DEBUG)
319 LOG.debug("flushed incomplete");
320 PendingState pending=new PendingState(buffers, callback);
321 if (updateState(__WRITING,pending))
322 onIncompleteFlush();
323 else
324 fail(pending);
325 return;
326 }
327
328
329 if (!updateState(__WRITING,__IDLE))
330 ignoreFail();
331 if (callback!=null)
332 callback.succeeded();
333 }
334 catch (IOException e)
335 {
336 if (DEBUG)
337 LOG.debug("write exception", e);
338 if (updateState(__WRITING,__IDLE))
339 {
340 if (callback!=null)
341 callback.failed(e);
342 }
343 else
344 fail(new PendingState(buffers, callback));
345 }
346 }
347
348
349
350
351
352
353
354
355
356
357 public void completeWrite()
358 {
359 if (DEBUG)
360 LOG.debug("completeWrite: {}", this);
361
362 State previous = _state.get();
363
364 if (previous.getType()!=StateType.PENDING)
365 return;
366
367 PendingState pending = (PendingState)previous;
368 if (!updateState(pending,__COMPLETING))
369 return;
370
371 try
372 {
373 ByteBuffer[] buffers = pending.getBuffers();
374
375 buffers=flush(buffers);
376
377
378 if (buffers!=null)
379 {
380 if (DEBUG)
381 LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
382 if (buffers!=pending.getBuffers())
383 pending=new PendingState(buffers, pending._callback);
384 if (updateState(__COMPLETING,pending))
385 onIncompleteFlush();
386 else
387 fail(pending);
388 return;
389 }
390
391
392 if (!updateState(__COMPLETING,__IDLE))
393 ignoreFail();
394 pending.complete();
395 }
396 catch (IOException e)
397 {
398 if (DEBUG)
399 LOG.debug("completeWrite exception", e);
400 if(updateState(__COMPLETING,__IDLE))
401 pending.fail(e);
402 else
403 fail(pending);
404 }
405 }
406
407
408
409
410
411
412
413
414 protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
415 {
416 boolean progress=true;
417 while(progress && buffers!=null)
418 {
419 int before=buffers.length==0?0:buffers[0].remaining();
420 boolean flushed=_endPoint.flush(buffers);
421 int r=buffers.length==0?0:buffers[0].remaining();
422
423 if (LOG.isDebugEnabled())
424 LOG.debug("Flushed={} {}/{}+{} {}",flushed,before-r,before,buffers.length-1,this);
425
426 if (flushed)
427 return null;
428
429 progress=before!=r;
430
431 int not_empty=0;
432 while(r==0)
433 {
434 if (++not_empty==buffers.length)
435 {
436 buffers=null;
437 not_empty=0;
438 break;
439 }
440 progress=true;
441 r=buffers[not_empty].remaining();
442 }
443
444 if (not_empty>0)
445 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
446 }
447
448 if (LOG.isDebugEnabled())
449 LOG.debug("!fully flushed {}",this);
450
451
452
453
454 return buffers==null?EMPTY_BUFFERS:buffers;
455 }
456
457
458
459
460
461
462 public boolean onFail(Throwable cause)
463 {
464
465 while(true)
466 {
467 State current=_state.get();
468 switch(current.getType())
469 {
470 case IDLE:
471 case FAILED:
472 if (DEBUG)
473 LOG.debug("ignored: {} {}", this, cause);
474 return false;
475
476 case PENDING:
477 if (DEBUG)
478 LOG.debug("failed: {} {}", this, cause);
479
480 PendingState pending = (PendingState)current;
481 if (updateState(pending,__IDLE))
482 return pending.fail(cause);
483 break;
484
485 default:
486 if (DEBUG)
487 LOG.debug("failed: {} {}", this, cause);
488
489 if (updateState(current,new FailedState(cause)))
490 return false;
491 break;
492 }
493 }
494 }
495
496 public void onClose()
497 {
498 onFail(new ClosedChannelException());
499 }
500
501 boolean isIdle()
502 {
503 return _state.get().getType() == StateType.IDLE;
504 }
505
506 public boolean isInProgress()
507 {
508 switch(_state.get().getType())
509 {
510 case WRITING:
511 case PENDING:
512 case COMPLETING:
513 return true;
514 default:
515 return false;
516 }
517 }
518
519 @Override
520 public String toString()
521 {
522 return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
523 }
524
525 public String toStateString()
526 {
527 switch(_state.get().getType())
528 {
529 case WRITING:
530 return "W";
531 case PENDING:
532 return "P";
533 case COMPLETING:
534 return "C";
535 case IDLE:
536 return "-";
537 case FAILED:
538 return "F";
539 default:
540 return "?";
541 }
542 }
543 }