View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.nio.channels.CancelledKeyException;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.SocketChannel;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.util.thread.Scheduler;
31  
32  /**
33   * An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
34   */
35  public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
36  {
37      public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
38  
39      private final Runnable _updateTask = new Runnable()
40      {
41          @Override
42          public void run()
43          {
44              try
45              {
46                  if (getChannel().isOpen())
47                  {
48                      int oldInterestOps = _key.interestOps();
49                      int newInterestOps = _interestOps.get();
50                      if (newInterestOps != oldInterestOps)
51                          setKeyInterests(oldInterestOps, newInterestOps);
52                  }
53              }
54              catch (CancelledKeyException x)
55              {
56                  LOG.debug("Ignoring key update for concurrently closed channel {}", this);
57                  close();
58              }
59              catch (Exception x)
60              {
61                  LOG.warn("Ignoring key update for " + this, x);
62                  close();
63              }
64          }
65      };
66  
67      /**
68       * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
69       */
70      private final AtomicBoolean _open = new AtomicBoolean();
71      private final SelectorManager.ManagedSelector _selector;
72      private final SelectionKey _key;
73      /**
74       * The desired value for {@link SelectionKey#interestOps()}
75       */
76      private final AtomicInteger _interestOps = new AtomicInteger();
77  
78      public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
79      {
80          super(scheduler,channel);
81          _selector = selector;
82          _key = key;
83          setIdleTimeout(idleTimeout);
84      }
85  
86      @Override
87      protected boolean needsFill()
88      {
89          updateLocalInterests(SelectionKey.OP_READ, true);
90          return false;
91      }
92  
93      @Override
94      protected void onIncompleteFlush()
95      {
96          updateLocalInterests(SelectionKey.OP_WRITE, true);
97      }
98  
99      @Override
100     public void onSelected()
101     {
102         assert _selector.isSelectorThread();
103         int oldInterestOps = _key.interestOps();
104         int readyOps = _key.readyOps();
105         int newInterestOps = oldInterestOps & ~readyOps;
106         setKeyInterests(oldInterestOps, newInterestOps);
107         updateLocalInterests(readyOps, false);
108         if (_key.isReadable())
109             getFillInterest().fillable();
110         if (_key.isWritable())
111             getWriteFlusher().completeWrite();
112     }
113 
114 
115     private void updateLocalInterests(int operation, boolean add)
116     {
117         while (true)
118         {
119             int oldInterestOps = _interestOps.get();
120             int newInterestOps;
121             if (add)
122                 newInterestOps = oldInterestOps | operation;
123             else
124                 newInterestOps = oldInterestOps & ~operation;
125 
126             if (isInputShutdown())
127                 newInterestOps &= ~SelectionKey.OP_READ;
128             if (isOutputShutdown())
129                 newInterestOps &= ~SelectionKey.OP_WRITE;
130 
131             if (newInterestOps != oldInterestOps)
132             {
133                 if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
134                 {
135                     LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
136                     _selector.submit(_updateTask);
137                 }
138                 else
139                 {
140                     LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
141                     continue;
142                 }
143             }
144             else
145             {
146                 LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
147             }
148             break;
149         }
150     }
151 
152 
153     private void setKeyInterests(int oldInterestOps, int newInterestOps)
154     {
155         assert _selector.isSelectorThread();
156         LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
157         _key.interestOps(newInterestOps);
158     }
159 
160     @Override
161     public void close()
162     {
163         if (_open.compareAndSet(true, false))
164         {
165             super.close();
166             _selector.destroyEndPoint(this);
167         }
168     }
169 
170     @Override
171     public boolean isOpen()
172     {
173         // We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen():
174         // a thread may call close(), which flips the boolean but has not yet called super.close(), and
175         // another thread calls isOpen() which would return true - wrong - if based on super.isOpen().
176         return _open.get();
177     }
178 
179     @Override
180     public void onOpen()
181     {
182         if (_open.compareAndSet(false, true))
183             super.onOpen();
184     }
185 
186     @Override
187     public String toString()
188     {
189         // Do NOT use synchronized (this)
190         // because it's very easy to deadlock when debugging is enabled.
191         // We do a best effort to print the right toString() and that's it.
192         try
193         {
194             boolean valid = _key!=null && _key.isValid();
195             int keyInterests = valid ? _key.interestOps() : -1;
196             int keyReadiness = valid ? _key.readyOps() : -1;
197             return String.format("%s{io=%d,kio=%d,kro=%d}",
198                     super.toString(),
199                     _interestOps.get(),
200                     keyInterests,
201                     keyReadiness);
202         }
203         catch (CancelledKeyException x)
204         {
205             return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
206         }
207     }
208 }