View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.WritePendingException;
25  import java.util.Arrays;
26  import java.util.EnumMap;
27  import java.util.EnumSet;
28  import java.util.Set;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.Callback;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  
36  
37  /**
38   * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
39   * {@link EndPoint#flush(ByteBuffer...)} until all content is written.
40   * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to
41   * flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush
42   * should  be able to make more progress.
43   * <p>
44   */
45  abstract public class WriteFlusher
46  {
47      private static final Logger LOG = Log.getLogger(WriteFlusher.class);
48      private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
49      private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0];
50      private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
51      private static final State __IDLE = new IdleState();
52      private static final State __WRITING = new WritingState();
53      private static final State __COMPLETING = new CompletingState();
54      private final EndPoint _endPoint;
55      private final AtomicReference<State> _state = new AtomicReference<>();
56  
57      static
58      {
59          // fill the state machine
60          __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
61          __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
62          __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
63          __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
64          __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
65      }
66  
67      // A write operation may either complete immediately:
68      //     IDLE-->WRITING-->IDLE
69      // Or it may not completely flush and go via the PENDING state
70      //     IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE
71      // Or it may take several cycles to complete
72      //     IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
73      //
74      // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
75      // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
76      // the callback's complete or respectively failed methods will be called.
77      // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
78      //
79      //   IDLE--(fail)-->IDLE
80      //   IDLE-->WRITING--(fail)-->FAILED-->IDLE
81      //   IDLE-->WRITING-->PENDING--(fail)-->IDLE
82      //   IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
83      //
84      // So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
85      // A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
86      // handled with the write or completeWrite methods try to move the state from what they thought it was.
87      //
88  
89      protected WriteFlusher(EndPoint endPoint)
90      {
91          _state.set(__IDLE);
92          _endPoint = endPoint;
93      }
94  
95      private enum StateType
96      {
97          IDLE,
98          WRITING,
99          PENDING,
100         COMPLETING,
101         FAILED
102     }
103 
104     /**
105      * Tries to update the current state to the given new state.
106      * @param previous the expected current state
107      * @param next the desired new state
108      * @return the previous state or null if the state transition failed
109      * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
110      */
111     private boolean updateState(State previous,State next)
112     {
113         if (!isTransitionAllowed(previous,next))
114             throw new IllegalStateException();
115 
116         boolean updated = _state.compareAndSet(previous, next);
117         if (DEBUG)
118             LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
119         return updated;
120     }
121 
122     private void fail(PendingState pending)
123     {
124         State current = _state.get();
125         if (current.getType()==StateType.FAILED)
126         {
127             FailedState failed=(FailedState)current;
128             if (updateState(failed,__IDLE))
129             {
130                 pending.fail(failed.getCause());
131                 return;
132             }
133         }
134         throw new IllegalStateException();
135     }
136 
137     private void ignoreFail()
138     {
139         State current = _state.get();
140         while (current.getType()==StateType.FAILED)
141         {
142             if (updateState(current,__IDLE))
143                 return;
144             current = _state.get();
145         }
146     }
147 
148     private boolean isTransitionAllowed(State currentState, State newState)
149     {
150         Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
151         if (!allowedNewStateTypes.contains(newState.getType()))
152         {
153             LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
154             return false;
155         }
156         return true;
157     }
158 
159     /**
160      * State represents a State of WriteFlusher.
161      */
162     private static class State
163     {
164         private final StateType _type;
165 
166         private State(StateType stateType)
167         {
168             _type = stateType;
169         }
170 
171         public StateType getType()
172         {
173             return _type;
174         }
175 
176         @Override
177         public String toString()
178         {
179             return String.format("%s", _type);
180         }
181     }
182 
183     /**
184      * In IdleState WriteFlusher is idle and accepts new writes
185      */
186     private static class IdleState extends State
187     {
188         private IdleState()
189         {
190             super(StateType.IDLE);
191         }
192     }
193 
194     /**
195      * In WritingState WriteFlusher is currently writing.
196      */
197     private static class WritingState extends State
198     {
199         private WritingState()
200         {
201             super(StateType.WRITING);
202         }
203     }
204 
205     /**
206      * In FailedState no more operations are allowed. The current implementation will never recover from this state.
207      */
208     private static class FailedState extends State
209     {
210         private final Throwable _cause;
211         private FailedState(Throwable cause)
212         {
213             super(StateType.FAILED);
214             _cause=cause;
215         }
216 
217         public Throwable getCause()
218         {
219             return _cause;
220         }
221     }
222 
223     /**
224      * In CompletingState WriteFlusher is flushing buffers that have not been fully written in write(). If write()
225      * didn't flush all buffers in one go, it'll switch the State to PendingState. completeWrite() will then switch to
226      * this state and try to flush the remaining buffers.
227      */
228     private static class CompletingState extends State
229     {
230         private CompletingState()
231         {
232             super(StateType.COMPLETING);
233         }
234     }
235 
236     /**
237      * In PendingState not all buffers could be written in one go. Then write() will switch to PendingState() and
238      * preserve the state by creating a new PendingState object with the given parameters.
239      */
240     private class PendingState extends State
241     {
242         private final Callback _callback;
243         private final ByteBuffer[] _buffers;
244 
245         private PendingState(ByteBuffer[] buffers, Callback callback)
246         {
247             super(StateType.PENDING);
248             _buffers = compact(buffers);
249             _callback = callback;
250         }
251 
252         public ByteBuffer[] getBuffers()
253         {
254             return _buffers;
255         }
256 
257         protected boolean fail(Throwable cause)
258         {
259             if (_callback!=null)
260             {
261                 _callback.failed(cause);
262                 return true;
263             }
264             return false;
265         }
266 
267         protected void complete()
268         {
269             if (_callback!=null)
270                 _callback.succeeded();
271         }
272 
273         /**
274          * Compacting the buffers is needed because the semantic of WriteFlusher is
275          * to write the buffers and if the caller sees that the buffer is consumed,
276          * then it can recycle it.
277          * If we do not compact, then it is possible that we store a consumed buffer,
278          * which is then recycled and refilled; when the WriteFlusher is invoked to
279          * complete the write, it will write the refilled bytes, garbling the content.
280          *
281          * @param buffers the buffers to compact
282          * @return the compacted buffers
283          */
284         private ByteBuffer[] compact(ByteBuffer[] buffers)
285         {
286             int length = buffers.length;
287 
288             // Just one element, no need to compact
289             if (length < 2)
290                 return buffers;
291 
292             // How many still have content ?
293             int consumed = 0;
294             while (consumed < length && BufferUtil.isEmpty(buffers[consumed]))
295                 ++consumed;
296 
297             // All of them still have content, no need to compact
298             if (consumed == 0)
299                 return buffers;
300 
301             // None has content, return empty
302             if (consumed == length)
303                 return EMPTY_BUFFERS;
304 
305             return Arrays.copyOfRange(buffers,consumed,length);
306         }
307     }
308 
309     /**
310      * Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
311      * or {@link #onFail(Throwable)} when appropriate.
312      */
313     abstract protected void onIncompleteFlushed();
314 
315     /**
316      * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
317      * fails it'll fail the callback.
318      *
319      * If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state
320      * and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}.
321      *
322      * If all buffers have been written it calls callback.complete().
323      *
324      * @param callback the callback to call on either failed or complete
325      * @param buffers the buffers to flush to the endpoint
326      */
327     public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
328     {
329         if (DEBUG)
330             LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
331 
332         if (!updateState(__IDLE,__WRITING))
333             throw new WritePendingException();
334 
335         try
336         {
337             buffers=flush(buffers);
338 
339             // if we are incomplete?
340             if (buffers!=null)
341             {
342                 if (DEBUG)
343                     LOG.debug("flushed incomplete");
344                 PendingState pending=new PendingState(buffers, callback);
345                 if (updateState(__WRITING,pending))
346                     onIncompleteFlushed();
347                 else
348                     fail(pending);
349                 return;
350             }
351 
352             // If updateState didn't succeed, we don't care as our buffers have been written
353             if (!updateState(__WRITING,__IDLE))
354                 ignoreFail();
355             if (callback!=null)
356                 callback.succeeded();
357         }
358         catch (IOException e)
359         {
360             if (DEBUG)
361                 LOG.debug("write exception", e);
362             if (updateState(__WRITING,__IDLE))
363             {
364                 if (callback!=null)
365                     callback.failed(e);
366             }
367             else
368                 fail(new PendingState(buffers, callback));
369         }
370     }
371 
372 
373     /**
374      * Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this
375      * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
376      *
377      * It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
378      * should have been already failed. That's because the only way to switch from PENDING outside this method is
379      * {@link #onFail(Throwable)} or {@link #onClose()}
380      */
381     public void completeWrite()
382     {         
383         if (DEBUG)
384             LOG.debug("completeWrite: {}", this);
385 
386         State previous = _state.get();
387 
388         if (previous.getType()!=StateType.PENDING)
389             return; // failure already handled.
390 
391         PendingState pending = (PendingState)previous;
392         if (!updateState(pending,__COMPLETING))
393             return; // failure already handled.
394 
395         try
396         {
397             ByteBuffer[] buffers = pending.getBuffers();
398 
399             buffers=flush(buffers);
400 
401             // if we are incomplete?
402             if (buffers!=null)
403             {
404                 if (DEBUG)
405                     LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
406                 if (buffers!=pending.getBuffers())
407                     pending=new PendingState(buffers, pending._callback);
408                 if (updateState(__COMPLETING,pending))
409                     onIncompleteFlushed();
410                 else
411                     fail(pending);
412                 return;
413             }
414 
415             // If updateState didn't succeed, we don't care as our buffers have been written
416             if (!updateState(__COMPLETING,__IDLE))
417                 ignoreFail();
418             pending.complete();
419         }
420         catch (IOException e)
421         {
422             if (DEBUG)
423                 LOG.debug("completeWrite exception", e);
424             if(updateState(__COMPLETING,__IDLE))
425                 pending.fail(e);
426             else
427                 fail(pending);
428         }
429     }
430 
431     /* ------------------------------------------------------------ */
432     /** Flush the buffers iteratively until no progress is made
433      * @param buffers The buffers to flush
434      * @return The unflushed buffers, or null if all flushed
435      * @throws IOException
436      */
437     protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
438     {
439         // try the simple direct flush first, which also ensures that any null buffer
440         // flushes are passed through (for commits etc.)
441         if (_endPoint.flush(buffers))
442             return null;
443         
444         // We were not fully flushed, so let's try again iteratively while we can make
445         // some progress
446         boolean progress=true;
447         while(true)
448         {
449             // Compact buffers array?
450             int not_empty=0;
451             while(not_empty<buffers.length && BufferUtil.isEmpty(buffers[not_empty]))
452                 not_empty++;
453             if (not_empty==buffers.length)
454                 return null;
455             if (not_empty>0)
456                 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
457             
458             if (!progress)
459                 break;
460             
461             // try to flush the remainder
462             int r=buffers[0].remaining();
463             if (_endPoint.flush(buffers))
464                 return null;
465             progress=r!=buffers[0].remaining();
466         }
467         
468         return buffers;
469     }
470     
471     /* ------------------------------------------------------------ */
472     /** Notify the flusher of a failure
473      * @param cause The cause of the failure
474      * @return true if the flusher passed the failure to a {@link Callback} instance
475      */
476     public boolean onFail(Throwable cause)
477     {
478         // Keep trying to handle the failure until we get to IDLE or FAILED state
479         while(true)
480         {
481             State current=_state.get();
482             switch(current.getType())
483             {
484                 case IDLE:
485                 case FAILED:
486                     if (DEBUG)
487                         LOG.debug("ignored: {} {}", this, cause);
488                     return false;
489 
490                 case PENDING:
491                     if (DEBUG)
492                         LOG.debug("failed: {} {}", this, cause);
493 
494                     PendingState pending = (PendingState)current;
495                     if (updateState(pending,__IDLE))
496                         return pending.fail(cause);
497                     break;
498 
499                 default:
500                     if (DEBUG)
501                         LOG.debug("failed: {} {}", this, cause);
502 
503                     if (updateState(current,new FailedState(cause)))
504                         return false;
505                     break;
506             }
507         }
508     }
509 
510     public void onClose()
511     {
512         if (_state.get()==__IDLE)
513             return;
514         onFail(new ClosedChannelException());
515     }
516 
517     boolean isIdle()
518     {
519         return _state.get().getType() == StateType.IDLE;
520     }
521 
522     public boolean isInProgress()
523     {
524         switch(_state.get().getType())
525         {
526             case WRITING:
527             case PENDING:
528             case COMPLETING:
529                 return true;
530             default:
531                 return false;
532         }
533     }
534 
535     @Override
536     public String toString()
537     {
538         return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
539     }
540 }