1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
36 {
37 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
38
39 private final SelectorManager.SelectSet _selectSet;
40 private final SelectorManager _manager;
41 private SelectionKey _key;
42 private final Runnable _handler = new Runnable()
43 {
44 public void run() { handle(); }
45 };
46
47
48 private int _interestOps;
49
50
51
52
53
54
55
56 private volatile Connection _connection;
57
58
59 private boolean _dispatched = false;
60
61
62 private boolean _redispatched = false;
63
64
65 private volatile boolean _writable = true;
66
67
68
69 private boolean _readBlocked;
70
71
72 private boolean _writeBlocked;
73
74
75 private boolean _open;
76
77 private volatile long _idleTimestamp;
78
79
80 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
81 throws IOException
82 {
83 super(channel, maxIdleTime);
84
85 _manager = selectSet.getManager();
86 _selectSet = selectSet;
87 _dispatched = false;
88 _redispatched = false;
89 _open=true;
90 _key = key;
91
92 _connection = _manager.newConnection(channel,this);
93
94 scheduleIdle();
95 }
96
97
98 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
99 throws IOException
100 {
101 super(channel);
102
103 _manager = selectSet.getManager();
104 _selectSet = selectSet;
105 _dispatched = false;
106 _redispatched = false;
107 _open=true;
108 _key = key;
109
110 _connection = _manager.newConnection(channel,this);
111
112 scheduleIdle();
113 }
114
115
116 public SelectionKey getSelectionKey()
117 {
118 synchronized (this)
119 {
120 return _key;
121 }
122 }
123
124
125 public SelectorManager getSelectManager()
126 {
127 return _manager;
128 }
129
130
131 public Connection getConnection()
132 {
133 return _connection;
134 }
135
136
137 public void setConnection(Connection connection)
138 {
139 Connection old=_connection;
140 _connection=connection;
141 _manager.endPointUpgraded(this,old);
142 }
143
144
145 public long getIdleTimestamp()
146 {
147 return _idleTimestamp;
148 }
149
150
151
152
153
154 public void schedule()
155 {
156 synchronized (this)
157 {
158
159 if (_key == null || !_key.isValid())
160 {
161 _readBlocked=false;
162 _writeBlocked=false;
163 this.notifyAll();
164 return;
165 }
166
167
168 if (_readBlocked || _writeBlocked)
169 {
170
171 if (_readBlocked && _key.isReadable())
172 _readBlocked=false;
173 if (_writeBlocked && _key.isWritable())
174 _writeBlocked=false;
175
176
177 this.notifyAll();
178
179
180 if (_dispatched)
181 _key.interestOps(0);
182 return;
183 }
184
185
186 if (!isReadyForDispatch())
187 {
188
189 _key.interestOps(0);
190 return;
191 }
192
193
194 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
195 {
196
197 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
198 _key.interestOps(_interestOps);
199 _writable = true;
200 }
201
202
203 if (!_dispatched)
204 {
205 dispatch();
206 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
207 {
208 _key.interestOps(0);
209 }
210 }
211 }
212 }
213
214
215 public void dispatch()
216 {
217 synchronized(this)
218 {
219 if (_dispatched)
220 {
221 _redispatched=true;
222 }
223 else
224 {
225 _dispatched = true;
226 boolean dispatched = _manager.dispatch(_handler);
227 if(!dispatched)
228 {
229 _dispatched = false;
230 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
231 updateKey();
232 }
233 }
234 }
235 }
236
237
238
239
240
241
242
243
244 protected boolean undispatch()
245 {
246 synchronized (this)
247 {
248 if (_redispatched)
249 {
250 _redispatched=false;
251 return false;
252 }
253 _dispatched = false;
254 updateKey();
255 }
256 return true;
257 }
258
259
260 public void scheduleIdle()
261 {
262 _idleTimestamp=System.currentTimeMillis();
263 }
264
265
266 public void cancelIdle()
267 {
268 _idleTimestamp=0;
269 }
270
271
272 public void checkIdleTimestamp(long now)
273 {
274 long idleTimestamp=_idleTimestamp;
275 if (!getChannel().isOpen() || idleTimestamp!=0 && _maxIdleTime>0 && now>(idleTimestamp+_maxIdleTime))
276 idleExpired();
277 }
278
279
280 protected void idleExpired()
281 {
282 _connection.idleExpired();
283 }
284
285
286
287
288
289 public boolean isProgressing()
290 {
291 return false;
292 }
293
294
295
296
297 @Override
298 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
299 {
300 int l = super.flush(header, buffer, trailer);
301
302
303 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
304 {
305 synchronized (this)
306 {
307 _writable=false;
308 if (!_dispatched)
309 updateKey();
310 }
311 }
312 else
313 _writable=true;
314 return l;
315 }
316
317
318
319
320 @Override
321 public int flush(Buffer buffer) throws IOException
322 {
323 int l = super.flush(buffer);
324
325
326 if (l==0 && buffer!=null && buffer.hasContent())
327 {
328 synchronized (this)
329 {
330 _writable=false;
331 if (!_dispatched)
332 updateKey();
333 }
334 }
335 else
336 _writable=true;
337
338 return l;
339 }
340
341
342 public boolean isReadyForDispatch()
343 {
344 synchronized (this)
345 {
346
347 return !(_dispatched || getConnection().isSuspended());
348 }
349 }
350
351
352
353
354
355 @Override
356 public boolean blockReadable(long timeoutMs) throws IOException
357 {
358 synchronized (this)
359 {
360 long now=_selectSet.getNow();
361 long end=now+timeoutMs;
362 try
363 {
364 _readBlocked=true;
365 while (isOpen() && _readBlocked)
366 {
367 try
368 {
369 updateKey();
370 this.wait(timeoutMs>=0?(end-now):10000);
371 }
372 catch (InterruptedException e)
373 {
374 LOG.warn(e);
375 }
376 finally
377 {
378 now=_selectSet.getNow();
379 }
380
381 if (_readBlocked && timeoutMs>0 && now>=end)
382 return false;
383 }
384 }
385 finally
386 {
387 _readBlocked=false;
388 }
389 }
390 return true;
391 }
392
393
394
395
396
397 @Override
398 public boolean blockWritable(long timeoutMs) throws IOException
399 {
400 synchronized (this)
401 {
402 if (!isOpen() || isOutputShutdown())
403 throw new EofException();
404
405 long now=_selectSet.getNow();
406 long end=now+timeoutMs;
407 try
408 {
409 _writeBlocked=true;
410 while (isOpen() && _writeBlocked && !isOutputShutdown())
411 {
412 try
413 {
414 updateKey();
415 this.wait(timeoutMs>=0?(end-now):10000);
416 }
417 catch (InterruptedException e)
418 {
419 LOG.warn(e);
420 }
421 finally
422 {
423 now=_selectSet.getNow();
424 }
425 if (_writeBlocked && timeoutMs>0 && now>=end)
426 return false;
427 }
428 }
429 catch(Throwable e)
430 {
431
432 LOG.warn(e);
433 if (e instanceof RuntimeException)
434 throw (RuntimeException)e;
435 if (e instanceof Error)
436 throw (Error)e;
437 throw new RuntimeException(e);
438 }
439 finally
440 {
441 _writeBlocked=false;
442 if (_idleTimestamp!=-1)
443 scheduleIdle();
444 }
445 }
446 return true;
447 }
448
449
450
451 public void clearWritable()
452 {
453 _writable=false;
454 }
455
456
457 public void scheduleWrite()
458 {
459 if (_writable==true)
460 LOG.debug("Required scheduleWrite {}",this);
461
462 _writable=false;
463 updateKey();
464 }
465
466
467
468
469
470
471
472 private void updateKey()
473 {
474 synchronized (this)
475 {
476 int ops=-1;
477 if (getChannel().isOpen())
478 {
479 _interestOps =
480 ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked)) ? SelectionKey.OP_READ : 0)
481 | ((!_socket.isOutputShutdown()&& (!_writable || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
482 try
483 {
484 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
485 }
486 catch(Exception e)
487 {
488 _key=null;
489 LOG.ignore(e);
490 }
491 }
492
493 if(_interestOps == ops && getChannel().isOpen())
494 return;
495 }
496 _selectSet.addChange(this);
497 _selectSet.wakeup();
498 }
499
500
501
502
503
504 void doUpdateKey()
505 {
506 synchronized (this)
507 {
508 if (getChannel().isOpen())
509 {
510 if (_interestOps>0)
511 {
512 if (_key==null || !_key.isValid())
513 {
514 SelectableChannel sc = (SelectableChannel)getChannel();
515 if (sc.isRegistered())
516 {
517 updateKey();
518 }
519 else
520 {
521 try
522 {
523 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
524 }
525 catch (Exception e)
526 {
527 LOG.ignore(e);
528 if (_key!=null && _key.isValid())
529 {
530 _key.cancel();
531 }
532 cancelIdle();
533
534 if (_open)
535 {
536 _selectSet.destroyEndPoint(this);
537 }
538 _open=false;
539 _key = null;
540 }
541 }
542 }
543 else
544 {
545 _key.interestOps(_interestOps);
546 }
547 }
548 else
549 {
550 if (_key!=null && _key.isValid())
551 _key.interestOps(0);
552 else
553 _key=null;
554 }
555 }
556 else
557 {
558 if (_key!=null && _key.isValid())
559 _key.cancel();
560
561 cancelIdle();
562 if (_open)
563 {
564 _open=false;
565 _selectSet.destroyEndPoint(this);
566 }
567 _key = null;
568 }
569 }
570 }
571
572
573
574
575 protected void handle()
576 {
577 boolean dispatched=true;
578 try
579 {
580 while(dispatched)
581 {
582 try
583 {
584 while(true)
585 {
586 final Connection next = _connection.handle();
587 if (next!=_connection)
588 {
589 LOG.debug("{} replaced {}",next,_connection);
590 _connection=next;
591 continue;
592 }
593 break;
594 }
595 }
596 catch (ClosedChannelException e)
597 {
598 LOG.ignore(e);
599 }
600 catch (EofException e)
601 {
602 LOG.debug("EOF", e);
603 try{getChannel().close();}
604 catch(IOException e2){LOG.ignore(e2);}
605 }
606 catch (IOException e)
607 {
608 LOG.warn(e.toString());
609 LOG.debug(e);
610 try{getChannel().close();}
611 catch(IOException e2){LOG.ignore(e2);}
612 }
613 catch (Throwable e)
614 {
615 LOG.warn("handle failed", e);
616 try{getChannel().close();}
617 catch(IOException e2){LOG.ignore(e2);}
618 }
619 dispatched=!undispatch();
620 }
621 }
622 finally
623 {
624 if (dispatched)
625 {
626 dispatched=!undispatch();
627 while (dispatched)
628 {
629 LOG.warn("SCEP.run() finally DISPATCHED");
630 dispatched=!undispatch();
631 }
632 }
633 }
634 }
635
636
637
638
639
640 @Override
641 public void close() throws IOException
642 {
643 try
644 {
645 super.close();
646 }
647 catch (IOException e)
648 {
649 LOG.ignore(e);
650 }
651 finally
652 {
653 updateKey();
654 }
655 }
656
657
658 @Override
659 public String toString()
660 {
661 synchronized(this)
662 {
663 return "SCEP@" + hashCode() + _channel+
664 "[o="+isOpen()+" d=" + _dispatched + ",io=" + _interestOps+
665 ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
666 }
667 }
668
669
670 public SelectSet getSelectSet()
671 {
672 return _selectSet;
673 }
674
675
676
677
678
679
680 @Override
681 public void setMaxIdleTime(int timeMs) throws IOException
682 {
683 _maxIdleTime=timeMs;
684 }
685
686 }