View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.util.List;
22  import java.util.concurrent.CopyOnWriteArrayList;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.TimeoutException;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import org.eclipse.jetty.util.Callback;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  import org.eclipse.jetty.util.thread.NonBlockingThread;
32  
33  /**
34   * <p>A convenience base implementation of {@link Connection}.</p>
35   * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
36   * more traditional style of async reading.  A call to {@link #fillInterested()}
37   * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
38   * as appropriate.</p>
39   */
40  public abstract class AbstractConnection implements Connection
41  {
42      private static final Logger LOG = Log.getLogger(AbstractConnection.class);
43      
44      public static final boolean EXECUTE_ONFILLABLE=true;
45  
46      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
47      private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
48      private final long _created=System.currentTimeMillis();
49      private final EndPoint _endPoint;
50      private final Executor _executor;
51      private final Callback _readCallback;
52      private final boolean _executeOnfillable;
53      private int _inputBufferSize=2048;
54  
55      protected AbstractConnection(EndPoint endp, Executor executor)
56      {
57          this(endp,executor,EXECUTE_ONFILLABLE);
58      }
59      
60      protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
61      {
62          if (executor == null)
63              throw new IllegalArgumentException("Executor must not be null!");
64          _endPoint = endp;
65          _executor = executor;
66          _readCallback = new ReadCallback();
67          _executeOnfillable=executeOnfillable;
68          _state.set(IDLE);
69      }
70  
71      @Override
72      public void addListener(Listener listener)
73      {
74          listeners.add(listener);
75      }
76  
77      public int getInputBufferSize()
78      {
79          return _inputBufferSize;
80      }
81  
82      public void setInputBufferSize(int inputBufferSize)
83      {
84          _inputBufferSize = inputBufferSize;
85      }
86  
87      protected Executor getExecutor()
88      {
89          return _executor;
90      }
91      
92      protected void failedCallback(final Callback callback, final Throwable x)
93      {
94          if (NonBlockingThread.isNonBlockingThread())
95          {
96              try
97              {
98                  getExecutor().execute(new Runnable()
99                  {
100                     @Override
101                     public void run()
102                     {
103                         callback.failed(x);
104                     }
105                 });
106             }
107             catch(RejectedExecutionException e)
108             {
109                 LOG.debug(e);
110                 callback.failed(x);
111             }
112         }
113         else
114         {
115             callback.failed(x);
116         }
117     }
118     
119     /**
120      * <p>Utility method to be called to register read interest.</p>
121      * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
122      * will be called back as appropriate.</p>
123      * @see #onFillable()
124      */
125     public void fillInterested()
126     {
127         if (LOG.isDebugEnabled())
128             LOG.debug("fillInterested {}",this);
129         
130         while(true)
131         {
132             State state=_state.get();
133             if (next(state,state.fillInterested()))
134                 break;
135         }
136     }
137     
138     public void fillInterested(Callback callback)
139     {
140         if (LOG.isDebugEnabled())
141             LOG.debug("fillInterested {}",this);
142 
143         while(true)
144         {
145             State state=_state.get();
146             // TODO yuck
147             if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
148                 break;
149             State next=new FillingInterestedCallback(callback,state);
150             if (next(state,next))
151                 break;
152         }
153     }
154     
155     /**
156      * <p>Callback method invoked when the endpoint is ready to be read.</p>
157      * @see #fillInterested()
158      */
159     public abstract void onFillable();
160 
161     /**
162      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
163      * @param cause the exception that caused the failure
164      */
165     protected void onFillInterestedFailed(Throwable cause)
166     {
167         if (LOG.isDebugEnabled())
168             LOG.debug("{} onFillInterestedFailed {}", this, cause);
169         if (_endPoint.isOpen())
170         {
171             boolean close = true;
172             if (cause instanceof TimeoutException)
173                 close = onReadTimeout();
174             if (close)
175             {
176                 if (_endPoint.isOutputShutdown())
177                     _endPoint.close();
178                 else
179                     _endPoint.shutdownOutput();
180             }
181         }
182 
183         if (_endPoint.isOpen())
184             fillInterested();        
185     }
186 
187     /**
188      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
189      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
190      */
191     protected boolean onReadTimeout()
192     {
193         return true;
194     }
195 
196     @Override
197     public void onOpen()
198     {
199         if (LOG.isDebugEnabled())
200             LOG.debug("onOpen {}", this);
201 
202         for (Listener listener : listeners)
203             listener.onOpened(this);
204     }
205 
206     @Override
207     public void onClose()
208     {
209         if (LOG.isDebugEnabled())
210             LOG.debug("onClose {}",this);
211 
212         for (Listener listener : listeners)
213             listener.onClosed(this);
214     }
215 
216     @Override
217     public EndPoint getEndPoint()
218     {
219         return _endPoint;
220     }
221 
222     @Override
223     public void close()
224     {
225         getEndPoint().close();
226     }
227 
228     @Override
229     public int getMessagesIn()
230     {
231         return -1;
232     }
233 
234     @Override
235     public int getMessagesOut()
236     {
237         return -1;
238     }
239 
240     @Override
241     public long getBytesIn()
242     {
243         return -1;
244     }
245 
246     @Override
247     public long getBytesOut()
248     {
249         return -1;
250     }
251 
252     @Override
253     public long getCreatedTimeStamp()
254     {
255         return _created;
256     }
257 
258     @Override
259     public String toString()
260     {
261         return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
262     }
263     
264     public boolean next(State state, State next)
265     {
266         if (next==null)
267             return true;
268         if(_state.compareAndSet(state,next))
269         {
270             if (LOG.isDebugEnabled())
271                 LOG.debug("{}-->{} {}",state,next,this);
272             if (next!=state)
273                 next.onEnter(AbstractConnection.this);
274             return true;
275         }
276         return false;
277     }
278     
279     private static final class IdleState extends State
280     {
281         private IdleState()
282         {
283             super("IDLE");
284         }
285 
286         @Override
287         State fillInterested()
288         {
289             return FILL_INTERESTED;
290         }
291     }
292 
293 
294     private static final class FillInterestedState extends State
295     {
296         private FillInterestedState()
297         {
298             super("FILL_INTERESTED");
299         }
300 
301         @Override
302         public void onEnter(AbstractConnection connection)
303         {
304             connection.getEndPoint().fillInterested(connection._readCallback);
305         }
306 
307         @Override
308         State fillInterested()
309         {
310             return this;
311         }
312 
313         @Override
314         public State onFillable()
315         {
316             return FILLING;
317         }
318 
319         @Override
320         State onFailed()
321         {
322             return IDLE;
323         }
324     }
325 
326 
327     private static final class RefillingState extends State
328     {
329         private RefillingState()
330         {
331             super("REFILLING");
332         }
333 
334         @Override
335         State fillInterested()
336         {
337             return FILLING_FILL_INTERESTED;
338         }
339 
340         @Override
341         public State onFilled()
342         {
343             return IDLE;
344         }
345     }
346 
347 
348     private static final class FillingFillInterestedState extends State
349     {
350         private FillingFillInterestedState(String name)
351         {
352             super(name);
353         }
354 
355         @Override
356         State fillInterested()
357         {
358             return this;
359         }
360 
361         State onFilled()
362         {
363             return FILL_INTERESTED;
364         }
365     }
366 
367 
368     private static final class FillingState extends State
369     {
370         private FillingState()
371         {
372             super("FILLING");
373         }
374 
375         @Override
376         public void onEnter(AbstractConnection connection)
377         {
378             if (connection._executeOnfillable)
379                 connection.getExecutor().execute(connection._runOnFillable);
380             else
381                 connection._runOnFillable.run();
382         }
383 
384         @Override
385         State fillInterested()
386         {
387             return FILLING_FILL_INTERESTED;
388         }
389 
390         @Override
391         public State onFilled()
392         {
393             return IDLE;
394         }
395     }
396 
397 
398     public static class State
399     {
400         private final String _name;
401         State(String name)
402         {
403             _name=name;
404         }
405 
406         @Override
407         public String toString()
408         {
409             return _name;
410         }
411         
412         void onEnter(AbstractConnection connection)
413         {
414         }
415         
416         State fillInterested()
417         {
418             throw new IllegalStateException(this.toString());
419         }
420 
421         State onFillable()
422         {
423             throw new IllegalStateException(this.toString());
424         }
425 
426         State onFilled()
427         {
428             throw new IllegalStateException(this.toString());
429         }
430         
431         State onFailed()
432         {
433             throw new IllegalStateException(this.toString());
434         }
435     }
436     
437 
438     public static final State IDLE=new IdleState();
439     
440     public static final State FILL_INTERESTED=new FillInterestedState();
441     
442     public static final State FILLING=new FillingState();
443     
444     public static final State REFILLING=new RefillingState();
445 
446     public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
447     
448     public class NestedState extends State
449     {
450         private final State _nested;
451         
452         NestedState(State nested)
453         {
454             super("NESTED("+nested+")");
455             _nested=nested;
456         }
457         NestedState(String name,State nested)
458         {
459             super(name+"("+nested+")");
460             _nested=nested;
461         }
462 
463         @Override
464         State fillInterested()
465         {
466             return new NestedState(_nested.fillInterested());
467         }
468 
469         @Override
470         State onFillable()
471         {
472             return new NestedState(_nested.onFillable());
473         }
474         
475         @Override
476         State onFilled()
477         {
478             return new NestedState(_nested.onFilled());
479         }
480     }
481     
482     
483     public class FillingInterestedCallback extends NestedState
484     {
485         private final Callback _callback;
486         
487         FillingInterestedCallback(Callback callback,State nested)
488         {
489             super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
490             _callback=callback;
491         }
492 
493         @Override
494         void onEnter(final AbstractConnection connection)
495         {
496             Callback callback=new Callback()
497             {
498                 @Override
499                 public void succeeded()
500                 {
501                     while(true)
502                     {
503                         State state = connection._state.get();
504                         if (!(state instanceof NestedState))
505                             break;
506                         State nested=((NestedState)state)._nested;
507                         if (connection.next(state,nested))
508                             break;
509                     }
510                     _callback.succeeded();
511                 }
512 
513                 @Override
514                 public void failed(Throwable x)
515                 {
516                     while(true)
517                     {
518                         State state = connection._state.get();
519                         if (!(state instanceof NestedState))
520                             break;
521                         State nested=((NestedState)state)._nested;
522                         if (connection.next(state,nested))
523                             break;
524                     }
525                     _callback.failed(x);
526                 }  
527             };
528             
529             connection.getEndPoint().fillInterested(callback);
530         }
531     }
532     
533     private final Runnable _runOnFillable = new Runnable()
534     {
535         @Override
536         public void run()
537         {
538             try
539             {
540                 onFillable();
541             }
542             finally
543             {
544                 while(true)
545                 {
546                     State state=_state.get();
547                     if (next(state,state.onFilled()))
548                         break;
549                 }
550             }
551         }
552     };
553     
554     
555     private class ReadCallback implements Callback
556     {   
557         @Override
558         public void succeeded()
559         {
560             while(true)
561             {
562                 State state=_state.get();
563                 if (next(state,state.onFillable()))
564                     break;
565             }
566         }
567 
568         @Override
569         public void failed(final Throwable x)
570         {
571             _executor.execute(new Runnable()
572             {
573                 @Override
574                 public void run()
575                 {
576                     while(true)
577                     {
578                         State state=_state.get();
579                         if (next(state,state.onFailed()))
580                             break;
581                     }
582                     onFillInterestedFailed(x);
583                 }
584             });
585         }
586         
587         @Override
588         public String toString()
589         {
590             return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
591         }
592     };
593 }