View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  
30  /* ------------------------------------------------------------ */
31  /**
32   * An Endpoint that can be scheduled by {@link SelectorManager}.
33   */
34  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
35  {
36      private final SelectorManager.SelectSet _selectSet;
37      private final SelectorManager _manager;
38      private final Runnable _handler = new Runnable()
39          {
40              public void run() { handle(); }
41          };
42  
43      private volatile Connection _connection;
44      private boolean _dispatched = false;
45      private boolean _redispatched = false;
46      private volatile boolean _writable = true;
47  
48      private  SelectionKey _key;
49      private int _interestOps;
50      private boolean _readBlocked;
51      private boolean _writeBlocked;
52      private boolean _open;
53      private volatile long _idleTimestamp;
54      private boolean _changing=false;
55  
56      /* ------------------------------------------------------------ */
57      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
58          throws IOException
59      {
60          super(channel, maxIdleTime);
61  
62          _manager = selectSet.getManager();
63          _selectSet = selectSet;
64          _dispatched = false;
65          _redispatched = false;
66          _open=true;
67          _key = key;
68  
69          _connection = _manager.newConnection(channel,this);
70  
71          scheduleIdle();
72      }
73  
74      /* ------------------------------------------------------------ */
75      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
76          throws IOException
77      {
78          super(channel);
79  
80          _manager = selectSet.getManager();
81          _selectSet = selectSet;
82          _dispatched = false;
83          _redispatched = false;
84          _open=true;
85          _key = key;
86  
87          _connection = _manager.newConnection(channel,this);
88  
89          scheduleIdle();
90      }
91      /* ------------------------------------------------------------ */
92      public SelectionKey getSelectionKey()
93      {
94          synchronized (this)
95          {
96              return _key;
97          }
98      }
99  
100     /* ------------------------------------------------------------ */
101     public SelectorManager getSelectManager()
102     {
103         return _manager;
104     }
105 
106     /* ------------------------------------------------------------ */
107     public Connection getConnection()
108     {
109         return _connection;
110     }
111 
112     /* ------------------------------------------------------------ */
113     public void setConnection(Connection connection)
114     {
115         Connection old=_connection;
116         _connection=connection;
117         _manager.endPointUpgraded(this,old);
118     }
119 
120     /* ------------------------------------------------------------ */
121     /** Called by selectSet to schedule handling
122      *
123      */
124     public void schedule()
125     {
126         synchronized (this)
127         {
128             // If there is no key, then do nothing
129             if (_key == null || !_key.isValid())
130             {
131                 _readBlocked=false;
132                 _writeBlocked=false;
133                 this.notifyAll();
134                 return;
135             }
136 
137             // If there are threads dispatched reading and writing
138             if (_readBlocked || _writeBlocked)
139             {
140                 // assert _dispatched;
141                 if (_readBlocked && _key.isReadable())
142                     _readBlocked=false;
143                 if (_writeBlocked && _key.isWritable())
144                     _writeBlocked=false;
145 
146                 // wake them up is as good as a dispatched.
147                 this.notifyAll();
148 
149                 // we are not interested in further selecting
150                 if (_dispatched)
151                     _key.interestOps(0);
152                 return;
153             }
154 
155             // Otherwise if we are still dispatched
156             if (!isReadyForDispatch())
157             {
158                 // we are not interested in further selecting
159                 _key.interestOps(0);
160                 return;
161             }
162 
163             // Remove writeable op
164             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
165             {
166                 // Remove writeable op
167                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
168                 _key.interestOps(_interestOps);
169                 _writable = true; // Once writable is in ops, only removed with dispatch.
170             }
171 
172             // Dispatch if we are not already
173             if (!_dispatched)
174             {
175                 dispatch();
176                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
177                 {
178                     _key.interestOps(0);
179                 }
180             }
181         }
182     }
183 
184     /* ------------------------------------------------------------ */
185     public void dispatch()
186     {
187         synchronized(this)
188         {
189             if (_dispatched)
190             {
191                 _redispatched=true;
192             }
193             else
194             {
195                 _dispatched = true;
196                 boolean dispatched = _manager.dispatch(_handler);
197                 if(!dispatched)
198                 {
199                     _dispatched = false;
200                     Log.warn("Dispatched Failed!");
201                     updateKey();
202                 }
203             }
204         }
205     }
206 
207     /* ------------------------------------------------------------ */
208     /**
209      * Called when a dispatched thread is no longer handling the endpoint.
210      * The selection key operations are updated.
211      * @return If false is returned, the endpoint has been redispatched and
212      * thread must keep handling the endpoint.
213      */
214     protected boolean undispatch()
215     {
216         synchronized (this)
217         {
218             if (_redispatched)
219             {
220                 _redispatched=false;
221                 return false;
222             }
223             _dispatched = false;
224             updateKey();
225         }
226         return true;
227     }
228 
229     /* ------------------------------------------------------------ */
230     public void scheduleIdle()
231     {
232         _idleTimestamp=System.currentTimeMillis();
233     }
234 
235     /* ------------------------------------------------------------ */
236     public void cancelIdle()
237     {
238         _idleTimestamp=0;
239     }
240 
241     /* ------------------------------------------------------------ */
242     public void checkIdleTimestamp(long now)
243     {
244         if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
245             idleExpired();
246     }
247 
248     /* ------------------------------------------------------------ */
249     protected void idleExpired()
250     {
251         try
252         {
253             close();
254         }
255         catch (IOException e)
256         {
257             Log.ignore(e);
258         }
259     }
260 
261     /* ------------------------------------------------------------ */
262     /*
263      */
264     @Override
265     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
266     {
267         int l = super.flush(header, buffer, trailer);
268         if (!(_writable=l!=0))
269         {
270             synchronized (this)
271             {
272                 if (!_dispatched)
273                     updateKey();
274             }
275         }
276         return l;
277     }
278 
279     /* ------------------------------------------------------------ */
280     /*
281      */
282     @Override
283     public int flush(Buffer buffer) throws IOException
284     {
285         int l = super.flush(buffer);
286         if (!(_writable=l!=0))
287         {
288             synchronized (this)
289             {
290                 if (!_dispatched)
291                     updateKey();
292             }
293         }
294         return l;
295     }
296 
297     /* ------------------------------------------------------------ */
298     public boolean isReadyForDispatch()
299     {
300         synchronized (this)
301         {
302             // Ready if not dispatched and not suspended
303             return !(_dispatched || getConnection().isSuspended());
304         }
305     }
306 
307     /* ------------------------------------------------------------ */
308     /*
309      * Allows thread to block waiting for further events.
310      */
311     @Override
312     public boolean blockReadable(long timeoutMs) throws IOException
313     {
314         synchronized (this)
315         {
316             long start=_selectSet.getNow();
317             try
318             {
319                 _readBlocked=true;
320                 while (isOpen() && _readBlocked)
321                 {
322                     try
323                     {
324                         updateKey();
325                         this.wait(timeoutMs);
326 
327                         timeoutMs -= _selectSet.getNow()-start;
328                         if (_readBlocked && timeoutMs<=0)
329                             return false;
330                     }
331                     catch (InterruptedException e)
332                     {
333                         Log.warn(e);
334                     }
335                 }
336             }
337             finally
338             {
339                 _readBlocked=false;
340             }
341         }
342         return true;
343     }
344 
345     /* ------------------------------------------------------------ */
346     /*
347      * Allows thread to block waiting for further events.
348      */
349     @Override
350     public boolean blockWritable(long timeoutMs) throws IOException
351     {
352         synchronized (this)
353         {
354             long start=_selectSet.getNow();
355             try
356             {
357                 _writeBlocked=true;
358                 while (isOpen() && _writeBlocked)
359                 {
360                     try
361                     {
362                         updateKey();
363                         this.wait(timeoutMs);
364 
365                         timeoutMs -= _selectSet.getNow()-start;
366                         if (_writeBlocked && timeoutMs<=0)
367                             return false;
368                     }
369                     catch (InterruptedException e)
370                     {
371                         Log.warn(e);
372                     }
373                 }
374             }
375             finally
376             {
377                 _writeBlocked=false;
378                 if (_idleTimestamp!=-1)
379                     scheduleIdle();
380             }
381         }
382         return true;
383     }
384 
385     /* ------------------------------------------------------------ */
386     public void setWritable(boolean writable)
387     {
388         _writable=writable;
389     }
390 
391     /* ------------------------------------------------------------ */
392     public void scheduleWrite()
393     {
394         _writable=false;
395         updateKey();
396     }
397 
398     /* ------------------------------------------------------------ */
399     /**
400      * Updates selection key. Adds operations types to the selection key as needed. No operations
401      * are removed as this is only done during dispatch. This method records the new key and
402      * schedules a call to doUpdateKey to do the keyChange
403      */
404     private void updateKey()
405     {
406         synchronized (this)
407         {
408             int ops=-1;
409             if (getChannel().isOpen())
410             {
411                 _interestOps =
412                     ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0)
413                 |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
414                 try
415                 {
416                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
417                 }
418                 catch(Exception e)
419                 {
420                     _key=null;
421                     Log.ignore(e);
422                 }
423             }
424 
425             if(_interestOps == ops && getChannel().isOpen())
426                 return;
427             _changing=true;
428         }
429         _selectSet.addChange(this);
430         _selectSet.wakeup();
431     }
432 
433     /* ------------------------------------------------------------ */
434     /**
435      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
436      */
437     void doUpdateKey()
438     {
439         synchronized (this)
440         {
441             _changing=false;
442             if (getChannel().isOpen())
443             {
444                 if (_interestOps>0)
445                 {
446                     if (_key==null || !_key.isValid())
447                     {
448                         SelectableChannel sc = (SelectableChannel)getChannel();
449                         if (sc.isRegistered())
450                         {
451                             updateKey();
452                         }
453                         else
454                         {
455                             try
456                             {
457                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
458                             }
459                             catch (Exception e)
460                             {
461                                 Log.ignore(e);
462                                 if (_key!=null && _key.isValid())
463                                 {
464                                     _key.cancel();
465                                 }
466                                 cancelIdle();
467 
468                                 if (_open)
469                                 {
470                                     _selectSet.destroyEndPoint(this);
471                                 }
472                                 _open=false;
473                                 _key = null;
474                             }
475                         }
476                     }
477                     else
478                     {
479                         _key.interestOps(_interestOps);
480                     }
481                 }
482                 else
483                 {
484                     if (_key!=null && _key.isValid())
485                         _key.interestOps(0);
486                     else
487                         _key=null;
488                 }
489             }
490             else
491             {
492                 if (_key!=null && _key.isValid())
493                     _key.cancel();
494 
495                 cancelIdle();
496                 if (_open)
497                 {
498                     _selectSet.destroyEndPoint(this);
499                 }
500                 _open=false;
501                 _key = null;
502             }
503         }
504     }
505 
506     /* ------------------------------------------------------------ */
507     /*
508      */
509     protected void handle()
510     {
511         boolean dispatched=true;
512         try
513         {
514             while(dispatched)
515             {
516                 try
517                 {
518                     while(true)
519                     {
520                         final Connection next = _connection.handle();
521                         if (next!=_connection)
522                         {
523                             _connection=next;
524                             continue;
525                         }
526                         break;
527                     }
528                 }
529                 catch (ClosedChannelException e)
530                 {
531                     Log.ignore(e);
532                 }
533                 catch (EofException e)
534                 {
535                     Log.debug("EOF", e);
536                     try{close();}
537                     catch(IOException e2){Log.ignore(e2);}
538                 }
539                 catch (IOException e)
540                 {
541                     Log.warn(e.toString());
542                     Log.debug(e);
543                     try{close();}
544                     catch(IOException e2){Log.ignore(e2);}
545                 }
546                 catch (Throwable e)
547                 {
548                     Log.warn("handle failed", e);
549                     try{close();}
550                     catch(IOException e2){Log.ignore(e2);}
551                 }
552                 dispatched=!undispatch();
553             }
554         }
555         finally
556         {
557             if (dispatched)
558             {
559                 dispatched=!undispatch();
560                 while (dispatched)
561                 {
562                     Log.warn("SCEP.run() finally DISPATCHED");
563                     dispatched=!undispatch();
564                 }
565             }
566         }
567     }
568 
569     /* ------------------------------------------------------------ */
570     /*
571      * @see org.eclipse.io.nio.ChannelEndPoint#close()
572      */
573     @Override
574     public void close() throws IOException
575     {
576         try
577         {
578             super.close();
579         }
580         catch (IOException e)
581         {
582             Log.ignore(e);
583         }
584         finally
585         {
586             updateKey();
587         }
588     }
589 
590     /* ------------------------------------------------------------ */
591     @Override
592     public String toString()
593     {
594         synchronized(this)
595         {
596             return "SCEP@" + hashCode() + _channel+            
597             "[d=" + _dispatched + ",io=" + _interestOps+
598             ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
599         }
600     }
601 
602     /* ------------------------------------------------------------ */
603     public SelectSet getSelectSet()
604     {
605         return _selectSet;
606     }
607 
608     /* ------------------------------------------------------------ */
609     /**
610      * Don't set the SoTimeout
611      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
612      */
613     @Override
614     public void setMaxIdleTime(int timeMs) throws IOException
615     {
616         _maxIdleTime=timeMs;
617     }
618 
619     /* ------------------------------------------------------------ */
620     /**
621      * This looks for undispatched endpoints that have key.interestOps!=endp.interestOps
622      * @TODO find out the root cause of this and delete this method.
623      */
624     public void checkKey(SelectionKey key)
625     {                  
626         synchronized (this)
627         {
628             if (!_changing && key.interestOps()!=_interestOps && isReadyForDispatch())
629             {
630                 Log.warn("NIO InterestOps mismatch "+key.interestOps()+"!="+_interestOps+" for "+this);
631                 updateKey();
632             }
633         } 
634     }
635 
636 }