1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.Arrays;
26 import java.util.EnumMap;
27 import java.util.EnumSet;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35
36
37
38
39
40
41
42
43
44 abstract public class WriteFlusher
45 {
46 private static final Logger LOG = Log.getLogger(WriteFlusher.class);
47 private static final boolean DEBUG = LOG.isDebugEnabled();
48 private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER};
49 private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
50 private static final State __IDLE = new IdleState();
51 private static final State __WRITING = new WritingState();
52 private static final State __COMPLETING = new CompletingState();
53 private final EndPoint _endPoint;
54 private final AtomicReference<State> _state = new AtomicReference<>();
55
56 static
57 {
58
59 __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
60 __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
61 __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
62 __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
63 __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
64 }
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 protected WriteFlusher(EndPoint endPoint)
89 {
90 _state.set(__IDLE);
91 _endPoint = endPoint;
92 }
93
94 private enum StateType
95 {
96 IDLE,
97 WRITING,
98 PENDING,
99 COMPLETING,
100 FAILED
101 }
102
103
104
105
106
107
108
109
110 private boolean updateState(State previous,State next)
111 {
112 if (!isTransitionAllowed(previous,next))
113 throw new IllegalStateException();
114
115 boolean updated = _state.compareAndSet(previous, next);
116 if (DEBUG)
117 LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
118 return updated;
119 }
120
121 private void fail(PendingState pending)
122 {
123 State current = _state.get();
124 if (current.getType()==StateType.FAILED)
125 {
126 FailedState failed=(FailedState)current;
127 if (updateState(failed,__IDLE))
128 {
129 pending.fail(failed.getCause());
130 return;
131 }
132 }
133 throw new IllegalStateException();
134 }
135
136 private void ignoreFail()
137 {
138 State current = _state.get();
139 while (current.getType()==StateType.FAILED)
140 {
141 if (updateState(current,__IDLE))
142 return;
143 current = _state.get();
144 }
145 }
146
147 private boolean isTransitionAllowed(State currentState, State newState)
148 {
149 Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
150 if (!allowedNewStateTypes.contains(newState.getType()))
151 {
152 LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
153 return false;
154 }
155 return true;
156 }
157
158
159
160
161 private static class State
162 {
163 private final StateType _type;
164
165 private State(StateType stateType)
166 {
167 _type = stateType;
168 }
169
170 public StateType getType()
171 {
172 return _type;
173 }
174
175 @Override
176 public String toString()
177 {
178 return String.format("%s", _type);
179 }
180 }
181
182
183
184
185 private static class IdleState extends State
186 {
187 private IdleState()
188 {
189 super(StateType.IDLE);
190 }
191 }
192
193
194
195
196 private static class WritingState extends State
197 {
198 private WritingState()
199 {
200 super(StateType.WRITING);
201 }
202 }
203
204
205
206
207 private static class FailedState extends State
208 {
209 private final Throwable _cause;
210 private FailedState(Throwable cause)
211 {
212 super(StateType.FAILED);
213 _cause=cause;
214 }
215
216 public Throwable getCause()
217 {
218 return _cause;
219 }
220 }
221
222
223
224
225
226
227 private static class CompletingState extends State
228 {
229 private CompletingState()
230 {
231 super(StateType.COMPLETING);
232 }
233 }
234
235
236
237
238
239 private class PendingState extends State
240 {
241 private final Callback _callback;
242 private final ByteBuffer[] _buffers;
243
244 private PendingState(ByteBuffer[] buffers, Callback callback)
245 {
246 super(StateType.PENDING);
247 _buffers = buffers;
248 _callback = callback;
249 }
250
251 public ByteBuffer[] getBuffers()
252 {
253 return _buffers;
254 }
255
256 protected boolean fail(Throwable cause)
257 {
258 if (_callback!=null)
259 {
260 _callback.failed(cause);
261 return true;
262 }
263 return false;
264 }
265
266 protected void complete()
267 {
268 if (_callback!=null)
269 _callback.succeeded();
270 }
271
272 boolean isCallbackNonBlocking()
273 {
274 return _callback!=null && _callback.isNonBlocking();
275 }
276 }
277
278 public boolean isCallbackNonBlocking()
279 {
280 State s = _state.get();
281 return (s instanceof PendingState) && ((PendingState)s).isCallbackNonBlocking();
282 }
283
284
285
286
287
288 abstract protected void onIncompleteFlush();
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303 public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
304 {
305 if (DEBUG)
306 LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
307
308 if (!updateState(__IDLE,__WRITING))
309 throw new WritePendingException();
310
311 try
312 {
313 buffers=flush(buffers);
314
315
316 if (buffers!=null)
317 {
318 if (DEBUG)
319 LOG.debug("flushed incomplete");
320 PendingState pending=new PendingState(buffers, callback);
321 if (updateState(__WRITING,pending))
322 onIncompleteFlush();
323 else
324 fail(pending);
325 return;
326 }
327
328
329 if (!updateState(__WRITING,__IDLE))
330 ignoreFail();
331 if (callback!=null)
332 callback.succeeded();
333 }
334 catch (IOException e)
335 {
336 if (DEBUG)
337 LOG.debug("write exception", e);
338 if (updateState(__WRITING,__IDLE))
339 {
340 if (callback!=null)
341 callback.failed(e);
342 }
343 else
344 fail(new PendingState(buffers, callback));
345 }
346 }
347
348
349
350
351
352
353
354
355
356
357 public void completeWrite()
358 {
359 if (DEBUG)
360 LOG.debug("completeWrite: {}", this);
361
362 State previous = _state.get();
363
364 if (previous.getType()!=StateType.PENDING)
365 return;
366
367 PendingState pending = (PendingState)previous;
368 if (!updateState(pending,__COMPLETING))
369 return;
370
371 try
372 {
373 ByteBuffer[] buffers = pending.getBuffers();
374
375 buffers=flush(buffers);
376
377
378 if (buffers!=null)
379 {
380 if (DEBUG)
381 LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
382 if (buffers!=pending.getBuffers())
383 pending=new PendingState(buffers, pending._callback);
384 if (updateState(__COMPLETING,pending))
385 onIncompleteFlush();
386 else
387 fail(pending);
388 return;
389 }
390
391
392 if (!updateState(__COMPLETING,__IDLE))
393 ignoreFail();
394 pending.complete();
395 }
396 catch (IOException e)
397 {
398 if (DEBUG)
399 LOG.debug("completeWrite exception", e);
400 if(updateState(__COMPLETING,__IDLE))
401 pending.fail(e);
402 else
403 fail(pending);
404 }
405 }
406
407
408
409
410
411
412
413 protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
414 {
415 boolean progress=true;
416 while(progress && buffers!=null)
417 {
418 int before=buffers.length==0?0:buffers[0].remaining();
419 boolean flushed=_endPoint.flush(buffers);
420 int r=buffers.length==0?0:buffers[0].remaining();
421
422 if (LOG.isDebugEnabled())
423 LOG.debug("Flushed={} {}/{}+{} {}",flushed,before-r,before,buffers.length-1,this);
424
425 if (flushed)
426 return null;
427
428 progress=before!=r;
429
430 int not_empty=0;
431 while(r==0)
432 {
433 if (++not_empty==buffers.length)
434 {
435 buffers=null;
436 not_empty=0;
437 break;
438 }
439 progress=true;
440 r=buffers[not_empty].remaining();
441 }
442
443 if (not_empty>0)
444 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
445 }
446
447 if (LOG.isDebugEnabled())
448 LOG.debug("!fully flushed {}",this);
449
450
451
452
453 return buffers==null?EMPTY_BUFFERS:buffers;
454 }
455
456
457
458
459
460
461 public boolean onFail(Throwable cause)
462 {
463
464 while(true)
465 {
466 State current=_state.get();
467 switch(current.getType())
468 {
469 case IDLE:
470 case FAILED:
471 if (DEBUG)
472 LOG.debug("ignored: {} {}", this, cause);
473 return false;
474
475 case PENDING:
476 if (DEBUG)
477 LOG.debug("failed: {} {}", this, cause);
478
479 PendingState pending = (PendingState)current;
480 if (updateState(pending,__IDLE))
481 return pending.fail(cause);
482 break;
483
484 default:
485 if (DEBUG)
486 LOG.debug("failed: {} {}", this, cause);
487
488 if (updateState(current,new FailedState(cause)))
489 return false;
490 break;
491 }
492 }
493 }
494
495 public void onClose()
496 {
497 if (_state.get()==__IDLE)
498 return;
499 onFail(new ClosedChannelException());
500 }
501
502 boolean isIdle()
503 {
504 return _state.get().getType() == StateType.IDLE;
505 }
506
507 public boolean isInProgress()
508 {
509 switch(_state.get().getType())
510 {
511 case WRITING:
512 case PENDING:
513 case COMPLETING:
514 return true;
515 default:
516 return false;
517 }
518 }
519
520 @Override
521 public String toString()
522 {
523 return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
524 }
525
526 public String toStateString()
527 {
528 switch(_state.get().getType())
529 {
530 case WRITING:
531 return "W";
532 case PENDING:
533 return "P";
534 case COMPLETING:
535 return "C";
536 case IDLE:
537 return "-";
538 case FAILED:
539 return "F";
540 default:
541 return "?";
542 }
543 }
544 }