1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.Arrays;
26 import java.util.EnumMap;
27 import java.util.EnumSet;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35
36
37
38
39
40
41
42
43
44
45 abstract public class WriteFlusher
46 {
47 private static final Logger LOG = Log.getLogger(WriteFlusher.class);
48 private static final boolean DEBUG = LOG.isDebugEnabled();
49 private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER};
50 private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
51 private static final State __IDLE = new IdleState();
52 private static final State __WRITING = new WritingState();
53 private static final State __COMPLETING = new CompletingState();
54 private final EndPoint _endPoint;
55 private final AtomicReference<State> _state = new AtomicReference<>();
56
57 static
58 {
59
60 __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
61 __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
62 __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
63 __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
64 __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
65 }
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 protected WriteFlusher(EndPoint endPoint)
90 {
91 _state.set(__IDLE);
92 _endPoint = endPoint;
93 }
94
95 private enum StateType
96 {
97 IDLE,
98 WRITING,
99 PENDING,
100 COMPLETING,
101 FAILED
102 }
103
104
105
106
107
108
109
110
111 private boolean updateState(State previous,State next)
112 {
113 if (!isTransitionAllowed(previous,next))
114 throw new IllegalStateException();
115
116 boolean updated = _state.compareAndSet(previous, next);
117 if (DEBUG)
118 LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
119 return updated;
120 }
121
122 private void fail(PendingState pending)
123 {
124 State current = _state.get();
125 if (current.getType()==StateType.FAILED)
126 {
127 FailedState failed=(FailedState)current;
128 if (updateState(failed,__IDLE))
129 {
130 pending.fail(failed.getCause());
131 return;
132 }
133 }
134 throw new IllegalStateException();
135 }
136
137 private void ignoreFail()
138 {
139 State current = _state.get();
140 while (current.getType()==StateType.FAILED)
141 {
142 if (updateState(current,__IDLE))
143 return;
144 current = _state.get();
145 }
146 }
147
148 private boolean isTransitionAllowed(State currentState, State newState)
149 {
150 Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
151 if (!allowedNewStateTypes.contains(newState.getType()))
152 {
153 LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
154 return false;
155 }
156 return true;
157 }
158
159
160
161
162 private static class State
163 {
164 private final StateType _type;
165
166 private State(StateType stateType)
167 {
168 _type = stateType;
169 }
170
171 public StateType getType()
172 {
173 return _type;
174 }
175
176 @Override
177 public String toString()
178 {
179 return String.format("%s", _type);
180 }
181 }
182
183
184
185
186 private static class IdleState extends State
187 {
188 private IdleState()
189 {
190 super(StateType.IDLE);
191 }
192 }
193
194
195
196
197 private static class WritingState extends State
198 {
199 private WritingState()
200 {
201 super(StateType.WRITING);
202 }
203 }
204
205
206
207
208 private static class FailedState extends State
209 {
210 private final Throwable _cause;
211 private FailedState(Throwable cause)
212 {
213 super(StateType.FAILED);
214 _cause=cause;
215 }
216
217 public Throwable getCause()
218 {
219 return _cause;
220 }
221 }
222
223
224
225
226
227
228 private static class CompletingState extends State
229 {
230 private CompletingState()
231 {
232 super(StateType.COMPLETING);
233 }
234 }
235
236
237
238
239
240 private class PendingState extends State
241 {
242 private final Callback _callback;
243 private final ByteBuffer[] _buffers;
244
245 private PendingState(ByteBuffer[] buffers, Callback callback)
246 {
247 super(StateType.PENDING);
248 _buffers = buffers;
249 _callback = callback;
250 }
251
252 public ByteBuffer[] getBuffers()
253 {
254 return _buffers;
255 }
256
257 protected boolean fail(Throwable cause)
258 {
259 if (_callback!=null)
260 {
261 _callback.failed(cause);
262 return true;
263 }
264 return false;
265 }
266
267 protected void complete()
268 {
269 if (_callback!=null)
270 _callback.succeeded();
271 }
272 }
273
274
275
276
277
278 abstract protected void onIncompleteFlushed();
279
280
281
282
283
284
285
286
287
288
289
290
291
292 public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
293 {
294 if (DEBUG)
295 LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
296
297 if (!updateState(__IDLE,__WRITING))
298 throw new WritePendingException();
299
300 try
301 {
302 buffers=flush(buffers);
303
304
305 if (buffers!=null)
306 {
307 if (DEBUG)
308 LOG.debug("flushed incomplete");
309 PendingState pending=new PendingState(buffers, callback);
310 if (updateState(__WRITING,pending))
311 onIncompleteFlushed();
312 else
313 fail(pending);
314 return;
315 }
316
317
318 if (!updateState(__WRITING,__IDLE))
319 ignoreFail();
320 if (callback!=null)
321 callback.succeeded();
322 }
323 catch (IOException e)
324 {
325 if (DEBUG)
326 LOG.debug("write exception", e);
327 if (updateState(__WRITING,__IDLE))
328 {
329 if (callback!=null)
330 callback.failed(e);
331 }
332 else
333 fail(new PendingState(buffers, callback));
334 }
335 }
336
337
338
339
340
341
342
343
344
345
346 public void completeWrite()
347 {
348 if (DEBUG)
349 LOG.debug("completeWrite: {}", this);
350
351 State previous = _state.get();
352
353 if (previous.getType()!=StateType.PENDING)
354 return;
355
356 PendingState pending = (PendingState)previous;
357 if (!updateState(pending,__COMPLETING))
358 return;
359
360 try
361 {
362 ByteBuffer[] buffers = pending.getBuffers();
363
364 buffers=flush(buffers);
365
366
367 if (buffers!=null)
368 {
369 if (DEBUG)
370 LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
371 if (buffers!=pending.getBuffers())
372 pending=new PendingState(buffers, pending._callback);
373 if (updateState(__COMPLETING,pending))
374 onIncompleteFlushed();
375 else
376 fail(pending);
377 return;
378 }
379
380
381 if (!updateState(__COMPLETING,__IDLE))
382 ignoreFail();
383 pending.complete();
384 }
385 catch (IOException e)
386 {
387 if (DEBUG)
388 LOG.debug("completeWrite exception", e);
389 if(updateState(__COMPLETING,__IDLE))
390 pending.fail(e);
391 else
392 fail(pending);
393 }
394 }
395
396
397
398
399
400
401
402 protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
403 {
404 boolean progress=true;
405 while(progress && buffers!=null)
406 {
407 int before=buffers.length==0?0:buffers[0].remaining();
408 boolean flushed=_endPoint.flush(buffers);
409 int r=buffers.length==0?0:buffers[0].remaining();
410
411 if (flushed)
412 return null;
413
414 progress=before!=r;
415
416 int not_empty=0;
417 while(r==0)
418 {
419 if (++not_empty==buffers.length)
420 {
421 buffers=null;
422 not_empty=0;
423 break;
424 }
425 progress=true;
426 r=buffers[not_empty].remaining();
427 }
428
429 if (not_empty>0)
430 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
431 }
432
433
434
435
436 return buffers==null?EMPTY_BUFFERS:buffers;
437 }
438
439
440
441
442
443
444 public boolean onFail(Throwable cause)
445 {
446
447 while(true)
448 {
449 State current=_state.get();
450 switch(current.getType())
451 {
452 case IDLE:
453 case FAILED:
454 if (DEBUG)
455 LOG.debug("ignored: {} {}", this, cause);
456 return false;
457
458 case PENDING:
459 if (DEBUG)
460 LOG.debug("failed: {} {}", this, cause);
461
462 PendingState pending = (PendingState)current;
463 if (updateState(pending,__IDLE))
464 return pending.fail(cause);
465 break;
466
467 default:
468 if (DEBUG)
469 LOG.debug("failed: {} {}", this, cause);
470
471 if (updateState(current,new FailedState(cause)))
472 return false;
473 break;
474 }
475 }
476 }
477
478 public void onClose()
479 {
480 if (_state.get()==__IDLE)
481 return;
482 onFail(new ClosedChannelException());
483 }
484
485 boolean isIdle()
486 {
487 return _state.get().getType() == StateType.IDLE;
488 }
489
490 public boolean isInProgress()
491 {
492 switch(_state.get().getType())
493 {
494 case WRITING:
495 case PENDING:
496 case COMPLETING:
497 return true;
498 default:
499 return false;
500 }
501 }
502
503 @Override
504 public String toString()
505 {
506 return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
507 }
508 }