View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.thread.Timeout;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   */
35  public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint
36  {
37      private final SelectorManager.SelectSet _selectSet;
38      private final SelectorManager _manager;
39      private volatile Connection _connection;
40      private boolean _dispatched = false;
41      private boolean _redispatched = false;
42      private volatile boolean _writable = true; 
43      
44      private  SelectionKey _key;
45      private int _interestOps;
46      private boolean _readBlocked;
47      private boolean _writeBlocked;
48      private boolean _open;
49      private volatile long _idleTimestamp;
50  
51      /* ------------------------------------------------------------ */
52      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
53          throws IOException
54      {
55          super(channel);
56  
57          _manager = selectSet.getManager();
58          _selectSet = selectSet;
59          _dispatched = false;
60          _redispatched = false;
61          _open=true;       
62          _key = key;
63  
64          _connection = _manager.newConnection(channel,this);
65          
66          scheduleIdle();
67      }
68  
69      /* ------------------------------------------------------------ */
70      public SelectionKey getSelectionKey()
71      {
72          synchronized (this)
73          {
74              return _key;
75          }
76      }
77      
78      /* ------------------------------------------------------------ */
79      public SelectorManager getSelectManager()
80      {
81          return _manager;
82      }
83      
84      /* ------------------------------------------------------------ */
85      public Connection getConnection()
86      {
87          return _connection;
88      }
89      
90      /* ------------------------------------------------------------ */
91      public void setConnection(Connection connection)
92      {
93          Connection old=_connection;
94          _connection=connection;
95          _manager.endPointUpgraded(this,old);
96      }
97  
98      /* ------------------------------------------------------------ */
99      /** Called by selectSet to schedule handling
100      * 
101      */
102     public void schedule() 
103     {
104         synchronized (this)
105         {
106             // If there is no key, then do nothing
107             if (_key == null || !_key.isValid())
108             {
109                 _readBlocked=false;
110                 _writeBlocked=false;
111                 this.notifyAll();
112                 return;
113             }
114             
115             // If there are threads dispatched reading and writing
116             if (_readBlocked || _writeBlocked)
117             {
118                 // assert _dispatched;
119                 if (_readBlocked && _key.isReadable())
120                     _readBlocked=false;
121                 if (_writeBlocked && _key.isWritable())
122                     _writeBlocked=false;
123 
124                 // wake them up is as good as a dispatched.
125                 this.notifyAll();
126                 
127                 // we are not interested in further selecting
128                 _key.interestOps(0);
129                 return;
130             }
131 
132             // Otherwise if we are still dispatched
133             if (!isReadyForDispatch())
134             {
135                 // we are not interested in further selecting
136                 _key.interestOps(0);
137                 return;
138             }
139 
140 
141             // Remove writeable op
142             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
143             {
144                 // Remove writeable op
145                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
146                 _key.interestOps(_interestOps);
147                 _writable = true; // Once writable is in ops, only removed with dispatch.
148             }
149 
150             if (_dispatched)
151                 _key.interestOps(0);
152             else
153                 dispatch();
154         }
155     }
156     
157     /* ------------------------------------------------------------ */
158     public void dispatch() 
159     {
160         synchronized(this)
161         {
162             if (_dispatched)
163                 _redispatched=true;
164             else
165             {
166                 _dispatched = _manager.dispatch(this);
167                 if(!_dispatched)
168                 {
169                     Log.warn("Dispatched Failed!");
170                     updateKey();
171                 }
172             }
173         }
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * Called when a dispatched thread is no longer handling the endpoint. 
179      * The selection key operations are updated.
180      * @return If false is returned, the endpoint has been redispatched and 
181      * thread must keep handling the endpoint.
182      */
183     private boolean undispatch()
184     {
185         synchronized (this)
186         {
187             if (_redispatched)
188             {
189                 _redispatched=false;
190                 return false;
191             }
192             _dispatched = false;
193             updateKey();
194         }
195         return true;
196     }
197 
198     /* ------------------------------------------------------------ */
199     public void scheduleIdle()
200     {
201         _idleTimestamp=System.currentTimeMillis();
202     }
203 
204     /* ------------------------------------------------------------ */
205     public void cancelIdle()
206     {
207         _idleTimestamp=0;
208     }
209     
210     /* ------------------------------------------------------------ */
211     public void checkIdleTimestamp(long now)
212     {
213         if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
214         {
215             idleExpired();
216         }
217     }
218 
219     /* ------------------------------------------------------------ */
220     protected void idleExpired()
221     {
222         try
223         {
224             close();
225         }
226         catch (IOException e)
227         {
228             Log.ignore(e);
229         }
230     }
231 
232     /* ------------------------------------------------------------ */
233     /*
234      */
235     @Override
236     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
237     {
238         int l = super.flush(header, buffer, trailer);
239         if (!(_writable=l!=0))
240         {
241             synchronized (this)
242             {
243                 if (!_dispatched)
244                     updateKey();
245             }
246         }
247         return l;
248     }
249 
250     /* ------------------------------------------------------------ */
251     /*
252      */
253     @Override
254     public int flush(Buffer buffer) throws IOException
255     {
256         int l = super.flush(buffer);
257         if (!(_writable=l!=0))
258         {
259             synchronized (this)
260             {
261                 if (!_dispatched)
262                     updateKey();
263             }
264         }
265         return l;
266     }
267 
268     /* ------------------------------------------------------------ */
269     public boolean isReadyForDispatch()
270     {
271         synchronized (this)
272         {
273             return !(_dispatched || getConnection().isSuspended());
274         }
275     }
276     
277     /* ------------------------------------------------------------ */
278     /*
279      * Allows thread to block waiting for further events.
280      */
281     @Override
282     public boolean blockReadable(long timeoutMs) throws IOException
283     {
284         synchronized (this)
285         {
286             long start=_selectSet.getNow();
287             try
288             {   
289                 _readBlocked=true;
290                 while (isOpen() && _readBlocked)
291                 {
292                     try
293                     {
294                         updateKey();
295                         this.wait(timeoutMs);
296 
297                         timeoutMs -= _selectSet.getNow()-start;
298                         if (_readBlocked && timeoutMs<=0)
299                             return false;
300                     }
301                     catch (InterruptedException e)
302                     {
303                         Log.warn(e);
304                     }
305                 }
306             }
307             finally
308             {
309                 _readBlocked=false;
310             }
311         }
312         return true;
313     }
314 
315     /* ------------------------------------------------------------ */
316     /*
317      * Allows thread to block waiting for further events.
318      */
319     @Override
320     public boolean blockWritable(long timeoutMs) throws IOException
321     {
322         synchronized (this)
323         {
324             long start=_selectSet.getNow();
325             try
326             {   
327                 _writeBlocked=true;
328                 while (isOpen() && _writeBlocked)
329                 {
330                     try
331                     {
332                         updateKey();
333                         this.wait(timeoutMs);
334 
335                         timeoutMs -= _selectSet.getNow()-start;
336                         if (_writeBlocked && timeoutMs<=0)
337                             return false;
338                     }
339                     catch (InterruptedException e)
340                     {
341                         Log.warn(e);
342                     }
343                 }
344             }
345             finally
346             {
347                 _writeBlocked=false;
348                 if (_idleTimestamp!=-1)
349                     scheduleIdle();
350             }
351         }
352         return true;
353     }
354 
355     /* ------------------------------------------------------------ */
356     public void setWritable(boolean writable)
357     {
358         _writable=writable;
359     }
360     
361     /* ------------------------------------------------------------ */
362     public void scheduleWrite()
363     {
364         _writable=false;
365         updateKey();
366     }
367     
368     /* ------------------------------------------------------------ */
369     /**
370      * Updates selection key. Adds operations types to the selection key as needed. No operations
371      * are removed as this is only done during dispatch. This method records the new key and
372      * schedules a call to doUpdateKey to do the keyChange
373      */
374     private void updateKey()
375     {
376         synchronized (this)
377         {
378             int ops=-1;
379             if (getChannel().isOpen())
380             {
381                 _interestOps = 
382                     ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0) 
383                 |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
384                 try
385                 {
386                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
387                 }
388                 catch(Exception e)
389                 {
390                     _key=null;
391                     Log.ignore(e);
392                 }
393             }
394 
395             if(_interestOps == ops && getChannel().isOpen())
396                 return;
397             
398         }
399         _selectSet.addChange(this);
400         _selectSet.wakeup();
401     }
402     
403     /* ------------------------------------------------------------ */
404     /**
405      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
406      */
407     void doUpdateKey()
408     {
409         synchronized (this)
410         {
411             if (getChannel().isOpen())
412             {
413                 if (_interestOps>0)
414                 {
415                     if (_key==null || !_key.isValid())
416                     {
417                         SelectableChannel sc = (SelectableChannel)getChannel();
418                         if (sc.isRegistered())
419                         {
420                             updateKey();   
421                         }
422                         else
423                         {
424                             try
425                             {
426                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
427                             }
428                             catch (Exception e)
429                             {
430                                 Log.ignore(e);
431                                 if (_key!=null && _key.isValid())
432                                 {
433                                     _key.cancel();
434                                 }
435                                 cancelIdle();
436 
437                                 if (_open)
438                                 {
439                                     _selectSet.destroyEndPoint(this);
440                                 }
441                                 _open=false;
442                                 _key = null;
443                             }
444                         }
445                     }
446                     else
447                     {
448                         _key.interestOps(_interestOps);
449                     }
450                 }
451                 else
452                 {
453                     if (_key.isValid())
454                         _key.interestOps(0);
455                     else
456                         _key=null;
457                 }
458             }
459             else    
460             {
461                 if (_key!=null && _key.isValid())
462                     _key.cancel(); 
463                 
464                 cancelIdle();
465                 if (_open)
466                 {
467                     _selectSet.destroyEndPoint(this);
468                 }
469                 _open=false;
470                 _key = null;
471             }
472         }
473     }
474 
475     /* ------------------------------------------------------------ */
476     /* 
477      */
478     public void run()
479     {
480         boolean dispatched=true;
481         try
482         {
483             while(dispatched)
484             {
485                 try
486                 {
487                     while(true)
488                     {
489                         final Connection next = _connection.handle();
490                         if (next!=_connection)
491                         {  
492                             _connection=next;
493                             continue;
494                         }
495                         break;
496                     }
497                 }
498                 catch (ClosedChannelException e)
499                 {
500                     Log.ignore(e);
501                 }
502                 catch (EofException e)
503                 {
504                     Log.debug("EOF", e);
505                     try{close();}
506                     catch(IOException e2){Log.ignore(e2);}
507                 }
508                 catch (IOException e)
509                 {
510                     Log.warn(e.toString());
511                     Log.debug(e);
512                     try{close();}
513                     catch(IOException e2){Log.ignore(e2);}
514                 }
515                 catch (Throwable e)
516                 {
517                     Log.warn("handle failed", e);
518                     try{close();}
519                     catch(IOException e2){Log.ignore(e2);}
520                 }
521                 dispatched=!undispatch();
522             }
523         }
524         finally
525         {
526             if (dispatched)
527             {
528                 dispatched=!undispatch();
529                 while (dispatched)
530                 {
531                     Log.warn("SCEP.run() finally DISPATCHED");
532                     dispatched=!undispatch();
533                 }
534             }
535         }
536     }
537 
538     /* ------------------------------------------------------------ */
539     /*
540      * @see org.eclipse.io.nio.ChannelEndPoint#close()
541      */
542     @Override
543     public void close() throws IOException
544     {
545         try
546         {
547             super.close();
548         }
549         catch (IOException e)
550         {
551             Log.ignore(e);
552         }   
553         finally
554         {
555             updateKey();
556         }
557     }
558     
559     /* ------------------------------------------------------------ */
560     @Override
561     public String toString()
562     {
563         synchronized(this)
564         {
565             return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
566             ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
567         }
568     }
569 
570     /* ------------------------------------------------------------ */
571     public SelectSet getSelectSet()
572     {
573         return _selectSet;
574     }
575 
576     /* ------------------------------------------------------------ */
577     /**
578      * Don't set the SoTimeout
579      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
580      */
581     @Override
582     public void setMaxIdleTime(int timeMs) throws IOException
583     {
584         _maxIdleTime=timeMs;
585     }
586 
587     
588     
589 }