1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21 import java.util.concurrent.atomic.AtomicInteger;
22
23 import org.eclipse.jetty.io.AsyncEndPoint;
24 import org.eclipse.jetty.io.Buffer;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.thread.Timeout;
30
31
32
33
34
35
36
37
38 public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint
39 {
40 private final SelectorManager.SelectSet _selectSet;
41 private final Connection _connection;
42 private final SelectorManager _manager;
43 private boolean _dispatched = false;
44 private boolean _redispatched = false;
45 private volatile boolean _writable = true;
46
47 private SelectionKey _key;
48 private int _interestOps;
49 private boolean _readBlocked;
50 private boolean _writeBlocked;
51 private boolean _open;
52 private final Timeout.Task _idleTask = new IdleTask();
53
54
55 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
56 {
57 super(channel);
58
59 _manager = selectSet.getManager();
60 _selectSet = selectSet;
61 _connection = _manager.newConnection(channel,this);
62 _dispatched = false;
63 _redispatched = false;
64 _open=true;
65 _manager.endPointOpened(this);
66
67 _key = key;
68 scheduleIdle();
69 }
70
71
72 public Connection getConnection()
73 {
74 return _connection;
75 }
76
77
78 public SelectorManager getSelectManager()
79 {
80 return _manager;
81 }
82
83
84
85
86
87 public void schedule()
88 {
89 synchronized (this)
90 {
91
92 if (_key == null || !_key.isValid())
93 {
94 _readBlocked=false;
95 _writeBlocked=false;
96 this.notifyAll();
97 return;
98 }
99
100
101 if (_readBlocked || _writeBlocked)
102 {
103
104 if (_readBlocked && _key.isReadable())
105 _readBlocked=false;
106 if (_writeBlocked && _key.isWritable())
107 _writeBlocked=false;
108
109
110 this.notifyAll();
111
112
113 _key.interestOps(0);
114 return;
115 }
116
117
118 if (!isReadyForDispatch())
119 {
120
121 _key.interestOps(0);
122 return;
123 }
124
125
126
127 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
128 {
129
130 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
131 _key.interestOps(_interestOps);
132 _writable = true;
133 }
134
135 if (_dispatched)
136 _key.interestOps(0);
137 else
138 dispatch();
139 }
140 }
141
142
143 public void dispatch()
144 {
145 synchronized(this)
146 {
147 if (_dispatched)
148 _redispatched=true;
149 else
150 {
151 _dispatched = _manager.dispatch(this);
152 if(!_dispatched)
153 {
154 Log.warn("Dispatched Failed!");
155 updateKey();
156 }
157 }
158 }
159 }
160
161
162
163
164
165
166
167
168 private boolean undispatch()
169 {
170 synchronized (this)
171 {
172 if (_redispatched)
173 {
174 _redispatched=false;
175 return false;
176 }
177 _dispatched = false;
178 updateKey();
179 }
180 return true;
181 }
182
183
184 public void scheduleIdle()
185 {
186 _selectSet.scheduleIdle(_idleTask);
187 }
188
189
190 public void cancelIdle()
191 {
192 _selectSet.cancelIdle(_idleTask);
193 }
194
195
196 protected void idleExpired()
197 {
198 try
199 {
200 close();
201 }
202 catch (IOException e)
203 {
204 Log.ignore(e);
205 }
206 }
207
208
209
210
211 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
212 {
213 int l = super.flush(header, buffer, trailer);
214 _writable = l!=0;
215 return l;
216 }
217
218
219
220
221 public int flush(Buffer buffer) throws IOException
222 {
223 int l = super.flush(buffer);
224 _writable = l!=0;
225 return l;
226 }
227
228
229 public boolean isReadyForDispatch()
230 {
231 synchronized (this)
232 {
233 return !(_dispatched || getConnection().isSuspended());
234 }
235 }
236
237
238
239
240
241 public boolean blockReadable(long timeoutMs) throws IOException
242 {
243 synchronized (this)
244 {
245 long start=_selectSet.getNow();
246 try
247 {
248 _readBlocked=true;
249 while (isOpen() && _readBlocked)
250 {
251 try
252 {
253 updateKey();
254 this.wait(timeoutMs);
255
256 if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
257 return false;
258 }
259 catch (InterruptedException e)
260 {
261 Log.warn(e);
262 }
263 }
264 }
265 finally
266 {
267 _readBlocked=false;
268 }
269 }
270 return true;
271 }
272
273
274
275
276
277 public boolean blockWritable(long timeoutMs) throws IOException
278 {
279 synchronized (this)
280 {
281 long start=_selectSet.getNow();
282 try
283 {
284 _writeBlocked=true;
285 while (isOpen() && _writeBlocked)
286 {
287 try
288 {
289 updateKey();
290 this.wait(timeoutMs);
291
292 if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
293 return false;
294 }
295 catch (InterruptedException e)
296 {
297 Log.warn(e);
298 }
299 }
300 }
301 finally
302 {
303 _writeBlocked=false;
304 }
305 }
306 return true;
307 }
308
309
310 public void setWritable(boolean writable)
311 {
312 _writable=writable;
313 }
314
315
316 public void scheduleWrite()
317 {
318 _writable=false;
319 updateKey();
320 }
321
322
323
324
325
326
327
328 private void updateKey()
329 {
330 synchronized (this)
331 {
332 int ops=-1;
333 if (getChannel().isOpen())
334 {
335 _interestOps =
336 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
337 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
338 try
339 {
340 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
341 }
342 catch(Exception e)
343 {
344 _key=null;
345 Log.ignore(e);
346 }
347 }
348
349 if(_interestOps == ops && getChannel().isOpen())
350 return;
351
352 }
353 _selectSet.addChange(this);
354 _selectSet.wakeup();
355 }
356
357
358
359
360
361 void doUpdateKey()
362 {
363 synchronized (this)
364 {
365 if (getChannel().isOpen())
366 {
367 if (_interestOps>0)
368 {
369 if (_key==null || !_key.isValid())
370 {
371 SelectableChannel sc = (SelectableChannel)getChannel();
372 if (sc.isRegistered())
373 {
374 updateKey();
375 }
376 else
377 {
378 try
379 {
380 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
381 }
382 catch (Exception e)
383 {
384 Log.ignore(e);
385 if (_key!=null && _key.isValid())
386 {
387 _key.cancel();
388 }
389 cancelIdle();
390
391 if (_open)
392 _manager.endPointClosed(this);
393 _open=false;
394 _key = null;
395 }
396 }
397 }
398 else
399 {
400 _key.interestOps(_interestOps);
401 }
402 }
403 else
404 {
405 if (_key.isValid())
406 _key.interestOps(0);
407 else
408 _key=null;
409 }
410 }
411 else
412 {
413 if (_key!=null && _key.isValid())
414 _key.cancel();
415
416 cancelIdle();
417 if (_open)
418 _manager.endPointClosed(this);
419 _open=false;
420 _key = null;
421 }
422 }
423 }
424
425
426
427
428 public void run()
429 {
430 boolean dispatched=true;
431 try
432 {
433 while(dispatched)
434 {
435 try
436 {
437 _connection.handle();
438 }
439 catch (ClosedChannelException e)
440 {
441 Log.ignore(e);
442 }
443 catch (EofException e)
444 {
445 Log.debug("EOF", e);
446 try{close();}
447 catch(IOException e2){Log.ignore(e2);}
448 }
449 catch (IOException e)
450 {
451 Log.warn(e.toString());
452 Log.debug(e);
453 try{close();}
454 catch(IOException e2){Log.ignore(e2);}
455 }
456 catch (Throwable e)
457 {
458 Log.warn("handle failed", e);
459 try{close();}
460 catch(IOException e2){Log.ignore(e2);}
461 }
462 dispatched=!undispatch();
463 }
464 }
465 finally
466 {
467 if (dispatched)
468 {
469 dispatched=!undispatch();
470 while (dispatched)
471 {
472 Log.warn("SCEP.run() finally DISPATCHED");
473 dispatched=!undispatch();
474 }
475 }
476 }
477 }
478
479
480
481
482
483 public void close() throws IOException
484 {
485 try
486 {
487 super.close();
488 }
489 catch (IOException e)
490 {
491 Log.ignore(e);
492 }
493 finally
494 {
495 updateKey();
496 }
497 }
498
499
500 public String toString()
501 {
502 synchronized(this)
503 {
504 return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
505 ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
506 }
507 }
508
509
510 public Timeout.Task getTimeoutTask()
511 {
512 return _idleTask;
513 }
514
515
516 public SelectSet getSelectSet()
517 {
518 return _selectSet;
519 }
520
521
522
523
524 private class IdleTask extends Timeout.Task
525 {
526
527
528
529
530 public void expired()
531 {
532 idleExpired();
533 }
534
535 public String toString()
536 {
537 return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
538 }
539
540 }
541
542 }