View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.util.List;
22  import java.util.concurrent.CopyOnWriteArrayList;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.TimeoutException;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.Callback;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * <p>A convenience base implementation of {@link Connection}.</p>
33   * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
34   * more traditional style of async reading.  A call to {@link #fillInterested()}
35   * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
36   * as appropriate.</p>
37   */
38  public abstract class AbstractConnection implements Connection
39  {
40      private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41      
42      public static final boolean EXECUTE_ONFILLABLE=true;
43  
44      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
45      private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
46      private final long _created=System.currentTimeMillis();
47      private final EndPoint _endPoint;
48      private final Executor _executor;
49      private final Callback _readCallback;
50      private final boolean _executeOnfillable;
51      private int _inputBufferSize=2048;
52  
53      protected AbstractConnection(EndPoint endp, Executor executor)
54      {
55          this(endp,executor,EXECUTE_ONFILLABLE);
56      }
57      
58      protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
59      {
60          if (executor == null)
61              throw new IllegalArgumentException("Executor must not be null!");
62          _endPoint = endp;
63          _executor = executor;
64          _readCallback = new ReadCallback();
65          _executeOnfillable=executeOnfillable;
66      }
67  
68      @Override
69      public void addListener(Listener listener)
70      {
71          listeners.add(listener);
72      }
73  
74      public int getInputBufferSize()
75      {
76          return _inputBufferSize;
77      }
78  
79      public void setInputBufferSize(int inputBufferSize)
80      {
81          _inputBufferSize = inputBufferSize;
82      }
83  
84      protected Executor getExecutor()
85      {
86          return _executor;
87      }
88      
89      /**
90       * <p>Utility method to be called to register read interest.</p>
91       * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
92       * will be called back as appropriate.</p>
93       * @see #onFillable()
94       */
95      public void fillInterested()
96      {
97          LOG.debug("fillInterested {}",this);
98  
99          loop:while(true)
100         {
101             switch(_state.get())
102             {
103                 case IDLE:
104                     if (_state.compareAndSet(State.IDLE,State.INTERESTED))
105                     {
106                         getEndPoint().fillInterested(_readCallback);
107                         break loop;
108                     }
109                     break;
110 
111                 case FILLING:
112                     if (_state.compareAndSet(State.FILLING,State.FILLING_INTERESTED))
113                         break loop;
114                     break;
115 
116                 case FILLING_INTERESTED:
117                 case INTERESTED:
118                     break loop;
119             }
120         }
121     }
122 
123     /**
124      * <p>Callback method invoked when the endpoint is ready to be read.</p>
125      * @see #fillInterested()
126      */
127     public abstract void onFillable();
128 
129     /**
130      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
131      * @param cause the exception that caused the failure
132      */
133     protected void onFillInterestedFailed(Throwable cause)
134     {
135         LOG.debug("{} onFillInterestedFailed {}", this, cause);
136         if (_endPoint.isOpen())
137         {
138             boolean close = true;
139             if (cause instanceof TimeoutException)
140                 close = onReadTimeout();
141             if (close)
142             {
143                 if (_endPoint.isOutputShutdown())
144                     _endPoint.close();
145                 else
146                     _endPoint.shutdownOutput();
147             }
148         }
149     }
150 
151     /**
152      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
153      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
154      */
155     protected boolean onReadTimeout()
156     {
157         return true;
158     }
159 
160     @Override
161     public void onOpen()
162     {
163         LOG.debug("onOpen {}", this);
164 
165         for (Listener listener : listeners)
166             listener.onOpened(this);
167     }
168 
169     @Override
170     public void onClose()
171     {
172         LOG.debug("onClose {}",this);
173 
174         for (Listener listener : listeners)
175             listener.onClosed(this);
176     }
177 
178     @Override
179     public EndPoint getEndPoint()
180     {
181         return _endPoint;
182     }
183 
184     @Override
185     public void close()
186     {
187         getEndPoint().close();
188     }
189 
190     @Override
191     public int getMessagesIn()
192     {
193         return -1;
194     }
195 
196     @Override
197     public int getMessagesOut()
198     {
199         return -1;
200     }
201 
202     @Override
203     public long getBytesIn()
204     {
205         return -1;
206     }
207 
208     @Override
209     public long getBytesOut()
210     {
211         return -1;
212     }
213 
214     @Override
215     public long getCreatedTimeStamp()
216     {
217         return _created;
218     }
219 
220     @Override
221     public String toString()
222     {
223         return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
224     }
225 
226     private enum State
227     {
228         IDLE, INTERESTED, FILLING, FILLING_INTERESTED
229     }
230     
231     private class ReadCallback implements Callback, Runnable
232     {
233         @Override
234         public void run()
235         {
236             if (_state.compareAndSet(State.INTERESTED,State.FILLING))
237             {
238                 try
239                 {
240                     onFillable();
241                 }
242                 finally
243                 {
244                     loop:while(true)
245                     {
246                         switch(_state.get())
247                         {
248                             case IDLE:
249                             case INTERESTED:
250                                 throw new IllegalStateException();
251 
252                             case FILLING:
253                                 if (_state.compareAndSet(State.FILLING,State.IDLE))
254                                     break loop;
255                                 break;
256 
257                             case FILLING_INTERESTED:
258                                 if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
259                                 {
260                                     getEndPoint().fillInterested(_readCallback);
261                                     break loop;
262                                 }
263                                 break;
264                         }
265                     }
266                 }
267             }
268             else
269                 LOG.warn(new Throwable());
270         }
271         
272         @Override
273         public void succeeded()
274         {
275             if (_executeOnfillable)
276                 _executor.execute(this);
277             else
278                 run();
279         }
280 
281         @Override
282         public void failed(Throwable x)
283         {
284             onFillInterestedFailed(x);
285         }
286         
287         @Override
288         public String toString()
289         {
290             return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode());
291         }
292     };
293 }