View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.util.List;
22  import java.util.concurrent.CopyOnWriteArrayList;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.TimeoutException;
26  
27  import org.eclipse.jetty.util.Callback;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * <p>A convenience base implementation of {@link Connection}.</p>
33   * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
34   * more traditional style of async reading.  A call to {@link #fillInterested()}
35   * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
36   * as appropriate.</p>
37   */
38  public abstract class AbstractConnection implements Connection
39  {
40      private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41  
42      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
43      private final long _created=System.currentTimeMillis();
44      private final EndPoint _endPoint;
45      private final Executor _executor;
46      private final Callback _readCallback;
47      private int _inputBufferSize=2048;
48  
49      protected AbstractConnection(EndPoint endp, Executor executor)
50      {
51          if (executor == null)
52              throw new IllegalArgumentException("Executor must not be null!");
53          _endPoint = endp;
54          _executor = executor;
55          _readCallback = new ReadCallback();
56      }
57  
58      @Override
59      public void addListener(Listener listener)
60      {
61          listeners.add(listener);
62      }
63  
64      public int getInputBufferSize()
65      {
66          return _inputBufferSize;
67      }
68  
69      public void setInputBufferSize(int inputBufferSize)
70      {
71          _inputBufferSize = inputBufferSize;
72      }
73  
74      protected Executor getExecutor()
75      {
76          return _executor;
77      }
78  
79      @Deprecated
80      public boolean isDispatchIO()
81      {
82          return false;
83      }
84  
85      protected void failedCallback(final Callback callback, final Throwable x)
86      {
87          if (callback.isNonBlocking())
88          {
89              try
90              {
91                  callback.failed(x);
92              }
93              catch (Exception e)
94              {
95                  LOG.warn(e);
96              }
97          }
98          else
99          {
100             try
101             {
102                 getExecutor().execute(new Runnable()
103                 {
104                     @Override
105                     public void run()
106                     {
107                         try
108                         {
109                             callback.failed(x);
110                         }
111                         catch (Exception e)
112                         {
113                             LOG.warn(e);
114                         }
115                     }
116                 });
117             }
118             catch(RejectedExecutionException e)
119             {
120                 LOG.debug(e);
121                 callback.failed(x);
122             }
123         }
124     }
125 
126     /**
127      * <p>Utility method to be called to register read interest.</p>
128      * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
129      * will be called back as appropriate.</p>
130      * @see #onFillable()
131      */
132     public void fillInterested()
133     {
134         if (LOG.isDebugEnabled())
135             LOG.debug("fillInterested {}",this);
136         getEndPoint().fillInterested(_readCallback);
137     }
138 
139     public boolean isFillInterested()
140     {
141         return getEndPoint().isFillInterested();
142     }
143 
144     /**
145      * <p>Callback method invoked when the endpoint is ready to be read.</p>
146      * @see #fillInterested()
147      */
148     public abstract void onFillable();
149 
150     /**
151      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
152      * @param cause the exception that caused the failure
153      */
154     protected void onFillInterestedFailed(Throwable cause)
155     {
156         if (LOG.isDebugEnabled())
157             LOG.debug("{} onFillInterestedFailed {}", this, cause);
158         if (_endPoint.isOpen())
159         {
160             boolean close = true;
161             if (cause instanceof TimeoutException)
162                 close = onReadTimeout();
163             if (close)
164             {
165                 if (_endPoint.isOutputShutdown())
166                     _endPoint.close();
167                 else
168                 {
169                     _endPoint.shutdownOutput();
170                     fillInterested();
171                 }
172             }
173         }
174     }
175 
176     /**
177      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
178      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
179      */
180     protected boolean onReadTimeout()
181     {
182         return true;
183     }
184 
185     @Override
186     public void onOpen()
187     {
188         if (LOG.isDebugEnabled())
189             LOG.debug("onOpen {}", this);
190 
191         for (Listener listener : listeners)
192             listener.onOpened(this);
193     }
194 
195     @Override
196     public void onClose()
197     {
198         if (LOG.isDebugEnabled())
199             LOG.debug("onClose {}",this);
200 
201         for (Listener listener : listeners)
202             listener.onClosed(this);
203     }
204 
205     @Override
206     public EndPoint getEndPoint()
207     {
208         return _endPoint;
209     }
210 
211     @Override
212     public void close()
213     {
214         getEndPoint().close();
215     }
216 
217     @Override
218     public int getMessagesIn()
219     {
220         return -1;
221     }
222 
223     @Override
224     public int getMessagesOut()
225     {
226         return -1;
227     }
228 
229     @Override
230     public long getBytesIn()
231     {
232         return -1;
233     }
234 
235     @Override
236     public long getBytesOut()
237     {
238         return -1;
239     }
240 
241     @Override
242     public long getCreatedTimeStamp()
243     {
244         return _created;
245     }
246 
247     @Override
248     public String toString()
249     {
250         return String.format("%s@%x[%s]",
251                 getClass().getSimpleName(),
252                 hashCode(),
253                 _endPoint);
254     }
255 
256     private class ReadCallback implements Callback
257     {
258         @Override
259         public void succeeded()
260         {
261             onFillable();
262         }
263 
264         @Override
265         public void failed(final Throwable x)
266         {
267             onFillInterestedFailed(x);
268         }
269 
270         @Override
271         public String toString()
272         {
273             return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
274         }
275     }
276 }