View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.nio.channels.CancelledKeyException;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.SocketChannel;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  
26  import org.eclipse.jetty.util.log.Log;
27  import org.eclipse.jetty.util.log.Logger;
28  import org.eclipse.jetty.util.thread.Locker;
29  import org.eclipse.jetty.util.thread.Scheduler;
30  
31  /**
32   * An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
33   */
34  public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSelector.SelectableEndPoint
35  {
36      public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
37  
38      private final Locker _locker = new Locker();
39      private boolean _updatePending;
40  
41      /**
42       * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
43       */
44      private final AtomicBoolean _open = new AtomicBoolean();
45      private final ManagedSelector _selector;
46      private final SelectionKey _key;
47      /**
48       * The current value for {@link SelectionKey#interestOps()}.
49       */
50      private int _currentInterestOps;
51      /**
52       * The desired value for {@link SelectionKey#interestOps()}.
53       */
54      private int _desiredInterestOps;
55  
56      private final Runnable _runUpdateKey = new Runnable()
57      {
58          @Override
59          public void run()
60          {
61              updateKey();
62          }
63  
64          @Override
65          public String toString()
66          {
67              return SelectChannelEndPoint.this.toString()+":runUpdateKey";
68          }
69      };
70      private final Runnable _runFillable = new Runnable()
71      {
72          @Override
73          public void run()
74          {
75              getFillInterest().fillable();
76          }
77  
78          @Override
79          public String toString()
80          {
81              return SelectChannelEndPoint.this.toString()+":runFillable";
82          }
83      };
84      private final Runnable _runCompleteWrite = new Runnable()
85      {
86          @Override
87          public void run()
88          {
89              getWriteFlusher().completeWrite();
90          }
91  
92          @Override
93          public String toString()
94          {
95              return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
96          }
97      };
98      private final Runnable _runFillableCompleteWrite = new Runnable()
99      {
100         @Override
101         public void run()
102         {
103             getFillInterest().fillable();
104             getWriteFlusher().completeWrite();
105         }
106 
107         @Override
108         public String toString()
109         {
110             return SelectChannelEndPoint.this.toString()+":runFillableCompleteWrite";
111         }
112     };
113 
114     public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
115     {
116         super(scheduler, channel);
117         _selector = selector;
118         _key = key;
119         setIdleTimeout(idleTimeout);
120     }
121 
122     @Override
123     protected void needsFillInterest()
124     {
125         changeInterests(SelectionKey.OP_READ);
126     }
127 
128     @Override
129     protected void onIncompleteFlush()
130     {
131         changeInterests(SelectionKey.OP_WRITE);
132     }
133 
134     @Override
135     public Runnable onSelected()
136     {
137         /**
138          * This method may run concurrently with {@link #changeInterests(int)}.
139          */
140 
141         int readyOps = _key.readyOps();
142         int oldInterestOps;
143         int newInterestOps;
144         try (Locker.Lock lock = _locker.lock())
145         {
146             _updatePending = true;
147             // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
148             oldInterestOps = _desiredInterestOps;
149             newInterestOps = oldInterestOps & ~readyOps;
150             _desiredInterestOps = newInterestOps;
151         }
152 
153 
154         boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
155         boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
156 
157 
158         if (LOG.isDebugEnabled())
159             LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this);
160         
161         // Run non-blocking code immediately.
162         // This producer knows that this non-blocking code is special
163         // and that it must be run in this thread and not fed to the
164         // ExecutionStrategy, which could not have any thread to run these
165         // tasks (or it may starve forever just after having run them).
166         if (readable && getFillInterest().isCallbackNonBlocking())
167         {
168             if (LOG.isDebugEnabled())
169                 LOG.debug("Direct readable run {}",this);
170             _runFillable.run();
171             readable = false;
172         }
173         if (writable && getWriteFlusher().isCallbackNonBlocking())
174         {
175             if (LOG.isDebugEnabled())
176                 LOG.debug("Direct writable run {}",this);
177             _runCompleteWrite.run();
178             writable = false;
179         }
180 
181         // return task to complete the job
182         Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable)
183                 : (writable ? _runCompleteWrite : null);
184 
185         if (LOG.isDebugEnabled())
186             LOG.debug("task {}",task);
187         return task;
188     }
189 
190     @Override
191     public void updateKey()
192     {
193         /**
194          * This method may run concurrently with {@link #changeInterests(int)}.
195          */
196 
197         try
198         {
199             int oldInterestOps;
200             int newInterestOps;
201             try (Locker.Lock lock = _locker.lock())
202             {
203                 _updatePending = false;
204                 oldInterestOps = _currentInterestOps;
205                 newInterestOps = _desiredInterestOps;
206                 if (oldInterestOps != newInterestOps)
207                 {
208                     _currentInterestOps = newInterestOps;
209                     _key.interestOps(newInterestOps);
210                 }
211             }
212 
213             if (LOG.isDebugEnabled())
214                 LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
215         }
216         catch (CancelledKeyException x)
217         {
218             LOG.debug("Ignoring key update for concurrently closed channel {}", this);
219             close();
220         }
221         catch (Throwable x)
222         {
223             LOG.warn("Ignoring key update for " + this, x);
224             close();
225         }
226     }
227 
228     private void changeInterests(int operation)
229     {
230         /**
231          * This method may run concurrently with
232          * {@link #updateKey()} and {@link #onSelected()}.
233          */
234 
235         int oldInterestOps;
236         int newInterestOps;
237         boolean pending;
238         try (Locker.Lock lock = _locker.lock())
239         {
240             pending = _updatePending;
241             oldInterestOps = _desiredInterestOps;
242             newInterestOps = oldInterestOps | operation;
243             if (newInterestOps != oldInterestOps)
244                 _desiredInterestOps = newInterestOps;
245         }
246 
247         if (LOG.isDebugEnabled())
248             LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
249 
250         if (!pending)
251             _selector.submit(_runUpdateKey);
252     }
253 
254 
255     @Override
256     public void close()
257     {
258         if (_open.compareAndSet(true, false))
259         {
260             super.close();
261             _selector.destroyEndPoint(this);
262         }
263     }
264 
265     @Override
266     public boolean isOpen()
267     {
268         // We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen():
269         // a thread may call close(), which flips the boolean but has not yet called super.close(), and
270         // another thread calls isOpen() which would return true - wrong - if based on super.isOpen().
271         return _open.get();
272     }
273 
274     @Override
275     public void onOpen()
276     {
277         if (_open.compareAndSet(false, true))
278             super.onOpen();
279     }
280 
281     @Override
282     public String toString()
283     {
284         // We do a best effort to print the right toString() and that's it.
285         try
286         {
287             boolean valid = _key != null && _key.isValid();
288             int keyInterests = valid ? _key.interestOps() : -1;
289             int keyReadiness = valid ? _key.readyOps() : -1;
290             return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
291                     super.toString(),
292                     _currentInterestOps,
293                     _desiredInterestOps,
294                     keyInterests,
295                     keyReadiness);
296         }
297         catch (Throwable x)
298         {
299             return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _desiredInterestOps);
300         }
301     }
302 }