View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.thread.Timeout;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   */
35  public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint
36  {
    /** Select set supplied at construction; receives {@code addChange(this)} and {@code wakeup()} when the key interest must change. */
    private final SelectorManager.SelectSet _selectSet;
    /** Manager obtained from the select set; creates the connection and dispatches this endpoint. */
    private final SelectorManager _manager;
    /** Connection currently handling this endpoint; replaced by {@code setConnection} and by {@code run()} when {@code handle()} returns a different connection. */
    private volatile Connection _connection;
    /** True once {@code _manager.dispatch(this)} has accepted this endpoint, until {@code undispatch()} clears it. */
    private boolean _dispatched = false;
    /** Set by {@code dispatch()} when already dispatched; makes the next {@code undispatch()} return false. */
    private boolean _redispatched = false;
    /** False when a flush wrote nothing or a write was scheduled; causes OP_WRITE interest in {@code updateKey()}. */
    private volatile boolean _writable = true; 
    
    /** Current selection key; set to null when it is found unusable or the endpoint is destroyed. */
    private  SelectionKey _key;
    /** Interest ops computed by {@code updateKey()} and applied to the key by {@code doUpdateKey()}. */
    private int _interestOps;
    /** True while a thread is waiting in {@code blockReadable}. */
    private boolean _readBlocked;
    /** True while a thread is waiting in {@code blockWritable}. */
    private boolean _writeBlocked;
    /** True from construction until {@code doUpdateKey()} has destroyed the endpoint. */
    private boolean _open;
    /** Time in ms of the last {@code scheduleIdle()}, or 0 when idle checking has been cancelled. */
    private volatile long _idleTimestamp;
50  
51      /* ------------------------------------------------------------ */
52      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
53          throws IOException
54      {
55          super(channel);
56  
57          _manager = selectSet.getManager();
58          _selectSet = selectSet;
59          _dispatched = false;
60          _redispatched = false;
61          _open=true;       
62          _key = key;
63  
64          _connection = _manager.newConnection(channel,this);
65          
66          scheduleIdle();
67      }
68  
69      /* ------------------------------------------------------------ */
70      public SelectionKey getSelectionKey()
71      {
72          synchronized (this)
73          {
74              return _key;
75          }
76      }
77      
78      /* ------------------------------------------------------------ */
79      public SelectorManager getSelectManager()
80      {
81          return _manager;
82      }
83      
84      /* ------------------------------------------------------------ */
85      public Connection getConnection()
86      {
87          return _connection;
88      }
89      
90      /* ------------------------------------------------------------ */
91      public void setConnection(Connection connection)
92      {
93          Connection old=_connection;
94          _connection=connection;
95          _manager.endPointUpgraded(this,old);
96      }
97  
98      /* ------------------------------------------------------------ */
    /**
     * Reacts to the state of the selection key (per the original note, this is
     * called by the select set to schedule handling).
     * <p>
     * Depending on the current state this either wakes threads waiting in
     * {@code blockReadable}/{@code blockWritable}, clears the key's interest
     * because the endpoint is not ready for dispatch, or dispatches the
     * endpoint. Everything runs while holding this endpoint's monitor.
     */
    public void schedule() 
    {
        synchronized (this)
        {
            // No usable key: release any blocked threads and do nothing else.
            if (_key == null || !_key.isValid())
            {
                _readBlocked=false;
                _writeBlocked=false;
                this.notifyAll();
                return;
            }
            
            // A thread is waiting in blockReadable and/or blockWritable.
            if (_readBlocked || _writeBlocked)
            {
                // assert _dispatched;
                // Clear only the flag whose operation the key reports as ready.
                if (_readBlocked && _key.isReadable())
                    _readBlocked=false;
                if (_writeBlocked && _key.isWritable())
                    _writeBlocked=false;

                // Waking the waiters is as good as a dispatch.
                this.notifyAll();
                
                // Not interested in further selecting for now.
                _key.interestOps(0);
                return;
            }

            // Already dispatched, or the connection is suspended.
            if (!isReadyForDispatch())
            {
                // Not interested in further selecting for now.
                _key.interestOps(0);
                return;
            }


            // OP_WRITE is both ready and of interest: drop the write interest
            // and mark the endpoint writable again.
            if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
            {
                // Keep every other interest bit, remove only OP_WRITE.
                _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
                _key.interestOps(_interestOps);
                _writable = true; // Once writable is in ops, only removed with dispatch.
            }

            // NOTE(review): with the isReadyForDispatch() in this class,
            // _dispatched is already known to be false here; the first branch
            // is only reachable if a subclass overrides that method.
            if (_dispatched)
                _key.interestOps(0);
            else
                dispatch();
        }
    }
156     
157     /* ------------------------------------------------------------ */
158     public void dispatch() 
159     {
160         synchronized(this)
161         {
162             if (_dispatched)
163                 _redispatched=true;
164             else
165             {
166                 _dispatched = _manager.dispatch(this);
167                 if(!_dispatched)
168                 {
169                     Log.warn("Dispatched Failed!");
170                     updateKey();
171                 }
172             }
173         }
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * Called when a dispatched thread is no longer handling the endpoint. 
179      * The selection key operations are updated.
180      * @return If false is returned, the endpoint has been redispatched and 
181      * thread must keep handling the endpoint.
182      */
183     private boolean undispatch()
184     {
185         synchronized (this)
186         {
187             if (_redispatched)
188             {
189                 _redispatched=false;
190                 return false;
191             }
192             _dispatched = false;
193             updateKey();
194         }
195         return true;
196     }
197 
198     /* ------------------------------------------------------------ */
199     public void scheduleIdle()
200     {
201         _idleTimestamp=System.currentTimeMillis();
202     }
203 
204     /* ------------------------------------------------------------ */
205     public void cancelIdle()
206     {
207         _idleTimestamp=0;
208     }
209     
210     /* ------------------------------------------------------------ */
211     public void checkIdleTimestamp(long now)
212     {
213         if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
214         {
215             System.err.println("EXPIRED "+now+">("+_idleTimestamp+"+"+_maxIdleTime+")");
216             idleExpired();
217         }
218     }
219 
220     /* ------------------------------------------------------------ */
221     protected void idleExpired()
222     {
223         try
224         {
225             close();
226         }
227         catch (IOException e)
228         {
229             Log.ignore(e);
230         }
231     }
232 
233     /* ------------------------------------------------------------ */
234     /*
235      */
236     @Override
237     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
238     {
239         int l = super.flush(header, buffer, trailer);
240         if (!(_writable=l!=0))
241         {
242             synchronized (this)
243             {
244                 if (!_dispatched)
245                     updateKey();
246             }
247         }
248         return l;
249     }
250 
251     /* ------------------------------------------------------------ */
252     /*
253      */
254     @Override
255     public int flush(Buffer buffer) throws IOException
256     {
257         int l = super.flush(buffer);
258         if (!(_writable=l!=0))
259         {
260             synchronized (this)
261             {
262                 if (!_dispatched)
263                     updateKey();
264             }
265         }
266         return l;
267     }
268 
269     /* ------------------------------------------------------------ */
270     public boolean isReadyForDispatch()
271     {
272         synchronized (this)
273         {
274             return !(_dispatched || getConnection().isSuspended());
275         }
276     }
277     
278     /* ------------------------------------------------------------ */
279     /*
280      * Allows thread to block waiting for further events.
281      */
282     @Override
283     public boolean blockReadable(long timeoutMs) throws IOException
284     {
285         synchronized (this)
286         {
287             long start=_selectSet.getNow();
288             try
289             {   
290                 _readBlocked=true;
291                 while (isOpen() && _readBlocked)
292                 {
293                     try
294                     {
295                         updateKey();
296                         this.wait(timeoutMs);
297 
298                         timeoutMs -= _selectSet.getNow()-start;
299                         if (_readBlocked && timeoutMs<=0)
300                             return false;
301                     }
302                     catch (InterruptedException e)
303                     {
304                         Log.warn(e);
305                     }
306                 }
307             }
308             finally
309             {
310                 _readBlocked=false;
311             }
312         }
313         return true;
314     }
315 
316     /* ------------------------------------------------------------ */
317     /*
318      * Allows thread to block waiting for further events.
319      */
320     @Override
321     public boolean blockWritable(long timeoutMs) throws IOException
322     {
323         synchronized (this)
324         {
325             long start=_selectSet.getNow();
326             try
327             {   
328                 _writeBlocked=true;
329                 while (isOpen() && _writeBlocked)
330                 {
331                     try
332                     {
333                         updateKey();
334                         this.wait(timeoutMs);
335 
336                         timeoutMs -= _selectSet.getNow()-start;
337                         if (_writeBlocked && timeoutMs<=0)
338                             return false;
339                     }
340                     catch (InterruptedException e)
341                     {
342                         Log.warn(e);
343                     }
344                 }
345             }
346             finally
347             {
348                 _writeBlocked=false;
349                 if (_idleTimestamp!=-1)
350                     scheduleIdle();
351             }
352         }
353         return true;
354     }
355 
356     /* ------------------------------------------------------------ */
357     public void setWritable(boolean writable)
358     {
359         _writable=writable;
360     }
361     
362     /* ------------------------------------------------------------ */
363     public void scheduleWrite()
364     {
365         _writable=false;
366         updateKey();
367     }
368     
369     /* ------------------------------------------------------------ */
    /**
     * Recomputes the wanted interest ops and, if they differ from what the
     * key currently has (or the channel is closed), queues this endpoint with
     * the select set and wakes the selector.
     * <p>
     * While the channel is open the wanted ops are: OP_READ when not
     * dispatched or a reader is blocked, OP_WRITE when not writable or a
     * writer is blocked. The key itself is not modified here; presumably the
     * select set later calls {@code doUpdateKey()} to apply the change
     * (confirm against {@code SelectSet.addChange}).
     */
    private void updateKey()
    {
        synchronized (this)
        {
            // -1 stands for "current key ops unknown" and never equals a
            // computed interest set, so it always forces a change request.
            int ops=-1;
            if (getChannel().isOpen())
            {
                _interestOps = 
                    ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0) 
                |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
                try
                {
                    ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
                }
                catch(Exception e)
                {
                    // The key could not be queried: forget it and leave ops at -1.
                    _key=null;
                    Log.ignore(e);
                }
            }

            // Nothing to change: the key already has the wanted ops and the
            // channel is still open.
            if(_interestOps == ops && getChannel().isOpen())
                return;
            
        }
        // Reached after leaving the synchronized block above.
        _selectSet.addChange(this);
        _selectSet.wakeup();
    }
403     
404     /* ------------------------------------------------------------ */
405     /**
406      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
407      */
408     void doUpdateKey()
409     {
410         synchronized (this)
411         {
412             if (getChannel().isOpen())
413             {
414                 if (_interestOps>0)
415                 {
416                     if (_key==null || !_key.isValid())
417                     {
418                         SelectableChannel sc = (SelectableChannel)getChannel();
419                         if (sc.isRegistered())
420                         {
421                             updateKey();   
422                         }
423                         else
424                         {
425                             try
426                             {
427                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
428                             }
429                             catch (Exception e)
430                             {
431                                 Log.ignore(e);
432                                 if (_key!=null && _key.isValid())
433                                 {
434                                     _key.cancel();
435                                 }
436                                 cancelIdle();
437 
438                                 if (_open)
439                                 {
440                                     _selectSet.destroyEndPoint(this);
441                                 }
442                                 _open=false;
443                                 _key = null;
444                             }
445                         }
446                     }
447                     else
448                     {
449                         _key.interestOps(_interestOps);
450                     }
451                 }
452                 else
453                 {
454                     if (_key.isValid())
455                         _key.interestOps(0);
456                     else
457                         _key=null;
458                 }
459             }
460             else    
461             {
462                 if (_key!=null && _key.isValid())
463                     _key.cancel(); 
464                 
465                 cancelIdle();
466                 if (_open)
467                 {
468                     _selectSet.destroyEndPoint(this);
469                 }
470                 _open=false;
471                 _key = null;
472             }
473         }
474     }
475 
476     /* ------------------------------------------------------------ */
477     /* 
478      */
479     public void run()
480     {
481         boolean dispatched=true;
482         try
483         {
484             while(dispatched)
485             {
486                 try
487                 {
488                     while(true)
489                     {
490                         final Connection next = _connection.handle();
491                         if (next!=_connection)
492                         {  
493                             _connection=next;
494                             continue;
495                         }
496                         break;
497                     }
498                 }
499                 catch (ClosedChannelException e)
500                 {
501                     Log.ignore(e);
502                 }
503                 catch (EofException e)
504                 {
505                     Log.debug("EOF", e);
506                     try{close();}
507                     catch(IOException e2){Log.ignore(e2);}
508                 }
509                 catch (IOException e)
510                 {
511                     Log.warn(e.toString());
512                     Log.debug(e);
513                     try{close();}
514                     catch(IOException e2){Log.ignore(e2);}
515                 }
516                 catch (Throwable e)
517                 {
518                     Log.warn("handle failed", e);
519                     try{close();}
520                     catch(IOException e2){Log.ignore(e2);}
521                 }
522                 dispatched=!undispatch();
523             }
524         }
525         finally
526         {
527             if (dispatched)
528             {
529                 dispatched=!undispatch();
530                 while (dispatched)
531                 {
532                     Log.warn("SCEP.run() finally DISPATCHED");
533                     dispatched=!undispatch();
534                 }
535             }
536         }
537     }
538 
539     /* ------------------------------------------------------------ */
540     /*
541      * @see org.eclipse.io.nio.ChannelEndPoint#close()
542      */
543     @Override
544     public void close() throws IOException
545     {
546         try
547         {
548             super.close();
549         }
550         catch (IOException e)
551         {
552             Log.ignore(e);
553         }   
554         finally
555         {
556             updateKey();
557         }
558     }
559     
560     /* ------------------------------------------------------------ */
561     @Override
562     public String toString()
563     {
564         synchronized(this)
565         {
566             return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
567             ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
568         }
569     }
570 
571     /* ------------------------------------------------------------ */
572     public SelectSet getSelectSet()
573     {
574         return _selectSet;
575     }
576 
577     /* ------------------------------------------------------------ */
578     /**
579      * Don't set the SoTimeout
580      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
581      */
582     @Override
583     public void setMaxIdleTime(int timeMs) throws IOException
584     {
585         _maxIdleTime=timeMs;
586     }
587 
588     
589     
590 }