1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29
30
31
32
33
34 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
35 {
36 private final SelectorManager.SelectSet _selectSet;
37 private final SelectorManager _manager;
38 private final Runnable _handler = new Runnable()
39 {
40 public void run() { handle(); }
41 };
42
43 private volatile Connection _connection;
44 private boolean _dispatched = false;
45 private boolean _redispatched = false;
46 private volatile boolean _writable = true;
47
48 private SelectionKey _key;
49 private int _interestOps;
50 private boolean _readBlocked;
51 private boolean _writeBlocked;
52 private boolean _open;
53 private volatile long _idleTimestamp;
54
55
56 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
57 throws IOException
58 {
59 super(channel, maxIdleTime);
60
61 _manager = selectSet.getManager();
62 _selectSet = selectSet;
63 _dispatched = false;
64 _redispatched = false;
65 _open=true;
66 _key = key;
67
68 _connection = _manager.newConnection(channel,this);
69
70 scheduleIdle();
71 }
72
73
74 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
75 throws IOException
76 {
77 super(channel);
78
79 _manager = selectSet.getManager();
80 _selectSet = selectSet;
81 _dispatched = false;
82 _redispatched = false;
83 _open=true;
84 _key = key;
85
86 _connection = _manager.newConnection(channel,this);
87
88 scheduleIdle();
89 }
90
91 public SelectionKey getSelectionKey()
92 {
93 synchronized (this)
94 {
95 return _key;
96 }
97 }
98
99
100 public SelectorManager getSelectManager()
101 {
102 return _manager;
103 }
104
105
106 public Connection getConnection()
107 {
108 return _connection;
109 }
110
111
112 public void setConnection(Connection connection)
113 {
114 Connection old=_connection;
115 _connection=connection;
116 _manager.endPointUpgraded(this,old);
117 }
118
119
120
121
122
123 public void schedule()
124 {
125 synchronized (this)
126 {
127
128 if (_key == null || !_key.isValid())
129 {
130 _readBlocked=false;
131 _writeBlocked=false;
132 this.notifyAll();
133 return;
134 }
135
136
137 if (_readBlocked || _writeBlocked)
138 {
139
140 if (_readBlocked && _key.isReadable())
141 _readBlocked=false;
142 if (_writeBlocked && _key.isWritable())
143 _writeBlocked=false;
144
145
146 this.notifyAll();
147
148
149 if (_dispatched)
150 _key.interestOps(0);
151 return;
152 }
153
154
155 if (!isReadyForDispatch())
156 {
157
158 _key.interestOps(0);
159 return;
160 }
161
162
163 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
164 {
165
166 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
167 _key.interestOps(_interestOps);
168 _writable = true;
169 }
170
171
172 if (!_dispatched)
173 {
174 dispatch();
175 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
176 {
177 _key.interestOps(0);
178 }
179 }
180 }
181 }
182
183
184 public void dispatch()
185 {
186 synchronized(this)
187 {
188 if (_dispatched)
189 {
190 _redispatched=true;
191 }
192 else
193 {
194 _dispatched = true;
195 boolean dispatched = _manager.dispatch(_handler);
196 if(!dispatched)
197 {
198 _dispatched = false;
199 Log.warn("Dispatched Failed! "+this+" to "+_manager);
200 updateKey();
201 }
202 }
203 }
204 }
205
206
207
208
209
210
211
212
213 protected boolean undispatch()
214 {
215 synchronized (this)
216 {
217 if (_redispatched)
218 {
219 _redispatched=false;
220 return false;
221 }
222 _dispatched = false;
223 updateKey();
224 }
225 return true;
226 }
227
228
229 public void scheduleIdle()
230 {
231 _idleTimestamp=System.currentTimeMillis();
232 }
233
234
235 public void cancelIdle()
236 {
237 _idleTimestamp=0;
238 }
239
240
241 public void checkIdleTimestamp(long now)
242 {
243 if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
244 idleExpired();
245 }
246
247
248 protected void idleExpired()
249 {
250 _connection.idleExpired();
251 }
252
253
254
255
256 @Override
257 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
258 {
259 int l = super.flush(header, buffer, trailer);
260
261
262 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
263 {
264 synchronized (this)
265 {
266 _writable=false;
267 if (!_dispatched)
268 updateKey();
269 }
270 }
271 else
272 _writable=true;
273 return l;
274 }
275
276
277
278
279 @Override
280 public int flush(Buffer buffer) throws IOException
281 {
282 int l = super.flush(buffer);
283
284
285 if (l==0 && buffer!=null && buffer.hasContent())
286 {
287 synchronized (this)
288 {
289 _writable=false;
290 if (!_dispatched)
291 updateKey();
292 }
293 }
294 else
295 _writable=true;
296
297 return l;
298 }
299
300
301 public boolean isReadyForDispatch()
302 {
303 synchronized (this)
304 {
305
306 return !(_dispatched || getConnection().isSuspended());
307 }
308 }
309
310
311
312
313
314 @Override
315 public boolean blockReadable(long timeoutMs) throws IOException
316 {
317 synchronized (this)
318 {
319 long now=_selectSet.getNow();
320 long end=now+timeoutMs;
321 try
322 {
323 _readBlocked=true;
324 while (isOpen() && _readBlocked)
325 {
326 try
327 {
328 updateKey();
329 this.wait(timeoutMs>=0?(end-now):10000);
330 }
331 catch (InterruptedException e)
332 {
333 Log.warn(e);
334 }
335 finally
336 {
337 now=_selectSet.getNow();
338 }
339
340 if (_readBlocked && timeoutMs>0 && now>=end)
341 return false;
342 }
343 }
344 finally
345 {
346 _readBlocked=false;
347 }
348 }
349 return true;
350 }
351
352
353
354
355
356 @Override
357 public boolean blockWritable(long timeoutMs) throws IOException
358 {
359 synchronized (this)
360 {
361 if (!isOpen() || isOutputShutdown())
362 throw new EofException();
363
364 long now=_selectSet.getNow();
365 long end=now+timeoutMs;
366 try
367 {
368 _writeBlocked=true;
369 while (isOpen() && _writeBlocked && !isOutputShutdown())
370 {
371 try
372 {
373 updateKey();
374 this.wait(timeoutMs>=0?(end-now):10000);
375 }
376 catch (InterruptedException e)
377 {
378 Log.warn(e);
379 }
380 finally
381 {
382 now=_selectSet.getNow();
383 }
384 if (_writeBlocked && timeoutMs>0 && now>=end)
385 return false;
386 }
387 }
388 catch(Throwable e)
389 {
390
391 Log.warn(e);
392 if (e instanceof RuntimeException)
393 throw (RuntimeException)e;
394 if (e instanceof Error)
395 throw (Error)e;
396 throw new RuntimeException(e);
397 }
398 finally
399 {
400 _writeBlocked=false;
401 if (_idleTimestamp!=-1)
402 scheduleIdle();
403 }
404 }
405 return true;
406 }
407
408
409 public void scheduleWrite()
410 {
411 _writable=false;
412 updateKey();
413 }
414
415
416
417
418
419
420
421 private void updateKey()
422 {
423 synchronized (this)
424 {
425 int ops=-1;
426 if (getChannel().isOpen())
427 {
428 _interestOps =
429 ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked)) ? SelectionKey.OP_READ : 0)
430 | ((!_socket.isOutputShutdown()&& (!_writable || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
431 try
432 {
433 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
434 }
435 catch(Exception e)
436 {
437 _key=null;
438 Log.ignore(e);
439 }
440 }
441
442 if(_interestOps == ops && getChannel().isOpen())
443 return;
444 }
445 _selectSet.addChange(this);
446 _selectSet.wakeup();
447 }
448
449
450
451
452
453 void doUpdateKey()
454 {
455 synchronized (this)
456 {
457 if (getChannel().isOpen())
458 {
459 if (_interestOps>0)
460 {
461 if (_key==null || !_key.isValid())
462 {
463 SelectableChannel sc = (SelectableChannel)getChannel();
464 if (sc.isRegistered())
465 {
466 updateKey();
467 }
468 else
469 {
470 try
471 {
472 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
473 }
474 catch (Exception e)
475 {
476 Log.ignore(e);
477 if (_key!=null && _key.isValid())
478 {
479 _key.cancel();
480 }
481 cancelIdle();
482
483 if (_open)
484 {
485 _selectSet.destroyEndPoint(this);
486 }
487 _open=false;
488 _key = null;
489 }
490 }
491 }
492 else
493 {
494 _key.interestOps(_interestOps);
495 }
496 }
497 else
498 {
499 if (_key!=null && _key.isValid())
500 _key.interestOps(0);
501 else
502 _key=null;
503 }
504 }
505 else
506 {
507 if (_key!=null && _key.isValid())
508 _key.cancel();
509
510 cancelIdle();
511 if (_open)
512 {
513 _selectSet.destroyEndPoint(this);
514 }
515 _open=false;
516 _key = null;
517 }
518 }
519 }
520
521
522
523
524 protected void handle()
525 {
526 boolean dispatched=true;
527 try
528 {
529 while(dispatched)
530 {
531 try
532 {
533 while(true)
534 {
535 final Connection next = _connection.handle();
536 if (next!=_connection)
537 {
538 Log.debug("{} replaced {}",next,_connection);
539 _connection=next;
540 continue;
541 }
542 break;
543 }
544 }
545 catch (ClosedChannelException e)
546 {
547 Log.ignore(e);
548 }
549 catch (EofException e)
550 {
551 Log.debug("EOF", e);
552 try{close();}
553 catch(IOException e2){Log.ignore(e2);}
554 }
555 catch (IOException e)
556 {
557 Log.warn(e.toString());
558 Log.debug(e);
559 try{close();}
560 catch(IOException e2){Log.ignore(e2);}
561 }
562 catch (Throwable e)
563 {
564 Log.warn("handle failed", e);
565 try{close();}
566 catch(IOException e2){Log.ignore(e2);}
567 }
568 dispatched=!undispatch();
569 }
570 }
571 finally
572 {
573 if (dispatched)
574 {
575 dispatched=!undispatch();
576 while (dispatched)
577 {
578 Log.warn("SCEP.run() finally DISPATCHED");
579 dispatched=!undispatch();
580 }
581 }
582 }
583 }
584
585
586
587
588
589 @Override
590 public void close() throws IOException
591 {
592 try
593 {
594 super.close();
595 }
596 catch (IOException e)
597 {
598 Log.ignore(e);
599 }
600 finally
601 {
602 updateKey();
603 }
604 }
605
606
607 @Override
608 public String toString()
609 {
610 synchronized(this)
611 {
612 return "SCEP@" + hashCode() + _channel+
613 "[d=" + _dispatched + ",io=" + _interestOps+
614 ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
615 }
616 }
617
618
619 public SelectSet getSelectSet()
620 {
621 return _selectSet;
622 }
623
624
625
626
627
628
629 @Override
630 public void setMaxIdleTime(int timeMs) throws IOException
631 {
632 _maxIdleTime=timeMs;
633 }
634
635 }