1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.ServerSocketChannel;
27 import java.nio.channels.SocketChannel;
28 import java.util.concurrent.Executor;
29
30 import org.eclipse.jetty.util.TypeUtil;
31 import org.eclipse.jetty.util.component.AbstractLifeCycle;
32 import org.eclipse.jetty.util.component.ContainerLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.util.thread.Scheduler;
37
38
39
40
41
42
43
44 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
45 {
46 public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
47 protected static final Logger LOG = Log.getLogger(SelectorManager.class);
48
49 private final Executor executor;
50 private final Scheduler scheduler;
51 private final ManagedSelector[] _selectors;
52 private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
53 private long _selectorIndex;
54
55 protected SelectorManager(Executor executor, Scheduler scheduler)
56 {
57 this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
58 }
59
60 protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
61 {
62 if (selectors <= 0)
63 throw new IllegalArgumentException("No selectors");
64 this.executor = executor;
65 this.scheduler = scheduler;
66 _selectors = new ManagedSelector[selectors];
67 }
68
69 public Executor getExecutor()
70 {
71 return executor;
72 }
73
74 public Scheduler getScheduler()
75 {
76 return scheduler;
77 }
78
79
80
81
82
83
84 public long getConnectTimeout()
85 {
86 return _connectTimeout;
87 }
88
89
90
91
92
93
94 public void setConnectTimeout(long milliseconds)
95 {
96 _connectTimeout = milliseconds;
97 }
98
99
100
101
102
103 @Deprecated
104 public int getSelectorPriorityDelta()
105 {
106 return 0;
107 }
108
109
110
111
112
113 @Deprecated
114 public void setSelectorPriorityDelta(int selectorPriorityDelta)
115 {
116 }
117
118
119
120
121
122
123 protected void execute(Runnable task)
124 {
125 executor.execute(task);
126 }
127
128
129
130
131 public int getSelectorCount()
132 {
133 return _selectors.length;
134 }
135
136 private ManagedSelector chooseSelector(SocketChannel channel)
137 {
138
139
140
141
142
143 ManagedSelector candidate1 = null;
144 if (channel != null)
145 {
146 try
147 {
148 SocketAddress remote = channel.getRemoteAddress();
149 if (remote instanceof InetSocketAddress)
150 {
151 byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress();
152 if (addr != null)
153 {
154 int s = addr[addr.length - 1] & 0xFF;
155 candidate1 = _selectors[s % getSelectorCount()];
156 }
157 }
158 }
159 catch (IOException x)
160 {
161 LOG.ignore(x);
162 }
163 }
164
165
166
167
168 long s = _selectorIndex++;
169 int index = (int)(s % getSelectorCount());
170 ManagedSelector candidate2 = _selectors[index];
171
172 if (candidate1 == null || candidate1.size() >= candidate2.size() * 2)
173 return candidate2;
174 return candidate1;
175 }
176
177
178
179
180
181
182
183
184
185
186
187 public void connect(SocketChannel channel, Object attachment)
188 {
189 ManagedSelector set = chooseSelector(channel);
190 set.submit(set.new Connect(channel, attachment));
191 }
192
193
194
195
196
197 public void accept(SocketChannel channel)
198 {
199 accept(channel, null);
200 }
201
202
203
204
205
206
207
208
209
210
211
212 public void accept(SocketChannel channel, Object attachment)
213 {
214 final ManagedSelector selector = chooseSelector(channel);
215 selector.submit(selector.new Accept(channel, attachment));
216 }
217
218
219
220
221
222
223
224
225
226 public void acceptor(ServerSocketChannel server)
227 {
228 final ManagedSelector selector = chooseSelector(null);
229 selector.submit(selector.new Acceptor(server));
230 }
231
232
233
234
235
236
237
238
239
240
241 protected void accepted(SocketChannel channel) throws IOException
242 {
243 throw new UnsupportedOperationException();
244 }
245
246 @Override
247 protected void doStart() throws Exception
248 {
249 super.doStart();
250 for (int i = 0; i < _selectors.length; i++)
251 {
252 ManagedSelector selector = newSelector(i);
253 _selectors[i] = selector;
254 selector.start();
255 execute(selector);
256 }
257 }
258
259
260
261
262
263
264
265 protected ManagedSelector newSelector(int id)
266 {
267 return new ManagedSelector(this, id);
268 }
269
270 @Override
271 protected void doStop() throws Exception
272 {
273 for (ManagedSelector selector : _selectors)
274 selector.stop();
275 super.doStop();
276 }
277
278
279
280
281
282
283 protected void endPointOpened(EndPoint endpoint)
284 {
285 endpoint.onOpen();
286 }
287
288
289
290
291
292
293 protected void endPointClosed(EndPoint endpoint)
294 {
295 endpoint.onClose();
296 }
297
298
299
300
301
302
303 public void connectionOpened(Connection connection)
304 {
305 try
306 {
307 connection.onOpen();
308 }
309 catch (Throwable x)
310 {
311 if (isRunning())
312 LOG.warn("Exception while notifying connection " + connection, x);
313 else
314 LOG.debug("Exception while notifying connection " + connection, x);
315 }
316 }
317
318
319
320
321
322
323 public void connectionClosed(Connection connection)
324 {
325 try
326 {
327 connection.onClose();
328 }
329 catch (Throwable x)
330 {
331 LOG.debug("Exception while notifying connection " + connection, x);
332 }
333 }
334
335 protected boolean finishConnect(SocketChannel channel) throws IOException
336 {
337 return channel.finishConnect();
338 }
339
340
341
342
343
344
345
346
347
348 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
349 {
350 LOG.warn(String.format("%s - %s", channel, attachment), ex);
351 }
352
353
354
355
356
357
358
359
360
361
362
363
364
365 protected abstract EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException;
366
367
368
369
370
371
372
373
374
375
376
377 public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
378
379 @Override
380 public String dump()
381 {
382 return ContainerLifeCycle.dump(this);
383 }
384
385 @Override
386 public void dump(Appendable out, String indent) throws IOException
387 {
388 ContainerLifeCycle.dumpObject(out, this);
389 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
390 }
391 }