View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.util.List;
22  import java.util.concurrent.CopyOnWriteArrayList;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.TimeoutException;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.BlockingCallback;
28  import org.eclipse.jetty.util.Callback;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  /**
33   * <p>A convenience base implementation of {@link Connection}.</p>
34   * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
35   * more traditional style of async reading.  A call to {@link #fillInterested()}
36   * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
37   * as appropriate.</p>
38   */
39  public abstract class AbstractConnection implements Connection
40  {
41      private static final Logger LOG = Log.getLogger(AbstractConnection.class);
42      
43      public static final boolean EXECUTE_ONFILLABLE=true;
44  
45      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
46      private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
47      private final long _created=System.currentTimeMillis();
48      private final EndPoint _endPoint;
49      private final Executor _executor;
50      private final Callback _readCallback;
51      private final boolean _executeOnfillable;
52      private int _inputBufferSize=2048;
53  
54      protected AbstractConnection(EndPoint endp, Executor executor)
55      {
56          this(endp,executor,EXECUTE_ONFILLABLE);
57      }
58      
59      protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
60      {
61          if (executor == null)
62              throw new IllegalArgumentException("Executor must not be null!");
63          _endPoint = endp;
64          _executor = executor;
65          _readCallback = new ReadCallback();
66          _executeOnfillable=executeOnfillable;
67      }
68  
69      @Override
70      public void addListener(Listener listener)
71      {
72          listeners.add(listener);
73      }
74  
75      public int getInputBufferSize()
76      {
77          return _inputBufferSize;
78      }
79  
80      public void setInputBufferSize(int inputBufferSize)
81      {
82          _inputBufferSize = inputBufferSize;
83      }
84  
85      protected Executor getExecutor()
86      {
87          return _executor;
88      }
89      
90      /**
91       * <p>Utility method to be called to register read interest.</p>
92       * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
93       * will be called back as appropriate.</p>
94       * @see #onFillable()
95       */
96      public void fillInterested()
97      {
98          LOG.debug("fillInterested {}",this);
99  
100         loop:while(true)
101         {
102             switch(_state.get())
103             {
104                 case IDLE:
105                     if (_state.compareAndSet(State.IDLE,State.INTERESTED))
106                     {
107                         getEndPoint().fillInterested(_readCallback);
108                         break loop;
109                     }
110                     break;
111 
112                 case FILLING:
113                     if (_state.compareAndSet(State.FILLING,State.FILLING_INTERESTED))
114                         break loop;
115                     break;
116                     
117                 case FILLING_BLOCKED:
118                     if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING_BLOCKED_INTERESTED))
119                         break loop;
120                     break;
121                     
122                 case BLOCKED:
123                     if (_state.compareAndSet(State.BLOCKED,State.BLOCKED_INTERESTED))
124                         break loop;
125                     break;
126 
127                 case FILLING_BLOCKED_INTERESTED:
128                 case FILLING_INTERESTED:
129                 case BLOCKED_INTERESTED:
130                 case INTERESTED:
131                     break loop;
132             }
133         }
134     }
135     
136 
137     private void unblock()
138     {
139         LOG.debug("unblock {}",this);
140 
141         loop:while(true)
142         {
143             switch(_state.get())
144             {
145                 case FILLING_BLOCKED:
146                     if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING))
147                         break loop;
148                     break;
149                     
150                 case FILLING_BLOCKED_INTERESTED:
151                     if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.FILLING_INTERESTED))
152                         break loop;
153                     break;
154                     
155                 case BLOCKED_INTERESTED:
156                     if (_state.compareAndSet(State.BLOCKED_INTERESTED,State.INTERESTED))
157                     {
158                         getEndPoint().fillInterested(_readCallback);
159                         break loop;
160                     }
161                     break;
162                     
163                 case BLOCKED:
164                     if (_state.compareAndSet(State.BLOCKED,State.IDLE))
165                         break loop;
166                     break;
167 
168                 case FILLING:
169                 case IDLE:
170                 case FILLING_INTERESTED:
171                 case INTERESTED:
172                     break loop;
173             }
174         }
175     }
176     
177     
178     /**
179      */
180     protected void block(final BlockingCallback callback)
181     {
182         LOG.debug("block {}",this);
183         
184         final Callback blocked=new Callback()
185         {
186             @Override
187             public void succeeded()
188             {
189                 unblock();
190                 callback.succeeded();
191             }
192 
193             @Override
194             public void failed(Throwable x)
195             {
196                 unblock();
197                 callback.failed(x);                
198             }
199         };
200 
201         loop:while(true)
202         {
203             switch(_state.get())
204             {
205                 case IDLE:
206                     if (_state.compareAndSet(State.IDLE,State.BLOCKED))
207                     {
208                         getEndPoint().fillInterested(blocked);
209                         break loop;
210                     }
211                     break;
212 
213                 case FILLING:
214                     if (_state.compareAndSet(State.FILLING,State.FILLING_BLOCKED))
215                     {
216                         getEndPoint().fillInterested(blocked);
217                         break loop;
218                     }
219                     break;
220                     
221                 case FILLING_INTERESTED:
222                     if (_state.compareAndSet(State.FILLING_INTERESTED,State.FILLING_BLOCKED_INTERESTED))
223                     {
224                         getEndPoint().fillInterested(blocked);
225                         break loop;
226                     }
227                     break;
228 
229                 case BLOCKED:
230                 case BLOCKED_INTERESTED:
231                 case FILLING_BLOCKED:
232                 case FILLING_BLOCKED_INTERESTED:
233                     throw new IllegalStateException("Already Blocked");
234                     
235                 case INTERESTED:
236                     throw new IllegalStateException();
237             }
238         }
239     }
240 
241     /**
242      * <p>Callback method invoked when the endpoint is ready to be read.</p>
243      * @see #fillInterested()
244      */
245     public abstract void onFillable();
246 
247     /**
248      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
249      * @param cause the exception that caused the failure
250      */
251     protected void onFillInterestedFailed(Throwable cause)
252     {
253         LOG.debug("{} onFillInterestedFailed {}", this, cause);
254         if (_endPoint.isOpen())
255         {
256             boolean close = true;
257             if (cause instanceof TimeoutException)
258                 close = onReadTimeout();
259             if (close)
260             {
261                 if (_endPoint.isOutputShutdown())
262                     _endPoint.close();
263                 else
264                     _endPoint.shutdownOutput();
265             }
266         }
267     }
268 
269     /**
270      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
271      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
272      */
273     protected boolean onReadTimeout()
274     {
275         return true;
276     }
277 
278     @Override
279     public void onOpen()
280     {
281         LOG.debug("onOpen {}", this);
282 
283         for (Listener listener : listeners)
284             listener.onOpened(this);
285     }
286 
287     @Override
288     public void onClose()
289     {
290         LOG.debug("onClose {}",this);
291 
292         for (Listener listener : listeners)
293             listener.onClosed(this);
294     }
295 
296     @Override
297     public EndPoint getEndPoint()
298     {
299         return _endPoint;
300     }
301 
302     @Override
303     public void close()
304     {
305         getEndPoint().close();
306     }
307 
308     @Override
309     public int getMessagesIn()
310     {
311         return -1;
312     }
313 
314     @Override
315     public int getMessagesOut()
316     {
317         return -1;
318     }
319 
320     @Override
321     public long getBytesIn()
322     {
323         return -1;
324     }
325 
326     @Override
327     public long getBytesOut()
328     {
329         return -1;
330     }
331 
332     @Override
333     public long getCreatedTimeStamp()
334     {
335         return _created;
336     }
337 
338     @Override
339     public String toString()
340     {
341         return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
342     }
343 
344     private enum State
345     {
346         IDLE, INTERESTED, FILLING, FILLING_INTERESTED, FILLING_BLOCKED, BLOCKED, FILLING_BLOCKED_INTERESTED, BLOCKED_INTERESTED
347     }
348     
349     private class ReadCallback implements Callback, Runnable
350     {
351         @Override
352         public void run()
353         {
354             if (_state.compareAndSet(State.INTERESTED,State.FILLING))
355             {
356                 try
357                 {
358                     onFillable();
359                 }
360                 finally
361                 {
362                     loop:while(true)
363                     {
364                         switch(_state.get())
365                         {
366                             case IDLE:
367                             case INTERESTED:
368                             case BLOCKED:
369                             case BLOCKED_INTERESTED:
370                                 LOG.warn(new IllegalStateException());
371                                 return;
372 
373                             case FILLING:
374                                 if (_state.compareAndSet(State.FILLING,State.IDLE))
375                                     break loop;
376                                 break;
377                                 
378                             case FILLING_BLOCKED:
379                                 if (_state.compareAndSet(State.FILLING_BLOCKED,State.BLOCKED))
380                                     break loop;
381                                 break;
382                                 
383                             case FILLING_BLOCKED_INTERESTED:
384                                 if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.BLOCKED_INTERESTED))
385                                     break loop;
386                                 break;
387 
388                             case FILLING_INTERESTED:
389                                 if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
390                                 {
391                                     getEndPoint().fillInterested(_readCallback);
392                                     break loop;
393                                 }
394                                 break;
395                         }
396                     }
397                 }
398             }
399             else
400                 LOG.warn(new IllegalStateException());
401         }
402         
403         @Override
404         public void succeeded()
405         {
406             if (_executeOnfillable)
407                 _executor.execute(this);
408             else
409                 run();
410         }
411 
412         @Override
413         public void failed(final Throwable x)
414         {
415             _executor.execute(new Runnable()
416             {
417                 @Override
418                 public void run()
419                 {
420                     onFillInterestedFailed(x);
421                 }
422             });
423         }
424         
425         @Override
426         public String toString()
427         {
428             return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode());
429         }
430     };
431 }