View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.WritePendingException;
25  import java.util.EnumMap;
26  import java.util.EnumSet;
27  import java.util.Set;
28  import java.util.concurrent.atomic.AtomicReference;
29  
30  import org.eclipse.jetty.util.BufferUtil;
31  import org.eclipse.jetty.util.Callback;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  
35  
36  /**
37   * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
38   * {@link EndPoint#flush(ByteBuffer...)} until all content is written.
39   * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to
40   * flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush
41   * should  be able to make more progress.
42   * <p>
43   */
44  abstract public class WriteFlusher
45  {
46      private static final Logger LOG = Log.getLogger(WriteFlusher.class);
47      private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
48      private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
49      private static final State __IDLE = new IdleState();
50      private static final State __WRITING = new WritingState();
51      private static final State __COMPLETING = new CompletingState();
52      private final EndPoint _endPoint;
53      private final AtomicReference<State> _state = new AtomicReference<>();
54  
55      static
56      {
57          // fill the state machine
58          __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
59          __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
60          __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
61          __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
62          __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
63      }
64  
65      // A write operation may either complete immediately:
66      //     IDLE-->WRITING-->IDLE
67      // Or it may not completely flush and go via the PENDING state
68      //     IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE
69      // Or it may take several cycles to complete
70      //     IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
71      //
72      // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
73      // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
74      // the callback's complete or respectively failed methods will be called.
75      // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
76      //
77      //   IDLE--(fail)-->IDLE
78      //   IDLE-->WRITING--(fail)-->FAILED-->IDLE
79      //   IDLE-->WRITING-->PENDING--(fail)-->IDLE
80      //   IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
81      //
82      // So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
83      // A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
84      // handled with the write or completeWrite methods try to move the state from what they thought it was.
85      //
86  
87      protected WriteFlusher(EndPoint endPoint)
88      {
89          _state.set(__IDLE);
90          _endPoint = endPoint;
91      }
92  
93      private enum StateType
94      {
95          IDLE,
96          WRITING,
97          PENDING,
98          COMPLETING,
99          FAILED
100     }
101 
102     /**
103      * Tries to update the current state to the given new state.
104      * @param previous the expected current state
105      * @param next the desired new state
106      * @return the previous state or null if the state transition failed
107      * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
108      */
109     private boolean updateState(State previous,State next)
110     {
111         if (!isTransitionAllowed(previous,next))
112             throw new IllegalStateException();
113 
114         boolean updated = _state.compareAndSet(previous, next);
115         if (DEBUG)
116             LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
117         return updated;
118     }
119 
120     private void fail(PendingState pending)
121     {
122         State current = _state.get();
123         if (current.getType()==StateType.FAILED)
124         {
125             FailedState failed=(FailedState)current;
126             if (updateState(failed,__IDLE))
127             {
128                 pending.fail(failed.getCause());
129                 return;
130             }
131         }
132         throw new IllegalStateException();
133     }
134 
135     private void ignoreFail()
136     {
137         State current = _state.get();
138         while (current.getType()==StateType.FAILED)
139         {
140             if (updateState(current,__IDLE))
141                 return;
142             current = _state.get();
143         }
144     }
145 
146     private boolean isTransitionAllowed(State currentState, State newState)
147     {
148         Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
149         if (!allowedNewStateTypes.contains(newState.getType()))
150         {
151             LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
152             return false;
153         }
154         return true;
155     }
156 
157     /**
158      * State represents a State of WriteFlusher.
159      */
160     private static class State
161     {
162         private final StateType _type;
163 
164         private State(StateType stateType)
165         {
166             _type = stateType;
167         }
168 
169         public StateType getType()
170         {
171             return _type;
172         }
173 
174         @Override
175         public String toString()
176         {
177             return String.format("%s", _type);
178         }
179     }
180 
181     /**
182      * In IdleState WriteFlusher is idle and accepts new writes
183      */
184     private static class IdleState extends State
185     {
186         private IdleState()
187         {
188             super(StateType.IDLE);
189         }
190     }
191 
192     /**
193      * In WritingState WriteFlusher is currently writing.
194      */
195     private static class WritingState extends State
196     {
197         private WritingState()
198         {
199             super(StateType.WRITING);
200         }
201     }
202 
203     /**
204      * In FailedState no more operations are allowed. The current implementation will never recover from this state.
205      */
206     private static class FailedState extends State
207     {
208         private final Throwable _cause;
209         private FailedState(Throwable cause)
210         {
211             super(StateType.FAILED);
212             _cause=cause;
213         }
214 
215         public Throwable getCause()
216         {
217             return _cause;
218         }
219     }
220 
221     /**
222      * In CompletingState WriteFlusher is flushing buffers that have not been fully written in write(). If write()
223      * didn't flush all buffers in one go, it'll switch the State to PendingState. completeWrite() will then switch to
224      * this state and try to flush the remaining buffers.
225      */
226     private static class CompletingState extends State
227     {
228         private CompletingState()
229         {
230             super(StateType.COMPLETING);
231         }
232     }
233 
234     /**
235      * In PendingState not all buffers could be written in one go. Then write() will switch to PendingState() and
236      * preserve the state by creating a new PendingState object with the given parameters.
237      */
238     private class PendingState extends State
239     {
240         private final Callback _callback;
241         private final ByteBuffer[] _buffers;
242 
243         private PendingState(ByteBuffer[] buffers, Callback callback)
244         {
245             super(StateType.PENDING);
246             _buffers = buffers;
247             _callback = callback;
248         }
249 
250         public ByteBuffer[] getBuffers()
251         {
252             return _buffers;
253         }
254 
255         protected void fail(Throwable cause)
256         {
257             if (_callback!=null)
258                 _callback.failed(cause);
259         }
260 
261         protected void complete()
262         {
263             if (_callback!=null)
264                 _callback.succeeded();
265         }
266     }
267 
268     /**
269      * Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
270      * or {@link #onFail(Throwable)} when appropriate.
271      */
272     abstract protected void onIncompleteFlushed();
273 
274     /**
275      * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
276      * fails it'll fail the callback.
277      *
278      * If not all buffers can be written in one go it creates a new {@link PendingState} object to preserve the state
279      * and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}.
280      *
281      * If all buffers have been written it calls callback.complete().
282      *
283      * @param callback the callback to call on either failed or complete
284      * @param buffers the buffers to flush to the endpoint
285      */
286     public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
287     {
288         if (DEBUG)
289             LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
290 
291         if (!updateState(__IDLE,__WRITING))
292             throw new WritePendingException();
293 
294         try
295         {
296             boolean flushed=_endPoint.flush(buffers);
297             if (DEBUG)
298                 LOG.debug("flushed {}", flushed);
299 
300             // Are we complete?
301             for (ByteBuffer b : buffers)
302             {
303                 if (!flushed||BufferUtil.hasContent(b))
304                 {
305                     PendingState pending=new PendingState(buffers, callback);
306                     if (updateState(__WRITING,pending))
307                         onIncompleteFlushed();
308                     else
309                         fail(new PendingState(buffers, callback));
310                     return;
311                 }
312             }
313 
314             // If updateState didn't succeed, we don't care as our buffers have been written
315             if (!updateState(__WRITING,__IDLE))
316                 ignoreFail();
317             if (callback!=null)
318                 callback.succeeded();
319         }
320         catch (IOException e)
321         {
322             if (DEBUG)
323                 LOG.debug("write exception", e);
324             if (updateState(__WRITING,__IDLE))
325             {
326                 if (callback!=null)
327                     callback.failed(e);
328             }
329             else
330                 fail(new PendingState(buffers, callback));
331         }
332     }
333 
334 
335     /**
336      * Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this
337      * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
338      *
339      * It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
340      * should have been already failed. That's because the only way to switch from PENDING outside this method is
341      * {@link #onFail(Throwable)} or {@link #onClose()}
342      */
343     public void completeWrite()
344     {
345         if (DEBUG)
346             LOG.debug("completeWrite: {}", this);
347 
348         State previous = _state.get();
349 
350         if (previous.getType()!=StateType.PENDING)
351             return; // failure already handled.
352 
353         PendingState pending = (PendingState)previous;
354         if (!updateState(pending,__COMPLETING))
355             return; // failure already handled.
356 
357         try
358         {
359             ByteBuffer[] buffers = pending.getBuffers();
360 
361             boolean flushed=_endPoint.flush(buffers);
362             if (DEBUG)
363                 LOG.debug("flushed {}", flushed);
364 
365             // Are we complete?
366             for (ByteBuffer b : buffers)
367             {
368                 if (!flushed || BufferUtil.hasContent(b))
369                 {
370                     if (updateState(__COMPLETING,pending))
371                         onIncompleteFlushed();
372                     else
373                         fail(pending);
374                     return;
375                 }
376             }
377 
378             // If updateState didn't succeed, we don't care as our buffers have been written
379             if (!updateState(__COMPLETING,__IDLE))
380                 ignoreFail();
381             pending.complete();
382         }
383         catch (IOException e)
384         {
385             if (DEBUG)
386                 LOG.debug("completeWrite exception", e);
387             if(updateState(__COMPLETING,__IDLE))
388                 pending.fail(e);
389             else
390                 fail(pending);
391         }
392     }
393 
394     public void onFail(Throwable cause)
395     {
396         if (DEBUG)
397             LOG.debug("failed: {} {}", this, cause);
398 
399         // Keep trying to handle the failure until we get to IDLE or FAILED state
400         while(true)
401         {
402             State current=_state.get();
403             switch(current.getType())
404             {
405                 case IDLE:
406                 case FAILED:
407                     return;
408 
409                 case PENDING:
410                     PendingState pending = (PendingState)current;
411                     if (updateState(pending,__IDLE))
412                     {
413                         pending.fail(cause);
414                         return;
415                     }
416                     break;
417 
418                 default:
419                     if (updateState(current,new FailedState(cause)))
420                         return;
421                     break;
422             }
423         }
424     }
425 
426     public void onClose()
427     {
428         if (_state.get()==__IDLE)
429             return;
430         onFail(new ClosedChannelException());
431     }
432 
433     boolean isIdle()
434     {
435         return _state.get().getType() == StateType.IDLE;
436     }
437 
438     public boolean isInProgress()
439     {
440         switch(_state.get().getType())
441         {
442             case WRITING:
443             case PENDING:
444             case COMPLETING:
445                 return true;
446             default:
447                 return false;
448         }
449     }
450 
451     @Override
452     public String toString()
453     {
454         return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
455     }
456 }