1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21 import java.util.concurrent.atomic.AtomicInteger;
22
23 import org.eclipse.jetty.io.AsyncEndPoint;
24 import org.eclipse.jetty.io.Buffer;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.thread.Timeout;
30
31
32
33
34
35
36
37
38 public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint
39 {
40 private final SelectorManager.SelectSet _selectSet;
41 private final Connection _connection;
42 private final SelectorManager _manager;
43 private boolean _dispatched = false;
44 private boolean _redispatched = false;
45 private volatile boolean _writable = true;
46
47 private SelectionKey _key;
48 private int _interestOps;
49 private boolean _readBlocked;
50 private boolean _writeBlocked;
51 private boolean _open;
52 private final Timeout.Task _idleTask = new IdleTask();
53
54
55 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
56 {
57 super(channel);
58
59 _manager = selectSet.getManager();
60 _selectSet = selectSet;
61 _connection = _manager.newConnection(channel,this);
62 _dispatched = false;
63 _redispatched = false;
64 _open=true;
65 _manager.endPointOpened(this);
66
67 _key = key;
68 scheduleIdle();
69 }
70
71
72 public Connection getConnection()
73 {
74 return _connection;
75 }
76
77
78 public SelectorManager getSelectManager()
79 {
80 return _manager;
81 }
82
83
84
85
86
87 public void schedule()
88 {
89
90 synchronized (this)
91 {
92
93 if (_key == null || !_key.isValid())
94 {
95 _readBlocked=false;
96 _writeBlocked=false;
97 this.notifyAll();
98 return;
99 }
100
101
102 if (_readBlocked || _writeBlocked)
103 {
104
105 if (_readBlocked && _key.isReadable())
106 _readBlocked=false;
107 if (_writeBlocked && _key.isWritable())
108 _writeBlocked=false;
109
110
111 this.notifyAll();
112
113
114 _key.interestOps(0);
115 return;
116 }
117
118
119 if (!isReadyForDispatch())
120 {
121
122 _key.interestOps(0);
123 return;
124 }
125
126
127
128 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
129 {
130
131 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
132 _key.interestOps(_interestOps);
133 _writable = true;
134 }
135
136 if (_dispatched)
137 _key.interestOps(0);
138 else
139 dispatch();
140 }
141 }
142
143
144 public void dispatch()
145 {
146 synchronized(this)
147 {
148 if (_dispatched)
149 _redispatched=true;
150 else
151 {
152 _dispatched = _manager.dispatch(this);
153 if(!_dispatched)
154 {
155 Log.warn("Dispatched Failed!");
156 updateKey();
157 }
158 }
159 }
160 }
161
162
163
164
165
166
167
168
169 private boolean undispatch()
170 {
171 synchronized (this)
172 {
173 if (_redispatched)
174 {
175 _redispatched=false;
176 return false;
177 }
178 _dispatched = false;
179 updateKey();
180 }
181 return true;
182 }
183
184
185 public void scheduleIdle()
186 {
187 _selectSet.scheduleIdle(_idleTask);
188 }
189
190
191 public void cancelIdle()
192 {
193 _selectSet.cancelIdle(_idleTask);
194 }
195
196
197 protected void idleExpired()
198 {
199 try
200 {
201 close();
202 }
203 catch (IOException e)
204 {
205 Log.ignore(e);
206 }
207 }
208
209
210
211
212 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
213 {
214 int l = super.flush(header, buffer, trailer);
215 _writable = l!=0;
216 return l;
217 }
218
219
220
221
222 public int flush(Buffer buffer) throws IOException
223 {
224 int l = super.flush(buffer);
225 _writable = l!=0;
226 return l;
227 }
228
229
230 public boolean isReadyForDispatch()
231 {
232 synchronized (this)
233 {
234 return !(_dispatched || getConnection().isSuspended());
235 }
236 }
237
238
239
240
241
242 public boolean blockReadable(long timeoutMs) throws IOException
243 {
244 synchronized (this)
245 {
246 long start=_selectSet.getNow();
247 try
248 {
249 _readBlocked=true;
250 while (isOpen() && _readBlocked)
251 {
252 try
253 {
254 updateKey();
255 this.wait(timeoutMs);
256
257 if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
258 return false;
259 }
260 catch (InterruptedException e)
261 {
262 Log.warn(e);
263 }
264 }
265 }
266 finally
267 {
268 _readBlocked=false;
269 }
270 }
271 return true;
272 }
273
274
275
276
277
278 public boolean blockWritable(long timeoutMs) throws IOException
279 {
280 synchronized (this)
281 {
282 long start=_selectSet.getNow();
283 try
284 {
285 _writeBlocked=true;
286 while (isOpen() && _writeBlocked)
287 {
288 try
289 {
290 updateKey();
291 this.wait(timeoutMs);
292
293 if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
294 return false;
295 }
296 catch (InterruptedException e)
297 {
298 Log.warn(e);
299 }
300 }
301 }
302 finally
303 {
304 _writeBlocked=false;
305 }
306 }
307 return true;
308 }
309
310
311 public void setWritable(boolean writable)
312 {
313 _writable=writable;
314 }
315
316
317 public void scheduleWrite()
318 {
319 _writable=false;
320 updateKey();
321 }
322
323
324
325
326
327
328
329 private void updateKey()
330 {
331 synchronized (this)
332 {
333 int ops=-1;
334 if (getChannel().isOpen())
335 {
336 _interestOps =
337 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
338 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
339 try
340 {
341 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
342 }
343 catch(Exception e)
344 {
345 _key=null;
346 Log.ignore(e);
347 }
348 }
349
350 if(_interestOps == ops && getChannel().isOpen())
351 return;
352
353 }
354 _selectSet.addChange(this);
355 _selectSet.wakeup();
356 }
357
358
359
360
361
362 void doUpdateKey()
363 {
364 synchronized (this)
365 {
366 if (getChannel().isOpen())
367 {
368 if (_interestOps>0)
369 {
370 if (_key==null || !_key.isValid())
371 {
372 SelectableChannel sc = (SelectableChannel)getChannel();
373 if (sc.isRegistered())
374 {
375 updateKey();
376 }
377 else
378 {
379 try
380 {
381 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
382 }
383 catch (Exception e)
384 {
385 Log.ignore(e);
386 if (_key!=null && _key.isValid())
387 {
388 _key.cancel();
389 }
390 cancelIdle();
391
392 if (_open)
393 _manager.endPointClosed(this);
394 _open=false;
395 _key = null;
396 }
397 }
398 }
399 else
400 {
401 _key.interestOps(_interestOps);
402 }
403 }
404 else
405 {
406 if (_key.isValid())
407 _key.interestOps(0);
408 else
409 _key=null;
410 }
411 }
412 else
413 {
414 if (_key!=null && _key.isValid())
415 _key.cancel();
416
417 cancelIdle();
418 if (_open)
419 _manager.endPointClosed(this);
420 _open=false;
421 _key = null;
422 }
423 }
424 }
425
426
427
428
429 public void run()
430 {
431 boolean dispatched=true;
432 try
433 {
434 while(dispatched)
435 {
436 try
437 {
438 _connection.handle();
439 }
440 catch (ClosedChannelException e)
441 {
442 Log.ignore(e);
443 }
444 catch (EofException e)
445 {
446 Log.debug("EOF", e);
447 try{close();}
448 catch(IOException e2){Log.ignore(e2);}
449 }
450 catch (IOException e)
451 {
452 Log.warn(e.toString());
453 Log.debug(e);
454 try{close();}
455 catch(IOException e2){Log.ignore(e2);}
456 }
457 catch (Throwable e)
458 {
459 Log.warn("handle failed", e);
460 try{close();}
461 catch(IOException e2){Log.ignore(e2);}
462 }
463 dispatched=!undispatch();
464 }
465 }
466 finally
467 {
468 if (dispatched)
469 {
470 dispatched=!undispatch();
471 while (dispatched)
472 {
473 Log.warn("SCEP.run() finally DISPATCHED");
474 dispatched=!undispatch();
475 }
476 }
477 }
478 }
479
480
481
482
483
484 public void close() throws IOException
485 {
486 try
487 {
488 super.close();
489 }
490 catch (IOException e)
491 {
492 Log.ignore(e);
493 }
494 finally
495 {
496 updateKey();
497 }
498 }
499
500
501 public String toString()
502 {
503 synchronized(this)
504 {
505 return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
506 ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
507 }
508 }
509
510
511 public Timeout.Task getTimeoutTask()
512 {
513 return _idleTask;
514 }
515
516
517 public SelectSet getSelectSet()
518 {
519 return _selectSet;
520 }
521
522
523
524
525 private class IdleTask extends Timeout.Task
526 {
527
528
529
530
531 public void expired()
532 {
533 idleExpired();
534 }
535
536 public String toString()
537 {
538 return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
539 }
540
541 }
542
543 }