1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.thread.Timeout;
30
31
32
33
34
35 public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint
36 {
37 private final SelectorManager.SelectSet _selectSet;
38 private final SelectorManager _manager;
39 private volatile Connection _connection;
40 private boolean _dispatched = false;
41 private boolean _redispatched = false;
42 private volatile boolean _writable = true;
43
44 private SelectionKey _key;
45 private int _interestOps;
46 private boolean _readBlocked;
47 private boolean _writeBlocked;
48 private boolean _open;
49 private volatile long _idleTimestamp;
50
51
52 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
53 throws IOException
54 {
55 super(channel);
56
57 _manager = selectSet.getManager();
58 _selectSet = selectSet;
59 _dispatched = false;
60 _redispatched = false;
61 _open=true;
62 _key = key;
63
64 _connection = _manager.newConnection(channel,this);
65
66 scheduleIdle();
67 }
68
69
70 public SelectionKey getSelectionKey()
71 {
72 synchronized (this)
73 {
74 return _key;
75 }
76 }
77
78
79 public SelectorManager getSelectManager()
80 {
81 return _manager;
82 }
83
84
85 public Connection getConnection()
86 {
87 return _connection;
88 }
89
90
91 public void setConnection(Connection connection)
92 {
93 Connection old=_connection;
94 _connection=connection;
95 _manager.endPointUpgraded(this,old);
96 }
97
98
99
100
101
102 public void schedule()
103 {
104 synchronized (this)
105 {
106
107 if (_key == null || !_key.isValid())
108 {
109 _readBlocked=false;
110 _writeBlocked=false;
111 this.notifyAll();
112 return;
113 }
114
115
116 if (_readBlocked || _writeBlocked)
117 {
118
119 if (_readBlocked && _key.isReadable())
120 _readBlocked=false;
121 if (_writeBlocked && _key.isWritable())
122 _writeBlocked=false;
123
124
125 this.notifyAll();
126
127
128 _key.interestOps(0);
129 return;
130 }
131
132
133 if (!isReadyForDispatch())
134 {
135
136 _key.interestOps(0);
137 return;
138 }
139
140
141
142 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
143 {
144
145 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
146 _key.interestOps(_interestOps);
147 _writable = true;
148 }
149
150 if (_dispatched)
151 _key.interestOps(0);
152 else
153 dispatch();
154 }
155 }
156
157
158 public void dispatch()
159 {
160 synchronized(this)
161 {
162 if (_dispatched)
163 _redispatched=true;
164 else
165 {
166 _dispatched = _manager.dispatch(this);
167 if(!_dispatched)
168 {
169 Log.warn("Dispatched Failed!");
170 updateKey();
171 }
172 }
173 }
174 }
175
176
177
178
179
180
181
182
183 private boolean undispatch()
184 {
185 synchronized (this)
186 {
187 if (_redispatched)
188 {
189 _redispatched=false;
190 return false;
191 }
192 _dispatched = false;
193 updateKey();
194 }
195 return true;
196 }
197
198
199 public void scheduleIdle()
200 {
201 _idleTimestamp=System.currentTimeMillis();
202 }
203
204
205 public void cancelIdle()
206 {
207 _idleTimestamp=0;
208 }
209
210
211 public void checkIdleTimestamp(long now)
212 {
213 if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
214 {
215 System.err.println("EXPIRED "+now+">("+_idleTimestamp+"+"+_maxIdleTime+")");
216 idleExpired();
217 }
218 }
219
220
221 protected void idleExpired()
222 {
223 try
224 {
225 close();
226 }
227 catch (IOException e)
228 {
229 Log.ignore(e);
230 }
231 }
232
233
234
235
236 @Override
237 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
238 {
239 int l = super.flush(header, buffer, trailer);
240 if (!(_writable=l!=0))
241 {
242 synchronized (this)
243 {
244 if (!_dispatched)
245 updateKey();
246 }
247 }
248 return l;
249 }
250
251
252
253
254 @Override
255 public int flush(Buffer buffer) throws IOException
256 {
257 int l = super.flush(buffer);
258 if (!(_writable=l!=0))
259 {
260 synchronized (this)
261 {
262 if (!_dispatched)
263 updateKey();
264 }
265 }
266 return l;
267 }
268
269
270 public boolean isReadyForDispatch()
271 {
272 synchronized (this)
273 {
274 return !(_dispatched || getConnection().isSuspended());
275 }
276 }
277
278
279
280
281
282 @Override
283 public boolean blockReadable(long timeoutMs) throws IOException
284 {
285 synchronized (this)
286 {
287 long start=_selectSet.getNow();
288 try
289 {
290 _readBlocked=true;
291 while (isOpen() && _readBlocked)
292 {
293 try
294 {
295 updateKey();
296 this.wait(timeoutMs);
297
298 timeoutMs -= _selectSet.getNow()-start;
299 if (_readBlocked && timeoutMs<=0)
300 return false;
301 }
302 catch (InterruptedException e)
303 {
304 Log.warn(e);
305 }
306 }
307 }
308 finally
309 {
310 _readBlocked=false;
311 }
312 }
313 return true;
314 }
315
316
317
318
319
320 @Override
321 public boolean blockWritable(long timeoutMs) throws IOException
322 {
323 synchronized (this)
324 {
325 long start=_selectSet.getNow();
326 try
327 {
328 _writeBlocked=true;
329 while (isOpen() && _writeBlocked)
330 {
331 try
332 {
333 updateKey();
334 this.wait(timeoutMs);
335
336 timeoutMs -= _selectSet.getNow()-start;
337 if (_writeBlocked && timeoutMs<=0)
338 return false;
339 }
340 catch (InterruptedException e)
341 {
342 Log.warn(e);
343 }
344 }
345 }
346 finally
347 {
348 _writeBlocked=false;
349 if (_idleTimestamp!=-1)
350 scheduleIdle();
351 }
352 }
353 return true;
354 }
355
356
357 public void setWritable(boolean writable)
358 {
359 _writable=writable;
360 }
361
362
363 public void scheduleWrite()
364 {
365 _writable=false;
366 updateKey();
367 }
368
369
370
371
372
373
374
375 private void updateKey()
376 {
377 synchronized (this)
378 {
379 int ops=-1;
380 if (getChannel().isOpen())
381 {
382 _interestOps =
383 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
384 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
385 try
386 {
387 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
388 }
389 catch(Exception e)
390 {
391 _key=null;
392 Log.ignore(e);
393 }
394 }
395
396 if(_interestOps == ops && getChannel().isOpen())
397 return;
398
399 }
400 _selectSet.addChange(this);
401 _selectSet.wakeup();
402 }
403
404
405
406
407
408 void doUpdateKey()
409 {
410 synchronized (this)
411 {
412 if (getChannel().isOpen())
413 {
414 if (_interestOps>0)
415 {
416 if (_key==null || !_key.isValid())
417 {
418 SelectableChannel sc = (SelectableChannel)getChannel();
419 if (sc.isRegistered())
420 {
421 updateKey();
422 }
423 else
424 {
425 try
426 {
427 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
428 }
429 catch (Exception e)
430 {
431 Log.ignore(e);
432 if (_key!=null && _key.isValid())
433 {
434 _key.cancel();
435 }
436 cancelIdle();
437
438 if (_open)
439 {
440 _selectSet.destroyEndPoint(this);
441 }
442 _open=false;
443 _key = null;
444 }
445 }
446 }
447 else
448 {
449 _key.interestOps(_interestOps);
450 }
451 }
452 else
453 {
454 if (_key.isValid())
455 _key.interestOps(0);
456 else
457 _key=null;
458 }
459 }
460 else
461 {
462 if (_key!=null && _key.isValid())
463 _key.cancel();
464
465 cancelIdle();
466 if (_open)
467 {
468 _selectSet.destroyEndPoint(this);
469 }
470 _open=false;
471 _key = null;
472 }
473 }
474 }
475
476
477
478
479 public void run()
480 {
481 boolean dispatched=true;
482 try
483 {
484 while(dispatched)
485 {
486 try
487 {
488 while(true)
489 {
490 final Connection next = _connection.handle();
491 if (next!=_connection)
492 {
493 _connection=next;
494 continue;
495 }
496 break;
497 }
498 }
499 catch (ClosedChannelException e)
500 {
501 Log.ignore(e);
502 }
503 catch (EofException e)
504 {
505 Log.debug("EOF", e);
506 try{close();}
507 catch(IOException e2){Log.ignore(e2);}
508 }
509 catch (IOException e)
510 {
511 Log.warn(e.toString());
512 Log.debug(e);
513 try{close();}
514 catch(IOException e2){Log.ignore(e2);}
515 }
516 catch (Throwable e)
517 {
518 Log.warn("handle failed", e);
519 try{close();}
520 catch(IOException e2){Log.ignore(e2);}
521 }
522 dispatched=!undispatch();
523 }
524 }
525 finally
526 {
527 if (dispatched)
528 {
529 dispatched=!undispatch();
530 while (dispatched)
531 {
532 Log.warn("SCEP.run() finally DISPATCHED");
533 dispatched=!undispatch();
534 }
535 }
536 }
537 }
538
539
540
541
542
543 @Override
544 public void close() throws IOException
545 {
546 try
547 {
548 super.close();
549 }
550 catch (IOException e)
551 {
552 Log.ignore(e);
553 }
554 finally
555 {
556 updateKey();
557 }
558 }
559
560
561 @Override
562 public String toString()
563 {
564 synchronized(this)
565 {
566 return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
567 ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
568 }
569 }
570
571
572 public SelectSet getSelectSet()
573 {
574 return _selectSet;
575 }
576
577
578
579
580
581
582 @Override
583 public void setMaxIdleTime(int timeMs) throws IOException
584 {
585 _maxIdleTime=timeMs;
586 }
587
588
589
590 }