1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29
30
31
32
33
34 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
35 {
36 private final SelectorManager.SelectSet _selectSet;
37 private final SelectorManager _manager;
38 private final Runnable _handler = new Runnable()
39 {
40 public void run() { handle(); }
41 };
42
43 private volatile Connection _connection;
44 private boolean _dispatched = false;
45 private boolean _redispatched = false;
46 private volatile boolean _writable = true;
47
48 private SelectionKey _key;
49 private int _interestOps;
50 private boolean _readBlocked;
51 private boolean _writeBlocked;
52 private boolean _open;
53 private volatile long _idleTimestamp;
54
55
56 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
57 throws IOException
58 {
59 super(channel, maxIdleTime);
60
61 _manager = selectSet.getManager();
62 _selectSet = selectSet;
63 _dispatched = false;
64 _redispatched = false;
65 _open=true;
66 _key = key;
67
68 _connection = _manager.newConnection(channel,this);
69
70 scheduleIdle();
71 }
72
73
74 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
75 throws IOException
76 {
77 super(channel);
78
79 _manager = selectSet.getManager();
80 _selectSet = selectSet;
81 _dispatched = false;
82 _redispatched = false;
83 _open=true;
84 _key = key;
85
86 _connection = _manager.newConnection(channel,this);
87
88 scheduleIdle();
89 }
90
91 public SelectionKey getSelectionKey()
92 {
93 synchronized (this)
94 {
95 return _key;
96 }
97 }
98
99
100 public SelectorManager getSelectManager()
101 {
102 return _manager;
103 }
104
105
106 public Connection getConnection()
107 {
108 return _connection;
109 }
110
111
112 public void setConnection(Connection connection)
113 {
114 Connection old=_connection;
115 _connection=connection;
116 _manager.endPointUpgraded(this,old);
117 }
118
119
120
121
122
123 public void schedule()
124 {
125 synchronized (this)
126 {
127
128 if (_key == null || !_key.isValid())
129 {
130 _readBlocked=false;
131 _writeBlocked=false;
132 this.notifyAll();
133 return;
134 }
135
136
137 if (_readBlocked || _writeBlocked)
138 {
139
140 if (_readBlocked && _key.isReadable())
141 _readBlocked=false;
142 if (_writeBlocked && _key.isWritable())
143 _writeBlocked=false;
144
145
146 this.notifyAll();
147
148
149 if (_dispatched)
150 _key.interestOps(0);
151 return;
152 }
153
154
155 if (!isReadyForDispatch())
156 {
157
158 _key.interestOps(0);
159 return;
160 }
161
162
163 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
164 {
165
166 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
167 _key.interestOps(_interestOps);
168 _writable = true;
169 }
170
171
172 if (!_dispatched)
173 {
174 dispatch();
175 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
176 {
177 _key.interestOps(0);
178 }
179 }
180 }
181 }
182
183
184 public void dispatch()
185 {
186 synchronized(this)
187 {
188 if (_dispatched)
189 {
190 _redispatched=true;
191 }
192 else
193 {
194 _dispatched = true;
195 boolean dispatched = _manager.dispatch(_handler);
196 if(!dispatched)
197 {
198 _dispatched = false;
199 Log.warn("Dispatched Failed! "+this+" to "+_manager);
200 updateKey();
201 }
202 }
203 }
204 }
205
206
207
208
209
210
211
212
213 protected boolean undispatch()
214 {
215 synchronized (this)
216 {
217 if (_redispatched)
218 {
219 _redispatched=false;
220 return false;
221 }
222 _dispatched = false;
223 updateKey();
224 }
225 return true;
226 }
227
228
229 public void scheduleIdle()
230 {
231 _idleTimestamp=System.currentTimeMillis();
232 }
233
234
235 public void cancelIdle()
236 {
237 _idleTimestamp=0;
238 }
239
240
241 public void checkIdleTimestamp(long now)
242 {
243 if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
244 idleExpired();
245 }
246
247
248 protected void idleExpired()
249 {
250 _connection.idleExpired();
251 }
252
253
254
255
256 @Override
257 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
258 {
259 int l = super.flush(header, buffer, trailer);
260
261
262 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
263 {
264 synchronized (this)
265 {
266 _writable=false;
267 if (!_dispatched)
268 updateKey();
269 }
270 }
271 else
272 _writable=true;
273 return l;
274 }
275
276
277
278
279 @Override
280 public int flush(Buffer buffer) throws IOException
281 {
282 int l = super.flush(buffer);
283
284
285 if (l==0 && buffer!=null && buffer.hasContent())
286 {
287 synchronized (this)
288 {
289 _writable=false;
290 if (!_dispatched)
291 updateKey();
292 }
293 }
294 else
295 _writable=true;
296
297 return l;
298 }
299
300
301 public boolean isReadyForDispatch()
302 {
303 synchronized (this)
304 {
305
306 return !(_dispatched || getConnection().isSuspended());
307 }
308 }
309
310
311
312
313
314 @Override
315 public boolean blockReadable(long timeoutMs) throws IOException
316 {
317 synchronized (this)
318 {
319 long start=_selectSet.getNow();
320 try
321 {
322 _readBlocked=true;
323 while (isOpen() && _readBlocked)
324 {
325 try
326 {
327 updateKey();
328 this.wait(timeoutMs);
329 }
330 catch (InterruptedException e)
331 {
332 Log.warn(e);
333 }
334
335 timeoutMs -= _selectSet.getNow()-start;
336 if (_readBlocked && timeoutMs<=0)
337 return false;
338 }
339 }
340 finally
341 {
342 _readBlocked=false;
343 }
344 }
345 return true;
346 }
347
348
349
350
351
352 @Override
353 public boolean blockWritable(long timeoutMs) throws IOException
354 {
355 synchronized (this)
356 {
357 long start=_selectSet.getNow();
358 try
359 {
360 _writeBlocked=true;
361 while (isOpen() && _writeBlocked)
362 {
363 try
364 {
365 updateKey();
366 this.wait(timeoutMs);
367 }
368 catch (InterruptedException e)
369 {
370 Log.warn(e);
371 }
372
373 timeoutMs -= _selectSet.getNow()-start;
374 if (_writeBlocked && timeoutMs<=0)
375 return false;
376 }
377 }
378 finally
379 {
380 _writeBlocked=false;
381 if (_idleTimestamp!=-1)
382 scheduleIdle();
383 }
384 }
385 return true;
386 }
387
388
389 public void scheduleWrite()
390 {
391 _writable=false;
392 updateKey();
393 }
394
395
396
397
398
399
400
401 private void updateKey()
402 {
403 synchronized (this)
404 {
405 int ops=-1;
406 if (getChannel().isOpen())
407 {
408 _interestOps =
409 ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked)) ? SelectionKey.OP_READ : 0)
410 | ((!_socket.isOutputShutdown()&& (!_writable || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
411 try
412 {
413 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
414 }
415 catch(Exception e)
416 {
417 _key=null;
418 Log.ignore(e);
419 }
420 }
421
422 if(_interestOps == ops && getChannel().isOpen())
423 return;
424 }
425 _selectSet.addChange(this);
426 _selectSet.wakeup();
427 }
428
429
430
431
432
433 void doUpdateKey()
434 {
435 synchronized (this)
436 {
437 if (getChannel().isOpen())
438 {
439 if (_interestOps>0)
440 {
441 if (_key==null || !_key.isValid())
442 {
443 SelectableChannel sc = (SelectableChannel)getChannel();
444 if (sc.isRegistered())
445 {
446 updateKey();
447 }
448 else
449 {
450 try
451 {
452 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
453 }
454 catch (Exception e)
455 {
456 Log.ignore(e);
457 if (_key!=null && _key.isValid())
458 {
459 _key.cancel();
460 }
461 cancelIdle();
462
463 if (_open)
464 {
465 _selectSet.destroyEndPoint(this);
466 }
467 _open=false;
468 _key = null;
469 }
470 }
471 }
472 else
473 {
474 _key.interestOps(_interestOps);
475 }
476 }
477 else
478 {
479 if (_key!=null && _key.isValid())
480 _key.interestOps(0);
481 else
482 _key=null;
483 }
484 }
485 else
486 {
487 if (_key!=null && _key.isValid())
488 _key.cancel();
489
490 cancelIdle();
491 if (_open)
492 {
493 _selectSet.destroyEndPoint(this);
494 }
495 _open=false;
496 _key = null;
497 }
498 }
499 }
500
501
502
503
504 protected void handle()
505 {
506 boolean dispatched=true;
507 try
508 {
509 while(dispatched)
510 {
511 try
512 {
513 while(true)
514 {
515 final Connection next = _connection.handle();
516 if (next!=_connection)
517 {
518 Log.debug("{} replaced {}",next,_connection);
519 _connection=next;
520 continue;
521 }
522 break;
523 }
524 }
525 catch (ClosedChannelException e)
526 {
527 Log.ignore(e);
528 }
529 catch (EofException e)
530 {
531 Log.debug("EOF", e);
532 try{close();}
533 catch(IOException e2){Log.ignore(e2);}
534 }
535 catch (IOException e)
536 {
537 Log.warn(e.toString());
538 Log.debug(e);
539 try{close();}
540 catch(IOException e2){Log.ignore(e2);}
541 }
542 catch (Throwable e)
543 {
544 Log.warn("handle failed", e);
545 try{close();}
546 catch(IOException e2){Log.ignore(e2);}
547 }
548 dispatched=!undispatch();
549 }
550 }
551 finally
552 {
553 if (dispatched)
554 {
555 dispatched=!undispatch();
556 while (dispatched)
557 {
558 Log.warn("SCEP.run() finally DISPATCHED");
559 dispatched=!undispatch();
560 }
561 }
562 }
563 }
564
565
566
567
568
569 @Override
570 public void close() throws IOException
571 {
572 try
573 {
574 super.close();
575 }
576 catch (IOException e)
577 {
578 Log.ignore(e);
579 }
580 finally
581 {
582 updateKey();
583 }
584 }
585
586
587 @Override
588 public String toString()
589 {
590 synchronized(this)
591 {
592 return "SCEP@" + hashCode() + _channel+
593 "[d=" + _dispatched + ",io=" + _interestOps+
594 ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
595 }
596 }
597
598
599 public SelectSet getSelectSet()
600 {
601 return _selectSet;
602 }
603
604
605
606
607
608
609 @Override
610 public void setMaxIdleTime(int timeMs) throws IOException
611 {
612 _maxIdleTime=timeMs;
613 }
614
615 }