View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.WritePendingException;
25  import java.util.Arrays;
26  import java.util.EnumMap;
27  import java.util.EnumSet;
28  import java.util.Set;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.Callback;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  
36  
37  /**
38   * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
39   * {@link EndPoint#flush(ByteBuffer...)} until all content is written.
40   * The abstract method {@link #onIncompleteFlush()} is called when not all content has been written after a call to
41   * flush and should organize for the {@link #completeWrite()} method to be called when a subsequent call to flush
42   * should  be able to make more progress.
43   */
44  abstract public class WriteFlusher
45  {
46      private static final Logger LOG = Log.getLogger(WriteFlusher.class);
47      private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
48      private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER};
49      private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
50      private static final State __IDLE = new IdleState();
51      private static final State __WRITING = new WritingState();
52      private static final State __COMPLETING = new CompletingState();
53      private final EndPoint _endPoint;
54      private final AtomicReference<State> _state = new AtomicReference<>();
55  
56      static
57      {
58          // fill the state machine
59          __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
60          __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
61          __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
62          __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
63          __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
64      }
65  
66      // A write operation may either complete immediately:
67      //     IDLE-->WRITING-->IDLE
68      // Or it may not completely flush and go via the PENDING state
69      //     IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE
70      // Or it may take several cycles to complete
71      //     IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
72      //
73      // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
74      // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
75      // the callback's complete or respectively failed methods will be called.
76      // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
77      //
78      //   IDLE--(fail)-->IDLE
79      //   IDLE-->WRITING--(fail)-->FAILED-->IDLE
80      //   IDLE-->WRITING-->PENDING--(fail)-->IDLE
81      //   IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
82      //
83      // So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
84      // A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
85      // handled with the write or completeWrite methods try to move the state from what they thought it was.
86      //
87  
88      protected WriteFlusher(EndPoint endPoint)
89      {
90          _state.set(__IDLE);
91          _endPoint = endPoint;
92      }
93  
94      private enum StateType
95      {
96          IDLE,
97          WRITING,
98          PENDING,
99          COMPLETING,
100         FAILED
101     }
102 
103     /**
104      * Tries to update the current state to the given new state.
105      * @param previous the expected current state
106      * @param next the desired new state
107      * @return the previous state or null if the state transition failed
108      * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
109      */
110     private boolean updateState(State previous,State next)
111     {
112         if (!isTransitionAllowed(previous,next))
113             throw new IllegalStateException();
114 
115         boolean updated = _state.compareAndSet(previous, next);
116         if (DEBUG)
117             LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
118         return updated;
119     }
120 
121     private void fail(PendingState pending)
122     {
123         State current = _state.get();
124         if (current.getType()==StateType.FAILED)
125         {
126             FailedState failed=(FailedState)current;
127             if (updateState(failed,__IDLE))
128             {
129                 pending.fail(failed.getCause());
130                 return;
131             }
132         }
133         throw new IllegalStateException();
134     }
135 
136     private void ignoreFail()
137     {
138         State current = _state.get();
139         while (current.getType()==StateType.FAILED)
140         {
141             if (updateState(current,__IDLE))
142                 return;
143             current = _state.get();
144         }
145     }
146 
147     private boolean isTransitionAllowed(State currentState, State newState)
148     {
149         Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
150         if (!allowedNewStateTypes.contains(newState.getType()))
151         {
152             LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
153             return false;
154         }
155         return true;
156     }
157 
158     /**
159      * State represents a State of WriteFlusher.
160      */
161     private static class State
162     {
163         private final StateType _type;
164 
165         private State(StateType stateType)
166         {
167             _type = stateType;
168         }
169 
170         public StateType getType()
171         {
172             return _type;
173         }
174 
175         @Override
176         public String toString()
177         {
178             return String.format("%s", _type);
179         }
180     }
181 
182     /**
183      * In IdleState WriteFlusher is idle and accepts new writes
184      */
185     private static class IdleState extends State
186     {
187         private IdleState()
188         {
189             super(StateType.IDLE);
190         }
191     }
192 
193     /**
194      * In WritingState WriteFlusher is currently writing.
195      */
196     private static class WritingState extends State
197     {
198         private WritingState()
199         {
200             super(StateType.WRITING);
201         }
202     }
203 
204     /**
205      * In FailedState no more operations are allowed. The current implementation will never recover from this state.
206      */
207     private static class FailedState extends State
208     {
209         private final Throwable _cause;
210         private FailedState(Throwable cause)
211         {
212             super(StateType.FAILED);
213             _cause=cause;
214         }
215 
216         public Throwable getCause()
217         {
218             return _cause;
219         }
220     }
221 
222     /**
223      * In CompletingState WriteFlusher is flushing buffers that have not been fully written in write(). If write()
224      * didn't flush all buffers in one go, it'll switch the State to PendingState. completeWrite() will then switch to
225      * this state and try to flush the remaining buffers.
226      */
227     private static class CompletingState extends State
228     {
229         private CompletingState()
230         {
231             super(StateType.COMPLETING);
232         }
233     }
234 
235     /**
236      * In PendingState not all buffers could be written in one go. Then write() will switch to PendingState() and
237      * preserve the state by creating a new PendingState object with the given parameters.
238      */
239     private class PendingState extends State
240     {
241         private final Callback _callback;
242         private final ByteBuffer[] _buffers;
243 
244         private PendingState(ByteBuffer[] buffers, Callback callback)
245         {
246             super(StateType.PENDING);
247             _buffers = buffers;
248             _callback = callback;
249         }
250 
251         public ByteBuffer[] getBuffers()
252         {
253             return _buffers;
254         }
255 
256         protected boolean fail(Throwable cause)
257         {
258             if (_callback!=null)
259             {
260                 _callback.failed(cause);
261                 return true;
262             }
263             return false;
264         }
265 
266         protected void complete()
267         {
268             if (_callback!=null)
269                 _callback.succeeded();
270         }
271         
272         boolean isCallbackNonBlocking()
273         {
274             return _callback!=null && _callback.isNonBlocking();
275         }
276     }
277 
278     public boolean isCallbackNonBlocking()
279     {
280         State s = _state.get();
281         return (s instanceof PendingState) && ((PendingState)s).isCallbackNonBlocking();
282     }
283     
284     /**
285      * Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
286      * or {@link #onFail(Throwable)} when appropriate.
287      */
288     abstract protected void onIncompleteFlush();
289 
290     /**
291      * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
292      * fails it'll fail the callback.
293      *
294      * If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state
295      * and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}.
296      *
297      * If all buffers have been written it calls callback.complete().
298      *
299      * @param callback the callback to call on either failed or complete
300      * @param buffers the buffers to flush to the endpoint
301      * @throws WritePendingException if unable to write due to prior pending write
302      */
303     public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
304     {
305         if (DEBUG)
306             LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
307 
308         if (!updateState(__IDLE,__WRITING))
309             throw new WritePendingException();
310 
311         try
312         {
313             buffers=flush(buffers);
314 
315             // if we are incomplete?
316             if (buffers!=null)
317             {
318                 if (DEBUG)
319                     LOG.debug("flushed incomplete");
320                 PendingState pending=new PendingState(buffers, callback);
321                 if (updateState(__WRITING,pending))
322                     onIncompleteFlush();
323                 else
324                     fail(pending);
325                 return;
326             }
327 
328             // If updateState didn't succeed, we don't care as our buffers have been written
329             if (!updateState(__WRITING,__IDLE))
330                 ignoreFail();
331             if (callback!=null)
332                 callback.succeeded();
333         }
334         catch (IOException e)
335         {
336             if (DEBUG)
337                 LOG.debug("write exception", e);
338             if (updateState(__WRITING,__IDLE))
339             {
340                 if (callback!=null)
341                     callback.failed(e);
342             }
343             else
344                 fail(new PendingState(buffers, callback));
345         }
346     }
347 
348 
349     /**
350      * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this
351      * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
352      *
353      * It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
354      * should have been already failed. That's because the only way to switch from PENDING outside this method is
355      * {@link #onFail(Throwable)} or {@link #onClose()}
356      */
357     public void completeWrite()
358     {         
359         if (DEBUG)
360             LOG.debug("completeWrite: {}", this);
361 
362         State previous = _state.get();
363 
364         if (previous.getType()!=StateType.PENDING)
365             return; // failure already handled.
366 
367         PendingState pending = (PendingState)previous;
368         if (!updateState(pending,__COMPLETING))
369             return; // failure already handled.
370 
371         try
372         {
373             ByteBuffer[] buffers = pending.getBuffers();
374 
375             buffers=flush(buffers);
376 
377             // if we are incomplete?
378             if (buffers!=null)
379             {
380                 if (DEBUG)
381                     LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
382                 if (buffers!=pending.getBuffers())
383                     pending=new PendingState(buffers, pending._callback);
384                 if (updateState(__COMPLETING,pending))
385                     onIncompleteFlush();
386                 else
387                     fail(pending);
388                 return;
389             }
390 
391             // If updateState didn't succeed, we don't care as our buffers have been written
392             if (!updateState(__COMPLETING,__IDLE))
393                 ignoreFail();
394             pending.complete();
395         }
396         catch (IOException e)
397         {
398             if (DEBUG)
399                 LOG.debug("completeWrite exception", e);
400             if(updateState(__COMPLETING,__IDLE))
401                 pending.fail(e);
402             else
403                 fail(pending);
404         }
405     }
406 
407     /* ------------------------------------------------------------ */
408     /** Flush the buffers iteratively until no progress is made
409      * @param buffers The buffers to flush
410      * @return The unflushed buffers, or null if all flushed
411      * @throws IOException if unable to flush
412      */
413     protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
414     {
415         boolean progress=true;
416         while(progress && buffers!=null)
417         {
418             int before=buffers.length==0?0:buffers[0].remaining();
419             boolean flushed=_endPoint.flush(buffers);
420             int r=buffers.length==0?0:buffers[0].remaining();
421             
422             if (LOG.isDebugEnabled())
423                 LOG.debug("Flushed={} {}/{}+{} {}",flushed,before-r,before,buffers.length-1,this);
424             
425             if (flushed)
426                 return null;
427             
428             progress=before!=r;
429             
430             int not_empty=0;
431             while(r==0)
432             {
433                 if (++not_empty==buffers.length)
434                 {
435                     buffers=null;
436                     not_empty=0;
437                     break;
438                 }
439                 progress=true;
440                 r=buffers[not_empty].remaining();
441             }
442 
443             if (not_empty>0)
444                 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
445         }        
446 
447         if (LOG.isDebugEnabled())
448             LOG.debug("!fully flushed {}",this);
449         
450         // If buffers is null, then flush has returned false but has consumed all the data!
451         // This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS
452         // and that will keep this WriteFlusher pending.
453         return buffers==null?EMPTY_BUFFERS:buffers;
454     }
455     
456     /* ------------------------------------------------------------ */
457     /** Notify the flusher of a failure
458      * @param cause The cause of the failure
459      * @return true if the flusher passed the failure to a {@link Callback} instance
460      */
461     public boolean onFail(Throwable cause)
462     {
463         // Keep trying to handle the failure until we get to IDLE or FAILED state
464         while(true)
465         {
466             State current=_state.get();
467             switch(current.getType())
468             {
469                 case IDLE:
470                 case FAILED:
471                     if (DEBUG)
472                         LOG.debug("ignored: {} {}", this, cause);
473                     return false;
474 
475                 case PENDING:
476                     if (DEBUG)
477                         LOG.debug("failed: {} {}", this, cause);
478 
479                     PendingState pending = (PendingState)current;
480                     if (updateState(pending,__IDLE))
481                         return pending.fail(cause);
482                     break;
483 
484                 default:
485                     if (DEBUG)
486                         LOG.debug("failed: {} {}", this, cause);
487 
488                     if (updateState(current,new FailedState(cause)))
489                         return false;
490                     break;
491             }
492         }
493     }
494 
495     public void onClose()
496     {
497         if (_state.get()==__IDLE)
498             return;
499         onFail(new ClosedChannelException());
500     }
501 
502     boolean isIdle()
503     {
504         return _state.get().getType() == StateType.IDLE;
505     }
506 
507     public boolean isInProgress()
508     {
509         switch(_state.get().getType())
510         {
511             case WRITING:
512             case PENDING:
513             case COMPLETING:
514                 return true;
515             default:
516                 return false;
517         }
518     }
519 
520     @Override
521     public String toString()
522     {
523         return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
524     }
525     
526     public String toStateString()
527     {
528         switch(_state.get().getType())
529         {
530             case WRITING:
531                 return "W";
532             case PENDING:
533                 return "P";
534             case COMPLETING:
535                 return "C";
536             case IDLE:
537                 return "-";
538             case FAILED:
539                 return "F";
540             default:
541                 return "?";
542         }
543     }
544 }