View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.Socket;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.ServerSocketChannel;
27  import java.nio.channels.SocketChannel;
28  import java.util.concurrent.Executor;
29  
30  import org.eclipse.jetty.util.TypeUtil;
31  import org.eclipse.jetty.util.component.AbstractLifeCycle;
32  import org.eclipse.jetty.util.component.ContainerLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.util.thread.Scheduler;
37  
38  /**
39   * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
40   * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
41   * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
42   * {@link EndPoint}s and {@link Connection}s.</p>
43   */
44  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
45  {
46      public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
47      protected static final Logger LOG = Log.getLogger(SelectorManager.class);
48  
49      private final Executor executor;
50      private final Scheduler scheduler;
51      private final ManagedSelector[] _selectors;
52      private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
53      private long _selectorIndex;
54  
55      protected SelectorManager(Executor executor, Scheduler scheduler)
56      {
57          this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
58      }
59  
60      protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
61      {
62          if (selectors <= 0)
63              throw new IllegalArgumentException("No selectors");
64          this.executor = executor;
65          this.scheduler = scheduler;
66          _selectors = new ManagedSelector[selectors];
67      }
68  
69      public Executor getExecutor()
70      {
71          return executor;
72      }
73  
74      public Scheduler getScheduler()
75      {
76          return scheduler;
77      }
78  
79      /**
80       * Get the connect timeout
81       *
82       * @return the connect timeout (in milliseconds)
83       */
84      public long getConnectTimeout()
85      {
86          return _connectTimeout;
87      }
88  
89      /**
90       * Set the connect timeout (in milliseconds)
91       *
92       * @param milliseconds the number of milliseconds for the timeout
93       */
94      public void setConnectTimeout(long milliseconds)
95      {
96          _connectTimeout = milliseconds;
97      }
98  
99      /**
100      * @return the selector priority delta
101      * @deprecated not implemented
102      */
103     @Deprecated
104     public int getSelectorPriorityDelta()
105     {
106         return 0;
107     }
108 
109     /**
110      * @param selectorPriorityDelta the selector priority delta
111      * @deprecated not implemented
112      */
113     @Deprecated
114     public void setSelectorPriorityDelta(int selectorPriorityDelta)
115     {
116     }
117 
118     /**
119      * Executes the given task in a different thread.
120      *
121      * @param task the task to execute
122      */
123     protected void execute(Runnable task)
124     {
125         executor.execute(task);
126     }
127 
128     /**
129      * @return the number of selectors in use
130      */
131     public int getSelectorCount()
132     {
133         return _selectors.length;
134     }
135 
136     private ManagedSelector chooseSelector(SocketChannel channel)
137     {
138         // Ideally we would like to have all connections from the same client end
139         // up on the same selector (to try to avoid smearing the data from a single 
140         // client over all cores), but because of proxies, the remote address may not
141         // really be the client - so we have to hedge our bets to ensure that all
142         // channels don't end up on the one selector for a proxy.
143         ManagedSelector candidate1 = null;
144         if (channel != null)
145         {
146             try
147             {
148                 SocketAddress remote = channel.getRemoteAddress();
149                 if (remote instanceof InetSocketAddress)
150                 {
151                     byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress();
152                     if (addr != null)
153                     {
154                         int s = addr[addr.length - 1] & 0xFF;
155                         candidate1 = _selectors[s % getSelectorCount()];
156                     }
157                 }
158             }
159             catch (IOException x)
160             {
161                 LOG.ignore(x);
162             }
163         }
164 
165         // The ++ increment here is not atomic, but it does not matter,
166         // so long as the value changes sometimes, then connections will
167         // be distributed over the available selectors.
168         long s = _selectorIndex++;
169         int index = (int)(s % getSelectorCount());
170         ManagedSelector candidate2 = _selectors[index];
171 
172         if (candidate1 == null || candidate1.size() >= candidate2.size() * 2)
173             return candidate2;
174         return candidate1;
175     }
176 
177     /**
178      * <p>Registers a channel to perform a non-blocking connect.</p>
179      * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
180      * must be called prior to calling this method, and the connect operation must not be completed
181      * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
182      *
183      * @param channel    the channel to register
184      * @param attachment the attachment object
185      * @see #accept(SocketChannel, Object)
186      */
187     public void connect(SocketChannel channel, Object attachment)
188     {
189         ManagedSelector set = chooseSelector(channel);
190         set.submit(set.new Connect(channel, attachment));
191     }
192 
193     /**
194      * @param channel the channel to accept
195      * @see #accept(SocketChannel, Object)
196      */
197     public void accept(SocketChannel channel)
198     {
199         accept(channel, null);
200     }
201 
202     /**
203      * <p>Registers a channel to perform non-blocking read/write operations.</p>
204      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
205      * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
206      * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
207      * successfully.</p>
208      *
209      * @param channel    the channel to register
210      * @param attachment the attachment object
211      */
212     public void accept(SocketChannel channel, Object attachment)
213     {
214         final ManagedSelector selector = chooseSelector(channel);
215         selector.submit(selector.new Accept(channel, attachment));
216     }
217 
218     /**
219      * <p>Registers a server channel for accept operations.
220      * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
221      * then the {@link #accepted(SocketChannel)} method is called, which must be
222      * overridden by a derivation of this class to handle the accepted channel
223      *
224      * @param server the server channel to register
225      */
226     public void acceptor(ServerSocketChannel server)
227     {
228         final ManagedSelector selector = chooseSelector(null);
229         selector.submit(selector.new Acceptor(server));
230     }
231 
232     /**
233      * Callback method when a channel is accepted from the {@link ServerSocketChannel}
234      * passed to {@link #acceptor(ServerSocketChannel)}.
235      * The default impl throws an {@link UnsupportedOperationException}, so it must
236      * be overridden by subclasses if a server channel is provided.
237      *
238      * @param channel the
239      * @throws IOException if unable to accept channel
240      */
241     protected void accepted(SocketChannel channel) throws IOException
242     {
243         throw new UnsupportedOperationException();
244     }
245 
246     @Override
247     protected void doStart() throws Exception
248     {
249         super.doStart();
250         for (int i = 0; i < _selectors.length; i++)
251         {
252             ManagedSelector selector = newSelector(i);
253             _selectors[i] = selector;
254             selector.start();
255             execute(selector);
256         }
257     }
258 
259     /**
260      * <p>Factory method for {@link ManagedSelector}.</p>
261      *
262      * @param id an identifier for the {@link ManagedSelector to create}
263      * @return a new {@link ManagedSelector}
264      */
265     protected ManagedSelector newSelector(int id)
266     {
267         return new ManagedSelector(this, id);
268     }
269 
270     @Override
271     protected void doStop() throws Exception
272     {
273         for (ManagedSelector selector : _selectors)
274             selector.stop();
275         super.doStop();
276     }
277 
278     /**
279      * <p>Callback method invoked when an endpoint is opened.</p>
280      *
281      * @param endpoint the endpoint being opened
282      */
283     protected void endPointOpened(EndPoint endpoint)
284     {
285         endpoint.onOpen();
286     }
287 
288     /**
289      * <p>Callback method invoked when an endpoint is closed.</p>
290      *
291      * @param endpoint the endpoint being closed
292      */
293     protected void endPointClosed(EndPoint endpoint)
294     {
295         endpoint.onClose();
296     }
297 
298     /**
299      * <p>Callback method invoked when a connection is opened.</p>
300      *
301      * @param connection the connection just opened
302      */
303     public void connectionOpened(Connection connection)
304     {
305         try
306         {
307             connection.onOpen();
308         }
309         catch (Throwable x)
310         {
311             if (isRunning())
312                 LOG.warn("Exception while notifying connection " + connection, x);
313             else
314                 LOG.debug("Exception while notifying connection " + connection, x);
315         }
316     }
317 
318     /**
319      * <p>Callback method invoked when a connection is closed.</p>
320      *
321      * @param connection the connection just closed
322      */
323     public void connectionClosed(Connection connection)
324     {
325         try
326         {
327             connection.onClose();
328         }
329         catch (Throwable x)
330         {
331             LOG.debug("Exception while notifying connection " + connection, x);
332         }
333     }
334 
335     protected boolean finishConnect(SocketChannel channel) throws IOException
336     {
337         return channel.finishConnect();
338     }
339 
340     /**
341      * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
342      * <p>By default it just logs with level warning.</p>
343      *
344      * @param channel    the channel that attempted the connect
345      * @param ex         the exception that caused the connect to fail
346      * @param attachment the attachment object associated at registration
347      */
348     protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
349     {
350         LOG.warn(String.format("%s - %s", channel, attachment), ex);
351     }
352 
353     /**
354      * <p>Factory method to create {@link EndPoint}.</p>
355      * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
356      * or {@link #accept(SocketChannel)}.</p>
357      *
358      * @param channel      the channel associated to the endpoint
359      * @param selector     the selector the channel is registered to
360      * @param selectionKey the selection key
361      * @return a new endpoint
362      * @throws IOException if the endPoint cannot be created
363      * @see #newConnection(SocketChannel, EndPoint, Object)
364      */
365     protected abstract EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException;
366 
367     /**
368      * <p>Factory method to create {@link Connection}.</p>
369      *
370      * @param channel    the channel associated to the connection
371      * @param endpoint   the endpoint
372      * @param attachment the attachment
373      * @return a new connection
374      * @throws IOException if unable to create new connection
375      * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
376      */
377     public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
378 
379     @Override
380     public String dump()
381     {
382         return ContainerLifeCycle.dump(this);
383     }
384 
385     @Override
386     public void dump(Appendable out, String indent) throws IOException
387     {
388         ContainerLifeCycle.dumpObject(out, this);
389         ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
390     }
391 }