View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.nio.channels.ClosedChannelException;
23  import java.nio.channels.ReadPendingException;
24  import java.text.SimpleDateFormat;
25  import java.util.Date;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import org.eclipse.jetty.util.Callback;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  /**
33   * A Utility class to help implement {@link EndPoint#fillInterested(Callback)}
34   * by keeping state and calling the context and callback objects.
35   */
36  public abstract class FillInterest
37  {
38      private final static Logger LOG = Log.getLogger(FillInterest.class);
39      private final AtomicReference<Callback> _interested = new AtomicReference<>(null);
40      private Throwable _lastSet;
41  
42      protected FillInterest()
43      {
44      }
45  
46      /**
47       * Call to register interest in a callback when a read is possible.
48       * The callback will be called either immediately if {@link #needsFillInterest()}
49       * returns true or eventually once {@link #fillable()} is called.
50       *
51       * @param callback the callback to register
52       * @throws ReadPendingException if unable to read due to pending read op
53       */
54      public void register(Callback callback) throws ReadPendingException
55      {
56          if (callback == null)
57              throw new IllegalArgumentException();
58  
59          if (_interested.compareAndSet(null, callback))
60          {
61              if (LOG.isDebugEnabled())
62              {
63                  LOG.debug("{} register {}",this,callback);
64                  _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
65              }
66          }
67          else
68          {
69              LOG.warn("Read pending for {} prevented {}", _interested, callback);
70              if (LOG.isDebugEnabled())
71                  LOG.warn("callback set at ",_lastSet);
72              throw new ReadPendingException();
73          }
74          try
75          {
76              if (LOG.isDebugEnabled())
77                  LOG.debug("{} register {}",this,callback);
78              needsFillInterest();
79          }
80          catch (Throwable e)
81          {
82              onFail(e);
83          }
84      }
85  
86      /**
87       * Call to signal that a read is now possible.
88       */
89      public void fillable()
90      {
91          Callback callback = _interested.get();
92          if (LOG.isDebugEnabled())
93              LOG.debug("{} fillable {}",this,callback);
94          if (callback != null && _interested.compareAndSet(callback, null))
95              callback.succeeded();
96          else if (LOG.isDebugEnabled())
97              LOG.debug("{} lost race {}",this,callback);
98      }
99  
100     /**
101      * @return True if a read callback has been registered
102      */
103     public boolean isInterested()
104     {
105         return _interested.get() != null;
106     }
107     
108     public boolean isCallbackNonBlocking()
109     {
110         Callback callback = _interested.get();
111         return callback!=null && callback.isNonBlocking();
112     }
113 
114     /**
115      * Call to signal a failure to a registered interest
116      *
117      * @param cause the cause of the failure
118      * @return true if the cause was passed to a {@link Callback} instance
119      */
120     public boolean onFail(Throwable cause)
121     {
122         Callback callback = _interested.get();
123         if (callback != null && _interested.compareAndSet(callback, null))
124         {
125             callback.failed(cause);
126             return true;
127         }
128         return false;
129     }
130 
131     public void onClose()
132     {
133         Callback callback = _interested.get();
134         if (callback != null && _interested.compareAndSet(callback, null))
135             callback.failed(new ClosedChannelException());
136     }
137 
138     @Override
139     public String toString()
140     {
141         return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get()!=null, _interested.get());
142     }
143 
144     
145     public String toStateString()
146     {
147         return _interested.get()==null?"-":"FI";
148     }
149 
150     /**
151      * Register the read interest
152      * Abstract method to be implemented by the Specific ReadInterest to
153      * schedule a future call to {@link #fillable()} or {@link #onFail(Throwable)}
154      *
155      * @throws IOException if unable to fulfill interest in fill
156      */
157     abstract protected void needsFillInterest() throws IOException;
158 }