View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.util.List;
22  import java.util.concurrent.CopyOnWriteArrayList;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.TimeoutException;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.Callback;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * <p>A convenience base implementation of {@link Connection}.</p>
33   * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
34   * more traditional style of async reading.  A call to {@link #fillInterested()}
35   * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
36   * as appropriate.</p>
37   */
38  public abstract class AbstractConnection implements Connection
39  {
40      private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41      
42      public static final boolean EXECUTE_ONFILLABLE=true;
43  
44      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
45      private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
46      private final long _created=System.currentTimeMillis();
47      private final EndPoint _endPoint;
48      private final Executor _executor;
49      private final Callback _readCallback;
50      private final boolean _executeOnfillable;
51      private int _inputBufferSize=2048;
52  
53      protected AbstractConnection(EndPoint endp, Executor executor)
54      {
55          this(endp,executor,EXECUTE_ONFILLABLE);
56      }
57      
58      protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
59      {
60          if (executor == null)
61              throw new IllegalArgumentException("Executor must not be null!");
62          _endPoint = endp;
63          _executor = executor;
64          _readCallback = new ReadCallback();
65          _executeOnfillable=executeOnfillable;
66          _state.set(IDLE);
67      }
68  
69      @Override
70      public void addListener(Listener listener)
71      {
72          listeners.add(listener);
73      }
74  
75      public int getInputBufferSize()
76      {
77          return _inputBufferSize;
78      }
79  
80      public void setInputBufferSize(int inputBufferSize)
81      {
82          _inputBufferSize = inputBufferSize;
83      }
84  
85      protected Executor getExecutor()
86      {
87          return _executor;
88      }
89      
90      /**
91       * <p>Utility method to be called to register read interest.</p>
92       * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
93       * will be called back as appropriate.</p>
94       * @see #onFillable()
95       */
96      public void fillInterested()
97      {
98          LOG.debug("fillInterested {}",this);           
99          
100         while(true)
101         {
102             State state=_state.get();
103             if (next(state,state.fillInterested()))
104                 break;
105         }
106     }
107     
108     public void fillInterested(Callback callback)
109     {
110         LOG.debug("fillInterested {}",this);
111 
112         while(true)
113         {
114             State state=_state.get();
115             // TODO yuck
116             if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
117                 break;
118             State next=new FillingInterestedCallback(callback,state);
119             if (next(state,next))
120                 break;
121         }
122     }
123     
124     /**
125      * <p>Callback method invoked when the endpoint is ready to be read.</p>
126      * @see #fillInterested()
127      */
128     public abstract void onFillable();
129 
130     /**
131      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
132      * @param cause the exception that caused the failure
133      */
134     protected void onFillInterestedFailed(Throwable cause)
135     {
136         LOG.debug("{} onFillInterestedFailed {}", this, cause);
137         if (_endPoint.isOpen())
138         {
139             boolean close = true;
140             if (cause instanceof TimeoutException)
141                 close = onReadTimeout();
142             if (close)
143             {
144                 if (_endPoint.isOutputShutdown())
145                     _endPoint.close();
146                 else
147                     _endPoint.shutdownOutput();
148             }
149         }
150 
151         if (_endPoint.isOpen())
152             fillInterested();        
153     }
154 
155     /**
156      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
157      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
158      */
159     protected boolean onReadTimeout()
160     {
161         return true;
162     }
163 
164     @Override
165     public void onOpen()
166     {
167         LOG.debug("onOpen {}", this);
168 
169         for (Listener listener : listeners)
170             listener.onOpened(this);
171     }
172 
173     @Override
174     public void onClose()
175     {
176         LOG.debug("onClose {}",this);
177 
178         for (Listener listener : listeners)
179             listener.onClosed(this);
180     }
181 
182     @Override
183     public EndPoint getEndPoint()
184     {
185         return _endPoint;
186     }
187 
188     @Override
189     public void close()
190     {
191         getEndPoint().close();
192     }
193 
194     @Override
195     public int getMessagesIn()
196     {
197         return -1;
198     }
199 
200     @Override
201     public int getMessagesOut()
202     {
203         return -1;
204     }
205 
206     @Override
207     public long getBytesIn()
208     {
209         return -1;
210     }
211 
212     @Override
213     public long getBytesOut()
214     {
215         return -1;
216     }
217 
218     @Override
219     public long getCreatedTimeStamp()
220     {
221         return _created;
222     }
223 
224     @Override
225     public String toString()
226     {
227         return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
228     }
229     
230     public boolean next(State state, State next)
231     {
232         if (next==null)
233             return true;
234         if(_state.compareAndSet(state,next))
235         {
236             LOG.debug("{}-->{} {}",state,next,this);
237             if (next!=state)
238                 next.onEnter(AbstractConnection.this);
239             return true;
240         }
241         return false;
242     }
243     
244     private static final class IdleState extends State
245     {
246         private IdleState()
247         {
248             super("IDLE");
249         }
250 
251         @Override
252         State fillInterested()
253         {
254             return FILL_INTERESTED;
255         }
256     }
257 
258 
259     private static final class FillInterestedState extends State
260     {
261         private FillInterestedState()
262         {
263             super("FILL_INTERESTED");
264         }
265 
266         @Override
267         public void onEnter(AbstractConnection connection)
268         {
269             connection.getEndPoint().fillInterested(connection._readCallback);
270         }
271 
272         @Override
273         State fillInterested()
274         {
275             return this;
276         }
277 
278         @Override
279         public State onFillable()
280         {
281             return FILLING;
282         }
283 
284         @Override
285         State onFailed()
286         {
287             return IDLE;
288         }
289     }
290 
291 
292     private static final class RefillingState extends State
293     {
294         private RefillingState()
295         {
296             super("REFILLING");
297         }
298 
299         @Override
300         State fillInterested()
301         {
302             return FILLING_FILL_INTERESTED;
303         }
304 
305         @Override
306         public State onFilled()
307         {
308             return IDLE;
309         }
310     }
311 
312 
313     private static final class FillingFillInterestedState extends State
314     {
315         private FillingFillInterestedState(String name)
316         {
317             super(name);
318         }
319 
320         @Override
321         State fillInterested()
322         {
323             return this;
324         }
325 
326         State onFilled()
327         {
328             return FILL_INTERESTED;
329         }
330     }
331 
332 
333     private static final class FillingState extends State
334     {
335         private FillingState()
336         {
337             super("FILLING");
338         }
339 
340         @Override
341         public void onEnter(AbstractConnection connection)
342         {
343             if (connection._executeOnfillable)
344                 connection.getExecutor().execute(connection._runOnFillable);
345             else
346                 connection._runOnFillable.run();
347         }
348 
349         @Override
350         State fillInterested()
351         {
352             return FILLING_FILL_INTERESTED;
353         }
354 
355         @Override
356         public State onFilled()
357         {
358             return IDLE;
359         }
360     }
361 
362 
363     public static class State
364     {
365         private final String _name;
366         State(String name)
367         {
368             _name=name;
369         }
370 
371         @Override
372         public String toString()
373         {
374             return _name;
375         }
376         
377         void onEnter(AbstractConnection connection)
378         {
379         }
380         
381         State fillInterested()
382         {
383             throw new IllegalStateException(this.toString());
384         }
385 
386         State onFillable()
387         {
388             throw new IllegalStateException(this.toString());
389         }
390 
391         State onFilled()
392         {
393             throw new IllegalStateException(this.toString());
394         }
395         
396         State onFailed()
397         {
398             throw new IllegalStateException(this.toString());
399         }
400     }
401     
402 
403     public static final State IDLE=new IdleState();
404     
405     public static final State FILL_INTERESTED=new FillInterestedState();
406     
407     public static final State FILLING=new FillingState();
408     
409     public static final State REFILLING=new RefillingState();
410 
411     public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
412     
413     public class NestedState extends State
414     {
415         private final State _nested;
416         
417         NestedState(State nested)
418         {
419             super("NESTED("+nested+")");
420             _nested=nested;
421         }
422         NestedState(String name,State nested)
423         {
424             super(name+"("+nested+")");
425             _nested=nested;
426         }
427 
428         @Override
429         State fillInterested()
430         {
431             return new NestedState(_nested.fillInterested());
432         }
433 
434         @Override
435         State onFillable()
436         {
437             return new NestedState(_nested.onFillable());
438         }
439         
440         @Override
441         State onFilled()
442         {
443             return new NestedState(_nested.onFilled());
444         }
445     }
446     
447     
448     public class FillingInterestedCallback extends NestedState
449     {
450         private final Callback _callback;
451         
452         FillingInterestedCallback(Callback callback,State nested)
453         {
454             super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
455             _callback=callback;
456         }
457 
458         @Override
459         void onEnter(final AbstractConnection connection)
460         {
461             Callback callback=new Callback()
462             {
463                 @Override
464                 public void succeeded()
465                 {
466                     while(true)
467                     {
468                         State state = connection._state.get();
469                         if (!(state instanceof NestedState))
470                             break;
471                         State nested=((NestedState)state)._nested;
472                         if (connection.next(state,nested))
473                             break;
474                     }
475                     _callback.succeeded();
476                 }
477 
478                 @Override
479                 public void failed(Throwable x)
480                 {
481                     while(true)
482                     {
483                         State state = connection._state.get();
484                         if (!(state instanceof NestedState))
485                             break;
486                         State nested=((NestedState)state)._nested;
487                         if (connection.next(state,nested))
488                             break;
489                     }
490                     _callback.failed(x);
491                 }  
492             };
493             
494             connection.getEndPoint().fillInterested(callback);
495         }
496     }
497     
498     private final Runnable _runOnFillable = new Runnable()
499     {
500         @Override
501         public void run()
502         {
503             try
504             {
505                 onFillable();
506             }
507             finally
508             {
509                 while(true)
510                 {
511                     State state=_state.get();
512                     if (next(state,state.onFilled()))
513                         break;
514                 }
515             }
516         }
517     };
518     
519     
520     private class ReadCallback implements Callback
521     {   
522         @Override
523         public void succeeded()
524         {
525             while(true)
526             {
527                 State state=_state.get();
528                 if (next(state,state.onFillable()))
529                     break;
530             }
531         }
532 
533         @Override
534         public void failed(final Throwable x)
535         {
536             _executor.execute(new Runnable()
537             {
538                 @Override
539                 public void run()
540                 {
541             while(true)
542             {
543                 State state=_state.get();
544                 if (next(state,state.onFailed()))
545                     break;
546             }
547                     onFillInterestedFailed(x);
548                 }
549             });
550         }
551         
552         @Override
553         public String toString()
554         {
555             return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
556         }
557     };
558 }