View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   */
35  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
36  {
37      public static final Logger __log=Log.getLogger("org.eclipse.jetty.io.nio");
38      
39      private final SelectorManager.SelectSet _selectSet;
40      private final SelectorManager _manager;
41      private final Runnable _handler = new Runnable()
42          {
43              public void run() { handle(); }
44          };
45  
46      private volatile Connection _connection;
47      private boolean _dispatched = false;
48      private boolean _redispatched = false;
49      private volatile boolean _writable = true;
50  
51      private  SelectionKey _key;
52      private int _interestOps;
53      private boolean _readBlocked;
54      private boolean _writeBlocked;
55      private boolean _open;
56      private volatile long _idleTimestamp;
57  
58      /* ------------------------------------------------------------ */
59      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
60          throws IOException
61      {
62          super(channel, maxIdleTime);
63  
64          _manager = selectSet.getManager();
65          _selectSet = selectSet;
66          _dispatched = false;
67          _redispatched = false;
68          _open=true;
69          _key = key;
70  
71          _connection = _manager.newConnection(channel,this);
72  
73          scheduleIdle();
74      }
75  
76      /* ------------------------------------------------------------ */
77      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
78          throws IOException
79      {
80          super(channel);
81  
82          _manager = selectSet.getManager();
83          _selectSet = selectSet;
84          _dispatched = false;
85          _redispatched = false;
86          _open=true;
87          _key = key;
88  
89          _connection = _manager.newConnection(channel,this);
90  
91          scheduleIdle();
92      }
93      /* ------------------------------------------------------------ */
94      public SelectionKey getSelectionKey()
95      {
96          synchronized (this)
97          {
98              return _key;
99          }
100     }
101 
102     /* ------------------------------------------------------------ */
103     public SelectorManager getSelectManager()
104     {
105         return _manager;
106     }
107 
108     /* ------------------------------------------------------------ */
109     public Connection getConnection()
110     {
111         return _connection;
112     }
113 
114     /* ------------------------------------------------------------ */
115     public void setConnection(Connection connection)
116     {
117         Connection old=_connection;
118         _connection=connection;
119         _manager.endPointUpgraded(this,old);
120     }
121 
122     /* ------------------------------------------------------------ */
123     public long getIdleTimestamp()
124     {
125         return _idleTimestamp;
126     }
127     
128     /* ------------------------------------------------------------ */
129     /** Called by selectSet to schedule handling
130      *
131      */
132     public void schedule()
133     {
134         synchronized (this)
135         {
136             // If there is no key, then do nothing
137             if (_key == null || !_key.isValid())
138             {
139                 _readBlocked=false;
140                 _writeBlocked=false;
141                 this.notifyAll();
142                 return;
143             }
144 
145             // If there are threads dispatched reading and writing
146             if (_readBlocked || _writeBlocked)
147             {
148                 // assert _dispatched;
149                 if (_readBlocked && _key.isReadable())
150                     _readBlocked=false;
151                 if (_writeBlocked && _key.isWritable())
152                     _writeBlocked=false;
153 
154                 // wake them up is as good as a dispatched.
155                 this.notifyAll();
156 
157                 // we are not interested in further selecting
158                 if (_dispatched)
159                     _key.interestOps(0);
160                 return;
161             }
162 
163             // Otherwise if we are still dispatched
164             if (!isReadyForDispatch())
165             {
166                 // we are not interested in further selecting
167                 _key.interestOps(0);
168                 return;
169             }
170 
171             // Remove writeable op
172             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
173             {
174                 // Remove writeable op
175                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
176                 _key.interestOps(_interestOps);
177                 _writable = true; // Once writable is in ops, only removed with dispatch.
178             }
179 
180             // Dispatch if we are not already
181             if (!_dispatched)
182             {
183                 dispatch();
184                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
185                 {
186                     _key.interestOps(0);
187                 }
188             }
189         }
190     }
191 
192     /* ------------------------------------------------------------ */
193     public void dispatch()
194     {
195         synchronized(this)
196         {
197             if (_dispatched)
198             {
199                 _redispatched=true;
200             }
201             else
202             {
203                 _dispatched = true;
204                 boolean dispatched = _manager.dispatch(_handler);
205                 if(!dispatched)
206                 {
207                     _dispatched = false;
208                     __log.warn("Dispatched Failed! "+this+" to "+_manager);
209                     updateKey();
210                 }
211             }
212         }
213     }
214 
215     /* ------------------------------------------------------------ */
216     /**
217      * Called when a dispatched thread is no longer handling the endpoint.
218      * The selection key operations are updated.
219      * @return If false is returned, the endpoint has been redispatched and
220      * thread must keep handling the endpoint.
221      */
222     protected boolean undispatch()
223     {
224         synchronized (this)
225         {
226             if (_redispatched)
227             {
228                 _redispatched=false;
229                 return false;
230             }
231             _dispatched = false;
232             updateKey();
233         }
234         return true;
235     }
236 
237     /* ------------------------------------------------------------ */
238     public void scheduleIdle()
239     {
240         _idleTimestamp=System.currentTimeMillis();
241     }
242 
243     /* ------------------------------------------------------------ */
244     public void cancelIdle()
245     {
246         _idleTimestamp=0;
247     }
248 
249     /* ------------------------------------------------------------ */
250     public void checkIdleTimestamp(long now)
251     {
252         long idleTimestamp=_idleTimestamp;
253         if (idleTimestamp!=0 && _maxIdleTime!=0 && now>(idleTimestamp+_maxIdleTime))
254             idleExpired();
255     }
256 
257     /* ------------------------------------------------------------ */
258     protected void idleExpired()
259     {
260         _connection.idleExpired();
261     }
262 
263     /* ------------------------------------------------------------ */
264     /*
265      */
266     @Override
267     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
268     {
269         int l = super.flush(header, buffer, trailer);
270         
271         // If there was something to write and it wasn't written, then we are not writable.
272         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
273         {
274             synchronized (this)
275             {
276                 _writable=false;
277                 if (!_dispatched)
278                     updateKey();
279             }
280         }
281         else
282             _writable=true;
283         return l;
284     }
285 
286     /* ------------------------------------------------------------ */
287     /*
288      */
289     @Override
290     public int flush(Buffer buffer) throws IOException
291     {
292         int l = super.flush(buffer);
293         
294         // If there was something to write and it wasn't written, then we are not writable.
295         if (l==0 && buffer!=null && buffer.hasContent())
296         {
297             synchronized (this)
298             {
299                 _writable=false;
300                 if (!_dispatched)
301                     updateKey();
302             }
303         }
304         else
305             _writable=true;
306         
307         return l;
308     }
309 
310     /* ------------------------------------------------------------ */
311     public boolean isReadyForDispatch()
312     {
313         synchronized (this)
314         {
315             // Ready if not dispatched and not suspended
316             return !(_dispatched || getConnection().isSuspended());
317         }
318     }
319 
320     /* ------------------------------------------------------------ */
321     /*
322      * Allows thread to block waiting for further events.
323      */
324     @Override
325     public boolean blockReadable(long timeoutMs) throws IOException
326     {
327         synchronized (this)
328         {
329             long now=_selectSet.getNow();
330             long end=now+timeoutMs;
331             try
332             {
333                 _readBlocked=true;
334                 while (isOpen() && _readBlocked)
335                 {
336                     try
337                     {
338                         updateKey();
339                         this.wait(timeoutMs>=0?(end-now):10000);
340                     }
341                     catch (InterruptedException e)
342                     {
343                         __log.warn(e);
344                     }
345                     finally
346                     {
347                         now=_selectSet.getNow();
348                     }
349 
350                     if (_readBlocked && timeoutMs>0 && now>=end)
351                         return false;
352                 }
353             }
354             finally
355             {
356                 _readBlocked=false;
357             }
358         }
359         return true;
360     }
361 
362     /* ------------------------------------------------------------ */
363     /*
364      * Allows thread to block waiting for further events.
365      */
366     @Override
367     public boolean blockWritable(long timeoutMs) throws IOException
368     {
369         synchronized (this)
370         {
371             if (!isOpen() || isOutputShutdown())
372                 throw new EofException();
373             
374             long now=_selectSet.getNow();
375             long end=now+timeoutMs;
376             try
377             {
378                 _writeBlocked=true;
379                 while (isOpen() && _writeBlocked && !isOutputShutdown())
380                 {
381                     try
382                     {
383                         updateKey();
384                         this.wait(timeoutMs>=0?(end-now):10000);
385                     }
386                     catch (InterruptedException e)
387                     {
388                         __log.warn(e);
389                     }
390                     finally
391                     {
392                         now=_selectSet.getNow();
393                     }
394                     if (_writeBlocked && timeoutMs>0 && now>=end)
395                         return false;
396                 }
397             }
398             catch(Throwable e)
399             {
400                 // TODO remove this if it finds nothing
401                 __log.warn(e);
402                 if (e instanceof RuntimeException)
403                     throw (RuntimeException)e;
404                 if (e instanceof Error)
405                     throw (Error)e;
406                 throw new RuntimeException(e);
407             }
408             finally
409             {
410                 _writeBlocked=false;
411                 if (_idleTimestamp!=-1)
412                     scheduleIdle();
413             }
414         }
415         return true;
416     }
417     
418     /* ------------------------------------------------------------ */
419     public void scheduleWrite()
420     {
421         _writable=false;
422         updateKey();
423     }
424 
425     /* ------------------------------------------------------------ */
426     /**
427      * Updates selection key. Adds operations types to the selection key as needed. No operations
428      * are removed as this is only done during dispatch. This method records the new key and
429      * schedules a call to doUpdateKey to do the keyChange
430      */
431     private void updateKey()
432     {
433         synchronized (this)
434         {
435             int ops=-1;
436             if (getChannel().isOpen())
437             {
438                 _interestOps =
439                     ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked))  ? SelectionKey.OP_READ  : 0)
440                 |   ((!_socket.isOutputShutdown()&& (!_writable   || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
441                 try
442                 {
443                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
444                 }
445                 catch(Exception e)
446                 {
447                     _key=null;
448                     __log.ignore(e);
449                 }
450             }
451 
452             if(_interestOps == ops && getChannel().isOpen())
453                 return;
454         }
455         _selectSet.addChange(this);
456         _selectSet.wakeup();
457     }
458 
459     /* ------------------------------------------------------------ */
460     /**
461      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
462      */
463     void doUpdateKey()
464     {
465         synchronized (this)
466         {
467             if (getChannel().isOpen())
468             {
469                 if (_interestOps>0)
470                 {
471                     if (_key==null || !_key.isValid())
472                     {
473                         SelectableChannel sc = (SelectableChannel)getChannel();
474                         if (sc.isRegistered())
475                         {
476                             updateKey();
477                         }
478                         else
479                         {
480                             try
481                             {
482                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
483                             }
484                             catch (Exception e)
485                             {
486                                 __log.ignore(e);
487                                 if (_key!=null && _key.isValid())
488                                 {
489                                     _key.cancel();
490                                 }
491                                 cancelIdle();
492 
493                                 if (_open)
494                                 {
495                                     _selectSet.destroyEndPoint(this);
496                                 }
497                                 _open=false;
498                                 _key = null;
499                             }
500                         }
501                     }
502                     else
503                     {
504                         _key.interestOps(_interestOps);
505                     }
506                 }
507                 else
508                 {
509                     if (_key!=null && _key.isValid())
510                         _key.interestOps(0);
511                     else
512                         _key=null;
513                 }
514             }
515             else
516             {
517                 if (_key!=null && _key.isValid())
518                     _key.cancel();
519 
520                 cancelIdle();
521                 if (_open)
522                 {
523                     _selectSet.destroyEndPoint(this);
524                 }
525                 _open=false;
526                 _key = null;
527             }
528         }
529     }
530 
531     /* ------------------------------------------------------------ */
532     /*
533      */
534     protected void handle()
535     {
536         boolean dispatched=true;
537         try
538         {
539             while(dispatched)
540             {
541                 try
542                 {
543                     while(true)
544                     {
545                         final Connection next = _connection.handle();
546                         if (next!=_connection)
547                         {
548                             __log.debug("{} replaced {}",next,_connection);
549                             _connection=next;
550                             continue;
551                         }
552                         break;
553                     }
554                 }
555                 catch (ClosedChannelException e)
556                 {
557                     __log.ignore(e);
558                 }
559                 catch (EofException e)
560                 {
561                     __log.debug("EOF", e);
562                     try{close();}
563                     catch(IOException e2){__log.ignore(e2);}
564                 }
565                 catch (IOException e)
566                 {
567                     __log.warn(e.toString());
568                     __log.debug(e);
569                     try{close();}
570                     catch(IOException e2){__log.ignore(e2);}
571                 }
572                 catch (Throwable e)
573                 {
574                     __log.warn("handle failed", e);
575                     try{close();}
576                     catch(IOException e2){__log.ignore(e2);}
577                 }
578                 dispatched=!undispatch();
579             }
580         }
581         finally
582         {
583             if (dispatched)
584             {
585                 dispatched=!undispatch();
586                 while (dispatched)
587                 {
588                     __log.warn("SCEP.run() finally DISPATCHED");
589                     dispatched=!undispatch();
590                 }
591             }
592         }
593     }
594 
595     /* ------------------------------------------------------------ */
596     /*
597      * @see org.eclipse.io.nio.ChannelEndPoint#close()
598      */
599     @Override
600     public void close() throws IOException
601     {
602         try
603         {
604             super.close();
605         }
606         catch (IOException e)
607         {
608             __log.ignore(e);
609         }
610         finally
611         {
612             updateKey();
613         }
614     }
615 
616     /* ------------------------------------------------------------ */
617     @Override
618     public String toString()
619     {
620         synchronized(this)
621         {
622             return "SCEP@" + hashCode() + _channel+            
623             "[d=" + _dispatched + ",io=" + _interestOps+
624             ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
625         }
626     }
627 
628     /* ------------------------------------------------------------ */
629     public SelectSet getSelectSet()
630     {
631         return _selectSet;
632     }
633 
634     /* ------------------------------------------------------------ */
635     /**
636      * Don't set the SoTimeout
637      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
638      */
639     @Override
640     public void setMaxIdleTime(int timeMs) throws IOException
641     {
642         _maxIdleTime=timeMs;
643     }
644 
645 }