View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.util.List;
22  import java.util.concurrent.CopyOnWriteArrayList;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.TimeoutException;
26  
27  import org.eclipse.jetty.util.Callback;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * <p>A convenience base implementation of {@link Connection}.</p>
33   * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
34   * more traditional style of async reading.  A call to {@link #fillInterested()}
35   * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
36   * as appropriate.</p>
37   */
38  public abstract class AbstractConnection implements Connection
39  {
40      private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41  
42      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
43      private final long _created=System.currentTimeMillis();
44      private final EndPoint _endPoint;
45      private final Executor _executor;
46      private final Callback _readCallback;
47      private int _inputBufferSize=2048;
48  
49      protected AbstractConnection(EndPoint endp, Executor executor)
50      {
51          if (executor == null)
52              throw new IllegalArgumentException("Executor must not be null!");
53          _endPoint = endp;
54          _executor = executor;
55          _readCallback = new ReadCallback();
56      }
57  
58      @Override
59      public void addListener(Listener listener)
60      {
61          listeners.add(listener);
62      }
63  
64      @Override
65      public void removeListener(Listener listener)
66      {
67          listeners.remove(listener);
68      }
69  
70      public int getInputBufferSize()
71      {
72          return _inputBufferSize;
73      }
74  
75      public void setInputBufferSize(int inputBufferSize)
76      {
77          _inputBufferSize = inputBufferSize;
78      }
79  
80      protected Executor getExecutor()
81      {
82          return _executor;
83      }
84  
85      @Deprecated
86      public boolean isDispatchIO()
87      {
88          return false;
89      }
90  
91      protected void failedCallback(final Callback callback, final Throwable x)
92      {
93          if (callback.isNonBlocking())
94          {
95              try
96              {
97                  callback.failed(x);
98              }
99              catch (Exception e)
100             {
101                 LOG.warn(e);
102             }
103         }
104         else
105         {
106             try
107             {
108                 getExecutor().execute(new Runnable()
109                 {
110                     @Override
111                     public void run()
112                     {
113                         try
114                         {
115                             callback.failed(x);
116                         }
117                         catch (Exception e)
118                         {
119                             LOG.warn(e);
120                         }
121                     }
122                 });
123             }
124             catch(RejectedExecutionException e)
125             {
126                 LOG.debug(e);
127                 callback.failed(x);
128             }
129         }
130     }
131 
132     /**
133      * <p>Utility method to be called to register read interest.</p>
134      * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
135      * will be called back as appropriate.</p>
136      * @see #onFillable()
137      */
138     public void fillInterested()
139     {
140         if (LOG.isDebugEnabled())
141             LOG.debug("fillInterested {}",this);
142         getEndPoint().fillInterested(_readCallback);
143     }
144 
145     public boolean isFillInterested()
146     {
147         return getEndPoint().isFillInterested();
148     }
149 
150     /**
151      * <p>Callback method invoked when the endpoint is ready to be read.</p>
152      * @see #fillInterested()
153      */
154     public abstract void onFillable();
155 
156     /**
157      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
158      * @param cause the exception that caused the failure
159      */
160     protected void onFillInterestedFailed(Throwable cause)
161     {
162         if (LOG.isDebugEnabled())
163             LOG.debug("{} onFillInterestedFailed {}", this, cause);
164         if (_endPoint.isOpen())
165         {
166             boolean close = true;
167             if (cause instanceof TimeoutException)
168                 close = onReadTimeout();
169             if (close)
170             {
171                 if (_endPoint.isOutputShutdown())
172                     _endPoint.close();
173                 else
174                 {
175                     _endPoint.shutdownOutput();
176                     fillInterested();
177                 }
178             }
179         }
180     }
181 
182     /**
183      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
184      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
185      */
186     protected boolean onReadTimeout()
187     {
188         return true;
189     }
190 
191     @Override
192     public void onOpen()
193     {
194         if (LOG.isDebugEnabled())
195             LOG.debug("onOpen {}", this);
196 
197         for (Listener listener : listeners)
198             listener.onOpened(this);
199     }
200 
201     @Override
202     public void onClose()
203     {
204         if (LOG.isDebugEnabled())
205             LOG.debug("onClose {}",this);
206 
207         for (Listener listener : listeners)
208             listener.onClosed(this);
209     }
210 
211     @Override
212     public EndPoint getEndPoint()
213     {
214         return _endPoint;
215     }
216 
217     @Override
218     public void close()
219     {
220         getEndPoint().close();
221     }
222 
223     @Override
224     public boolean onIdleExpired()
225     {
226         return true;
227     }
228 
229     @Override
230     public int getMessagesIn()
231     {
232         return -1;
233     }
234 
235     @Override
236     public int getMessagesOut()
237     {
238         return -1;
239     }
240 
241     @Override
242     public long getBytesIn()
243     {
244         return -1;
245     }
246 
247     @Override
248     public long getBytesOut()
249     {
250         return -1;
251     }
252 
253     @Override
254     public long getCreatedTimeStamp()
255     {
256         return _created;
257     }
258 
259     @Override
260     public String toString()
261     {
262         return String.format("%s@%x[%s]",
263                 getClass().getSimpleName(),
264                 hashCode(),
265                 _endPoint);
266     }
267 
268     private class ReadCallback implements Callback
269     {
270         @Override
271         public void succeeded()
272         {
273             onFillable();
274         }
275 
276         @Override
277         public void failed(final Throwable x)
278         {
279             onFillInterestedFailed(x);
280         }
281 
282         @Override
283         public String toString()
284         {
285             return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
286         }
287     }
288 }