1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.EnumMap;
26 import java.util.EnumSet;
27 import java.util.Set;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.eclipse.jetty.util.BufferUtil;
31 import org.eclipse.jetty.util.Callback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35
36
37
38
39
40
41
42
43
44 abstract public class WriteFlusher
45 {
46 private static final Logger LOG = Log.getLogger(WriteFlusher.class);
47 private static final boolean DEBUG = LOG.isDebugEnabled();
48 private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
49 private static final State __IDLE = new IdleState();
50 private static final State __WRITING = new WritingState();
51 private static final State __COMPLETING = new CompletingState();
52 private final EndPoint _endPoint;
53 private final AtomicReference<State> _state = new AtomicReference<>();
54
55 static
56 {
57
58 __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
59 __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
60 __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
61 __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
62 __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
63 }
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 protected WriteFlusher(EndPoint endPoint)
88 {
89 _state.set(__IDLE);
90 _endPoint = endPoint;
91 }
92
93 private enum StateType
94 {
95 IDLE,
96 WRITING,
97 PENDING,
98 COMPLETING,
99 FAILED
100 }
101
102
103
104
105
106
107
108
109 private boolean updateState(State previous,State next)
110 {
111 if (!isTransitionAllowed(previous,next))
112 throw new IllegalStateException();
113
114 boolean updated = _state.compareAndSet(previous, next);
115 if (DEBUG)
116 LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
117 return updated;
118 }
119
120 private void fail(PendingState pending)
121 {
122 State current = _state.get();
123 if (current.getType()==StateType.FAILED)
124 {
125 FailedState failed=(FailedState)current;
126 if (updateState(failed,__IDLE))
127 {
128 pending.fail(failed.getCause());
129 return;
130 }
131 }
132 throw new IllegalStateException();
133 }
134
135 private void ignoreFail()
136 {
137 State current = _state.get();
138 while (current.getType()==StateType.FAILED)
139 {
140 if (updateState(current,__IDLE))
141 return;
142 current = _state.get();
143 }
144 }
145
146 private boolean isTransitionAllowed(State currentState, State newState)
147 {
148 Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
149 if (!allowedNewStateTypes.contains(newState.getType()))
150 {
151 LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
152 return false;
153 }
154 return true;
155 }
156
157
158
159
160 private static class State
161 {
162 private final StateType _type;
163
164 private State(StateType stateType)
165 {
166 _type = stateType;
167 }
168
169 public StateType getType()
170 {
171 return _type;
172 }
173
174 @Override
175 public String toString()
176 {
177 return String.format("%s", _type);
178 }
179 }
180
181
182
183
184 private static class IdleState extends State
185 {
186 private IdleState()
187 {
188 super(StateType.IDLE);
189 }
190 }
191
192
193
194
195 private static class WritingState extends State
196 {
197 private WritingState()
198 {
199 super(StateType.WRITING);
200 }
201 }
202
203
204
205
206 private static class FailedState extends State
207 {
208 private final Throwable _cause;
209 private FailedState(Throwable cause)
210 {
211 super(StateType.FAILED);
212 _cause=cause;
213 }
214
215 public Throwable getCause()
216 {
217 return _cause;
218 }
219 }
220
221
222
223
224
225
226 private static class CompletingState extends State
227 {
228 private CompletingState()
229 {
230 super(StateType.COMPLETING);
231 }
232 }
233
234
235
236
237
238 private class PendingState extends State
239 {
240 private final Callback _callback;
241 private final ByteBuffer[] _buffers;
242
243 private PendingState(ByteBuffer[] buffers, Callback callback)
244 {
245 super(StateType.PENDING);
246 _buffers = buffers;
247 _callback = callback;
248 }
249
250 public ByteBuffer[] getBuffers()
251 {
252 return _buffers;
253 }
254
255 protected void fail(Throwable cause)
256 {
257 if (_callback!=null)
258 _callback.failed(cause);
259 }
260
261 protected void complete()
262 {
263 if (_callback!=null)
264 _callback.succeeded();
265 }
266 }
267
268
269
270
271
272 abstract protected void onIncompleteFlushed();
273
274
275
276
277
278
279
280
281
282
283
284
285
286 public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
287 {
288 if (DEBUG)
289 LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
290
291 if (!updateState(__IDLE,__WRITING))
292 throw new WritePendingException();
293
294 try
295 {
296 boolean flushed=_endPoint.flush(buffers);
297 if (DEBUG)
298 LOG.debug("flushed {}", flushed);
299
300
301 for (ByteBuffer b : buffers)
302 {
303 if (!flushed||BufferUtil.hasContent(b))
304 {
305 PendingState pending=new PendingState(buffers, callback);
306 if (updateState(__WRITING,pending))
307 onIncompleteFlushed();
308 else
309 fail(new PendingState(buffers, callback));
310 return;
311 }
312 }
313
314
315 if (!updateState(__WRITING,__IDLE))
316 ignoreFail();
317 if (callback!=null)
318 callback.succeeded();
319 }
320 catch (IOException e)
321 {
322 if (DEBUG)
323 LOG.debug("write exception", e);
324 if (updateState(__WRITING,__IDLE))
325 {
326 if (callback!=null)
327 callback.failed(e);
328 }
329 else
330 fail(new PendingState(buffers, callback));
331 }
332 }
333
334
335
336
337
338
339
340
341
342
343 public void completeWrite()
344 {
345 if (DEBUG)
346 LOG.debug("completeWrite: {}", this);
347
348 State previous = _state.get();
349
350 if (previous.getType()!=StateType.PENDING)
351 return;
352
353 PendingState pending = (PendingState)previous;
354 if (!updateState(pending,__COMPLETING))
355 return;
356
357 try
358 {
359 ByteBuffer[] buffers = pending.getBuffers();
360
361 boolean flushed=_endPoint.flush(buffers);
362 if (DEBUG)
363 LOG.debug("flushed {}", flushed);
364
365
366 for (ByteBuffer b : buffers)
367 {
368 if (!flushed || BufferUtil.hasContent(b))
369 {
370 if (updateState(__COMPLETING,pending))
371 onIncompleteFlushed();
372 else
373 fail(pending);
374 return;
375 }
376 }
377
378
379 if (!updateState(__COMPLETING,__IDLE))
380 ignoreFail();
381 pending.complete();
382 }
383 catch (IOException e)
384 {
385 if (DEBUG)
386 LOG.debug("completeWrite exception", e);
387 if(updateState(__COMPLETING,__IDLE))
388 pending.fail(e);
389 else
390 fail(pending);
391 }
392 }
393
394 public void onFail(Throwable cause)
395 {
396 if (DEBUG)
397 LOG.debug("failed: {} {}", this, cause);
398
399
400 while(true)
401 {
402 State current=_state.get();
403 switch(current.getType())
404 {
405 case IDLE:
406 case FAILED:
407 return;
408
409 case PENDING:
410 PendingState pending = (PendingState)current;
411 if (updateState(pending,__IDLE))
412 {
413 pending.fail(cause);
414 return;
415 }
416 break;
417
418 default:
419 if (updateState(current,new FailedState(cause)))
420 return;
421 break;
422 }
423 }
424 }
425
426 public void onClose()
427 {
428 if (_state.get()==__IDLE)
429 return;
430 onFail(new ClosedChannelException());
431 }
432
433 boolean isIdle()
434 {
435 return _state.get().getType() == StateType.IDLE;
436 }
437
438 public boolean isInProgress()
439 {
440 switch(_state.get().getType())
441 {
442 case WRITING:
443 case PENDING:
444 case COMPLETING:
445 return true;
446 default:
447 return false;
448 }
449 }
450
451 @Override
452 public String toString()
453 {
454 return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
455 }
456 }