1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29
30
31
32
33
34 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
35 {
36 private final SelectorManager.SelectSet _selectSet;
37 private final SelectorManager _manager;
38 private final Runnable _handler = new Runnable()
39 {
40 public void run() { handle(); }
41 };
42
43 private volatile Connection _connection;
44 private boolean _dispatched = false;
45 private boolean _redispatched = false;
46 private volatile boolean _writable = true;
47
48 private SelectionKey _key;
49 private int _interestOps;
50 private boolean _readBlocked;
51 private boolean _writeBlocked;
52 private boolean _open;
53 private volatile long _idleTimestamp;
54 private boolean _changing=false;
55
56
57 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
58 throws IOException
59 {
60 super(channel, maxIdleTime);
61
62 _manager = selectSet.getManager();
63 _selectSet = selectSet;
64 _dispatched = false;
65 _redispatched = false;
66 _open=true;
67 _key = key;
68
69 _connection = _manager.newConnection(channel,this);
70
71 scheduleIdle();
72 }
73
74
75 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
76 throws IOException
77 {
78 super(channel);
79
80 _manager = selectSet.getManager();
81 _selectSet = selectSet;
82 _dispatched = false;
83 _redispatched = false;
84 _open=true;
85 _key = key;
86
87 _connection = _manager.newConnection(channel,this);
88
89 scheduleIdle();
90 }
91
92 public SelectionKey getSelectionKey()
93 {
94 synchronized (this)
95 {
96 return _key;
97 }
98 }
99
100
101 public SelectorManager getSelectManager()
102 {
103 return _manager;
104 }
105
106
107 public Connection getConnection()
108 {
109 return _connection;
110 }
111
112
113 public void setConnection(Connection connection)
114 {
115 Connection old=_connection;
116 _connection=connection;
117 _manager.endPointUpgraded(this,old);
118 }
119
120
121
122
123
124 public void schedule()
125 {
126 synchronized (this)
127 {
128
129 if (_key == null || !_key.isValid())
130 {
131 _readBlocked=false;
132 _writeBlocked=false;
133 this.notifyAll();
134 return;
135 }
136
137
138 if (_readBlocked || _writeBlocked)
139 {
140
141 if (_readBlocked && _key.isReadable())
142 _readBlocked=false;
143 if (_writeBlocked && _key.isWritable())
144 _writeBlocked=false;
145
146
147 this.notifyAll();
148
149
150 if (_dispatched)
151 _key.interestOps(0);
152 return;
153 }
154
155
156 if (!isReadyForDispatch())
157 {
158
159 _key.interestOps(0);
160 return;
161 }
162
163
164 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
165 {
166
167 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
168 _key.interestOps(_interestOps);
169 _writable = true;
170 }
171
172
173 if (!_dispatched)
174 {
175 dispatch();
176 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
177 {
178 _key.interestOps(0);
179 }
180 }
181 }
182 }
183
184
185 public void dispatch()
186 {
187 synchronized(this)
188 {
189 if (_dispatched)
190 {
191 _redispatched=true;
192 }
193 else
194 {
195 _dispatched = true;
196 boolean dispatched = _manager.dispatch(_handler);
197 if(!dispatched)
198 {
199 _dispatched = false;
200 Log.warn("Dispatched Failed!");
201 updateKey();
202 }
203 }
204 }
205 }
206
207
208
209
210
211
212
213
214 protected boolean undispatch()
215 {
216 synchronized (this)
217 {
218 if (_redispatched)
219 {
220 _redispatched=false;
221 return false;
222 }
223 _dispatched = false;
224 updateKey();
225 }
226 return true;
227 }
228
229
230 public void scheduleIdle()
231 {
232 _idleTimestamp=System.currentTimeMillis();
233 }
234
235
236 public void cancelIdle()
237 {
238 _idleTimestamp=0;
239 }
240
241
242 public void checkIdleTimestamp(long now)
243 {
244 if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
245 idleExpired();
246 }
247
248
249 protected void idleExpired()
250 {
251 try
252 {
253 close();
254 }
255 catch (IOException e)
256 {
257 Log.ignore(e);
258 }
259 }
260
261
262
263
264 @Override
265 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
266 {
267 int l = super.flush(header, buffer, trailer);
268 if (!(_writable=l!=0))
269 {
270 synchronized (this)
271 {
272 if (!_dispatched)
273 updateKey();
274 }
275 }
276 return l;
277 }
278
279
280
281
282 @Override
283 public int flush(Buffer buffer) throws IOException
284 {
285 int l = super.flush(buffer);
286 if (!(_writable=l!=0))
287 {
288 synchronized (this)
289 {
290 if (!_dispatched)
291 updateKey();
292 }
293 }
294 return l;
295 }
296
297
298 public boolean isReadyForDispatch()
299 {
300 synchronized (this)
301 {
302
303 return !(_dispatched || getConnection().isSuspended());
304 }
305 }
306
307
308
309
310
311 @Override
312 public boolean blockReadable(long timeoutMs) throws IOException
313 {
314 synchronized (this)
315 {
316 long start=_selectSet.getNow();
317 try
318 {
319 _readBlocked=true;
320 while (isOpen() && _readBlocked)
321 {
322 try
323 {
324 updateKey();
325 this.wait(timeoutMs);
326
327 timeoutMs -= _selectSet.getNow()-start;
328 if (_readBlocked && timeoutMs<=0)
329 return false;
330 }
331 catch (InterruptedException e)
332 {
333 Log.warn(e);
334 }
335 }
336 }
337 finally
338 {
339 _readBlocked=false;
340 }
341 }
342 return true;
343 }
344
345
346
347
348
349 @Override
350 public boolean blockWritable(long timeoutMs) throws IOException
351 {
352 synchronized (this)
353 {
354 long start=_selectSet.getNow();
355 try
356 {
357 _writeBlocked=true;
358 while (isOpen() && _writeBlocked)
359 {
360 try
361 {
362 updateKey();
363 this.wait(timeoutMs);
364
365 timeoutMs -= _selectSet.getNow()-start;
366 if (_writeBlocked && timeoutMs<=0)
367 return false;
368 }
369 catch (InterruptedException e)
370 {
371 Log.warn(e);
372 }
373 }
374 }
375 finally
376 {
377 _writeBlocked=false;
378 if (_idleTimestamp!=-1)
379 scheduleIdle();
380 }
381 }
382 return true;
383 }
384
385
386 public void setWritable(boolean writable)
387 {
388 _writable=writable;
389 }
390
391
392 public void scheduleWrite()
393 {
394 _writable=false;
395 updateKey();
396 }
397
398
399
400
401
402
403
404 private void updateKey()
405 {
406 synchronized (this)
407 {
408 int ops=-1;
409 if (getChannel().isOpen())
410 {
411 _interestOps =
412 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
413 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
414 try
415 {
416 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
417 }
418 catch(Exception e)
419 {
420 _key=null;
421 Log.ignore(e);
422 }
423 }
424
425 if(_interestOps == ops && getChannel().isOpen())
426 return;
427 _changing=true;
428 }
429 _selectSet.addChange(this);
430 _selectSet.wakeup();
431 }
432
433
434
435
436
437 void doUpdateKey()
438 {
439 synchronized (this)
440 {
441 _changing=false;
442 if (getChannel().isOpen())
443 {
444 if (_interestOps>0)
445 {
446 if (_key==null || !_key.isValid())
447 {
448 SelectableChannel sc = (SelectableChannel)getChannel();
449 if (sc.isRegistered())
450 {
451 updateKey();
452 }
453 else
454 {
455 try
456 {
457 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
458 }
459 catch (Exception e)
460 {
461 Log.ignore(e);
462 if (_key!=null && _key.isValid())
463 {
464 _key.cancel();
465 }
466 cancelIdle();
467
468 if (_open)
469 {
470 _selectSet.destroyEndPoint(this);
471 }
472 _open=false;
473 _key = null;
474 }
475 }
476 }
477 else
478 {
479 _key.interestOps(_interestOps);
480 }
481 }
482 else
483 {
484 if (_key!=null && _key.isValid())
485 _key.interestOps(0);
486 else
487 _key=null;
488 }
489 }
490 else
491 {
492 if (_key!=null && _key.isValid())
493 _key.cancel();
494
495 cancelIdle();
496 if (_open)
497 {
498 _selectSet.destroyEndPoint(this);
499 }
500 _open=false;
501 _key = null;
502 }
503 }
504 }
505
506
507
508
509 protected void handle()
510 {
511 boolean dispatched=true;
512 try
513 {
514 while(dispatched)
515 {
516 try
517 {
518 while(true)
519 {
520 final Connection next = _connection.handle();
521 if (next!=_connection)
522 {
523 _connection=next;
524 continue;
525 }
526 break;
527 }
528 }
529 catch (ClosedChannelException e)
530 {
531 Log.ignore(e);
532 }
533 catch (EofException e)
534 {
535 Log.debug("EOF", e);
536 try{close();}
537 catch(IOException e2){Log.ignore(e2);}
538 }
539 catch (IOException e)
540 {
541 Log.warn(e.toString());
542 Log.debug(e);
543 try{close();}
544 catch(IOException e2){Log.ignore(e2);}
545 }
546 catch (Throwable e)
547 {
548 Log.warn("handle failed", e);
549 try{close();}
550 catch(IOException e2){Log.ignore(e2);}
551 }
552 dispatched=!undispatch();
553 }
554 }
555 finally
556 {
557 if (dispatched)
558 {
559 dispatched=!undispatch();
560 while (dispatched)
561 {
562 Log.warn("SCEP.run() finally DISPATCHED");
563 dispatched=!undispatch();
564 }
565 }
566 }
567 }
568
569
570
571
572
573 @Override
574 public void close() throws IOException
575 {
576 try
577 {
578 super.close();
579 }
580 catch (IOException e)
581 {
582 Log.ignore(e);
583 }
584 finally
585 {
586 updateKey();
587 }
588 }
589
590
591 @Override
592 public String toString()
593 {
594 synchronized(this)
595 {
596 return "SCEP@" + hashCode() + _channel+
597 "[d=" + _dispatched + ",io=" + _interestOps+
598 ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
599 }
600 }
601
602
603 public SelectSet getSelectSet()
604 {
605 return _selectSet;
606 }
607
608
609
610
611
612
613 @Override
614 public void setMaxIdleTime(int timeMs) throws IOException
615 {
616 _maxIdleTime=timeMs;
617 }
618
619
620
621
622
623
624 public void checkKey(SelectionKey key)
625 {
626 synchronized (this)
627 {
628 if (!_changing && key.interestOps()!=_interestOps && isReadyForDispatch())
629 {
630 Log.warn("NIO InterestOps mismatch "+key.interestOps()+"!="+_interestOps+" for "+this);
631 updateKey();
632 }
633 }
634 }
635
636 }