1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
36 {
37 public static final Logger __log=Log.getLogger("org.eclipse.jetty.io.nio");
38
39 private final SelectorManager.SelectSet _selectSet;
40 private final SelectorManager _manager;
41 private final Runnable _handler = new Runnable()
42 {
43 public void run() { handle(); }
44 };
45
46 private volatile Connection _connection;
47 private boolean _dispatched = false;
48 private boolean _redispatched = false;
49 private volatile boolean _writable = true;
50
51 private SelectionKey _key;
52 private int _interestOps;
53 private boolean _readBlocked;
54 private boolean _writeBlocked;
55 private boolean _open;
56 private volatile long _idleTimestamp;
57
58
59 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
60 throws IOException
61 {
62 super(channel, maxIdleTime);
63
64 _manager = selectSet.getManager();
65 _selectSet = selectSet;
66 _dispatched = false;
67 _redispatched = false;
68 _open=true;
69 _key = key;
70
71 _connection = _manager.newConnection(channel,this);
72
73 scheduleIdle();
74 }
75
76
77 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
78 throws IOException
79 {
80 super(channel);
81
82 _manager = selectSet.getManager();
83 _selectSet = selectSet;
84 _dispatched = false;
85 _redispatched = false;
86 _open=true;
87 _key = key;
88
89 _connection = _manager.newConnection(channel,this);
90
91 scheduleIdle();
92 }
93
94 public SelectionKey getSelectionKey()
95 {
96 synchronized (this)
97 {
98 return _key;
99 }
100 }
101
102
103 public SelectorManager getSelectManager()
104 {
105 return _manager;
106 }
107
108
109 public Connection getConnection()
110 {
111 return _connection;
112 }
113
114
115 public void setConnection(Connection connection)
116 {
117 Connection old=_connection;
118 _connection=connection;
119 _manager.endPointUpgraded(this,old);
120 }
121
122
123 public long getIdleTimestamp()
124 {
125 return _idleTimestamp;
126 }
127
128
129
130
131
132 public void schedule()
133 {
134 synchronized (this)
135 {
136
137 if (_key == null || !_key.isValid())
138 {
139 _readBlocked=false;
140 _writeBlocked=false;
141 this.notifyAll();
142 return;
143 }
144
145
146 if (_readBlocked || _writeBlocked)
147 {
148
149 if (_readBlocked && _key.isReadable())
150 _readBlocked=false;
151 if (_writeBlocked && _key.isWritable())
152 _writeBlocked=false;
153
154
155 this.notifyAll();
156
157
158 if (_dispatched)
159 _key.interestOps(0);
160 return;
161 }
162
163
164 if (!isReadyForDispatch())
165 {
166
167 _key.interestOps(0);
168 return;
169 }
170
171
172 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
173 {
174
175 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
176 _key.interestOps(_interestOps);
177 _writable = true;
178 }
179
180
181 if (!_dispatched)
182 {
183 dispatch();
184 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
185 {
186 _key.interestOps(0);
187 }
188 }
189 }
190 }
191
192
193 public void dispatch()
194 {
195 synchronized(this)
196 {
197 if (_dispatched)
198 {
199 _redispatched=true;
200 }
201 else
202 {
203 _dispatched = true;
204 boolean dispatched = _manager.dispatch(_handler);
205 if(!dispatched)
206 {
207 _dispatched = false;
208 __log.warn("Dispatched Failed! "+this+" to "+_manager);
209 updateKey();
210 }
211 }
212 }
213 }
214
215
216
217
218
219
220
221
222 protected boolean undispatch()
223 {
224 synchronized (this)
225 {
226 if (_redispatched)
227 {
228 _redispatched=false;
229 return false;
230 }
231 _dispatched = false;
232 updateKey();
233 }
234 return true;
235 }
236
237
238 public void scheduleIdle()
239 {
240 _idleTimestamp=System.currentTimeMillis();
241 }
242
243
244 public void cancelIdle()
245 {
246 _idleTimestamp=0;
247 }
248
249
250 public void checkIdleTimestamp(long now)
251 {
252 long idleTimestamp=_idleTimestamp;
253 if (idleTimestamp!=0 && _maxIdleTime!=0 && now>(idleTimestamp+_maxIdleTime))
254 idleExpired();
255 }
256
257
258 protected void idleExpired()
259 {
260 _connection.idleExpired();
261 }
262
263
264
265
266 @Override
267 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
268 {
269 int l = super.flush(header, buffer, trailer);
270
271
272 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
273 {
274 synchronized (this)
275 {
276 _writable=false;
277 if (!_dispatched)
278 updateKey();
279 }
280 }
281 else
282 _writable=true;
283 return l;
284 }
285
286
287
288
289 @Override
290 public int flush(Buffer buffer) throws IOException
291 {
292 int l = super.flush(buffer);
293
294
295 if (l==0 && buffer!=null && buffer.hasContent())
296 {
297 synchronized (this)
298 {
299 _writable=false;
300 if (!_dispatched)
301 updateKey();
302 }
303 }
304 else
305 _writable=true;
306
307 return l;
308 }
309
310
311 public boolean isReadyForDispatch()
312 {
313 synchronized (this)
314 {
315
316 return !(_dispatched || getConnection().isSuspended());
317 }
318 }
319
320
321
322
323
324 @Override
325 public boolean blockReadable(long timeoutMs) throws IOException
326 {
327 synchronized (this)
328 {
329 long now=_selectSet.getNow();
330 long end=now+timeoutMs;
331 try
332 {
333 _readBlocked=true;
334 while (isOpen() && _readBlocked)
335 {
336 try
337 {
338 updateKey();
339 this.wait(timeoutMs>=0?(end-now):10000);
340 }
341 catch (InterruptedException e)
342 {
343 __log.warn(e);
344 }
345 finally
346 {
347 now=_selectSet.getNow();
348 }
349
350 if (_readBlocked && timeoutMs>0 && now>=end)
351 return false;
352 }
353 }
354 finally
355 {
356 _readBlocked=false;
357 }
358 }
359 return true;
360 }
361
362
363
364
365
366 @Override
367 public boolean blockWritable(long timeoutMs) throws IOException
368 {
369 synchronized (this)
370 {
371 if (!isOpen() || isOutputShutdown())
372 throw new EofException();
373
374 long now=_selectSet.getNow();
375 long end=now+timeoutMs;
376 try
377 {
378 _writeBlocked=true;
379 while (isOpen() && _writeBlocked && !isOutputShutdown())
380 {
381 try
382 {
383 updateKey();
384 this.wait(timeoutMs>=0?(end-now):10000);
385 }
386 catch (InterruptedException e)
387 {
388 __log.warn(e);
389 }
390 finally
391 {
392 now=_selectSet.getNow();
393 }
394 if (_writeBlocked && timeoutMs>0 && now>=end)
395 return false;
396 }
397 }
398 catch(Throwable e)
399 {
400
401 __log.warn(e);
402 if (e instanceof RuntimeException)
403 throw (RuntimeException)e;
404 if (e instanceof Error)
405 throw (Error)e;
406 throw new RuntimeException(e);
407 }
408 finally
409 {
410 _writeBlocked=false;
411 if (_idleTimestamp!=-1)
412 scheduleIdle();
413 }
414 }
415 return true;
416 }
417
418
419 public void scheduleWrite()
420 {
421 _writable=false;
422 updateKey();
423 }
424
425
426
427
428
429
430
431 private void updateKey()
432 {
433 synchronized (this)
434 {
435 int ops=-1;
436 if (getChannel().isOpen())
437 {
438 _interestOps =
439 ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked)) ? SelectionKey.OP_READ : 0)
440 | ((!_socket.isOutputShutdown()&& (!_writable || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
441 try
442 {
443 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
444 }
445 catch(Exception e)
446 {
447 _key=null;
448 __log.ignore(e);
449 }
450 }
451
452 if(_interestOps == ops && getChannel().isOpen())
453 return;
454 }
455 _selectSet.addChange(this);
456 _selectSet.wakeup();
457 }
458
459
460
461
462
463 void doUpdateKey()
464 {
465 synchronized (this)
466 {
467 if (getChannel().isOpen())
468 {
469 if (_interestOps>0)
470 {
471 if (_key==null || !_key.isValid())
472 {
473 SelectableChannel sc = (SelectableChannel)getChannel();
474 if (sc.isRegistered())
475 {
476 updateKey();
477 }
478 else
479 {
480 try
481 {
482 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
483 }
484 catch (Exception e)
485 {
486 __log.ignore(e);
487 if (_key!=null && _key.isValid())
488 {
489 _key.cancel();
490 }
491 cancelIdle();
492
493 if (_open)
494 {
495 _selectSet.destroyEndPoint(this);
496 }
497 _open=false;
498 _key = null;
499 }
500 }
501 }
502 else
503 {
504 _key.interestOps(_interestOps);
505 }
506 }
507 else
508 {
509 if (_key!=null && _key.isValid())
510 _key.interestOps(0);
511 else
512 _key=null;
513 }
514 }
515 else
516 {
517 if (_key!=null && _key.isValid())
518 _key.cancel();
519
520 cancelIdle();
521 if (_open)
522 {
523 _selectSet.destroyEndPoint(this);
524 }
525 _open=false;
526 _key = null;
527 }
528 }
529 }
530
531
532
533
534 protected void handle()
535 {
536 boolean dispatched=true;
537 try
538 {
539 while(dispatched)
540 {
541 try
542 {
543 while(true)
544 {
545 final Connection next = _connection.handle();
546 if (next!=_connection)
547 {
548 __log.debug("{} replaced {}",next,_connection);
549 _connection=next;
550 continue;
551 }
552 break;
553 }
554 }
555 catch (ClosedChannelException e)
556 {
557 __log.ignore(e);
558 }
559 catch (EofException e)
560 {
561 __log.debug("EOF", e);
562 try{close();}
563 catch(IOException e2){__log.ignore(e2);}
564 }
565 catch (IOException e)
566 {
567 __log.warn(e.toString());
568 __log.debug(e);
569 try{close();}
570 catch(IOException e2){__log.ignore(e2);}
571 }
572 catch (Throwable e)
573 {
574 __log.warn("handle failed", e);
575 try{close();}
576 catch(IOException e2){__log.ignore(e2);}
577 }
578 dispatched=!undispatch();
579 }
580 }
581 finally
582 {
583 if (dispatched)
584 {
585 dispatched=!undispatch();
586 while (dispatched)
587 {
588 __log.warn("SCEP.run() finally DISPATCHED");
589 dispatched=!undispatch();
590 }
591 }
592 }
593 }
594
595
596
597
598
599 @Override
600 public void close() throws IOException
601 {
602 try
603 {
604 super.close();
605 }
606 catch (IOException e)
607 {
608 __log.ignore(e);
609 }
610 finally
611 {
612 updateKey();
613 }
614 }
615
616
617 @Override
618 public String toString()
619 {
620 synchronized(this)
621 {
622 return "SCEP@" + hashCode() + _channel+
623 "[d=" + _dispatched + ",io=" + _interestOps+
624 ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
625 }
626 }
627
628
629 public SelectSet getSelectSet()
630 {
631 return _selectSet;
632 }
633
634
635
636
637
638
639 @Override
640 public void setMaxIdleTime(int timeMs) throws IOException
641 {
642 _maxIdleTime=timeMs;
643 }
644
645 }