View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.util.thread.Timeout.Task;
31  
32  /* ------------------------------------------------------------ */
33  /**
34   * An Endpoint that can be scheduled by {@link SelectorManager}.
35   */
36  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37  {
38      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39  
40      private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase().contains("win");
41      private final SelectorManager.SelectSet _selectSet;
42      private final SelectorManager _manager;
43      private  SelectionKey _key;
44      private final Runnable _handler = new Runnable()
45          {
46              public void run() { handle(); }
47          };
48  
49      /** The desired value for {@link SelectionKey#interestOps()} */
50      private int _interestOps;
51  
52      /**
53       * The connection instance is the handler for any IO activity on the endpoint.
54       * There is a different type of connection for HTTP, AJP, WebSocket and
55       * ProxyConnect.   The connection may change for an SCEP as it is upgraded
56       * from HTTP to proxy connect or websocket.
57       */
58      private volatile AsyncConnection _connection;
59  
60      /** true if a thread has been dispatched to handle this endpoint */
61      private boolean _dispatched = false;
62  
63      /** true if a non IO dispatch (eg async resume) is outstanding */
64      private boolean _asyncDispatch = false;
65  
66      /** true if the last write operation succeed and wrote all offered bytes */
67      private volatile boolean _writable = true;
68  
69  
70      /** True if a thread has is blocked in {@link #blockReadable(long)} */
71      private boolean _readBlocked;
72  
73      /** True if a thread has is blocked in {@link #blockWritable(long)} */
74      private boolean _writeBlocked;
75  
76      /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
77      private boolean _open;
78  
79      private volatile long _idleTimestamp;
80  
81      private boolean _ishut;
82  
83      /* ------------------------------------------------------------ */
84      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
85          throws IOException
86      {
87          super(channel, maxIdleTime);
88  
89          _manager = selectSet.getManager();
90          _selectSet = selectSet;
91          _dispatched = false;
92          _asyncDispatch = false;
93          _open=true;
94          _key = key;
95  
96          setCheckForIdle(true);
97      }
98  
99      /* ------------------------------------------------------------ */
100     public SelectionKey getSelectionKey()
101     {
102         synchronized (this)
103         {
104             return _key;
105         }
106     }
107 
108     /* ------------------------------------------------------------ */
109     public SelectorManager getSelectManager()
110     {
111         return _manager;
112     }
113 
114     /* ------------------------------------------------------------ */
115     public Connection getConnection()
116     {
117         return _connection;
118     }
119 
120     /* ------------------------------------------------------------ */
121     public void setConnection(Connection connection)
122     {
123         Connection old=_connection;
124         _connection=(AsyncConnection)connection;
125         if (old!=null && old!=_connection)
126             _manager.endPointUpgraded(this,old);
127     }
128 
129     /* ------------------------------------------------------------ */
130     public long getIdleTimestamp()
131     {
132         return _idleTimestamp;
133     }
134 
135     /* ------------------------------------------------------------ */
    /**
     * Called by the select set to schedule handling of this endpoint.
     * <p>
     * Under the endpoint lock this either releases threads waiting in
     * {@link #blockReadable(long)} / {@link #blockWritable(long)}, or dispatches the
     * endpoint via {@link #dispatch()}, adjusting the key's interest ops along the way.
     */
    public void schedule()
    {
        synchronized (this)
        {
            // Without a usable key nothing can be selected: clear the blocked flags,
            // wake any waiting threads and give up.
            if (_key == null || !_key.isValid())
            {
                _readBlocked=false;
                _writeBlocked=false;
                this.notifyAll();
                return;
            }

            // A thread is waiting in blockReadable and/or blockWritable
            if (_readBlocked || _writeBlocked)
            {
                // NOTE(review): the original author expected _dispatched to be true here (not enforced).
                // Clear only the flag(s) whose operation the key reports as ready.
                if (_readBlocked && _key.isReadable())
                    _readBlocked=false;
                if (_writeBlocked && _key.isWritable())
                    _writeBlocked=false;

                // Waking the blocked threads is treated as equivalent to a dispatch.
                this.notifyAll();

                // we are not interested in further selecting
                _key.interestOps(0);
                // With no dispatched thread, recompute the interest ops immediately.
                if (!_dispatched)
                    updateKey();
                return;
            }

            // The key is ready for write and write interest was registered
            if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
            {
                // Drop OP_WRITE from both the desired and the actual interest ops
                _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
                _key.interestOps(_interestOps);
                _writable = true; // Once writable is in ops, only removed with dispatch.
            }

            // If already dispatched, just deregister interest
            if (_dispatched)
                _key.interestOps(0);
            else
            {
                // otherwise do the dispatch
                dispatch();
                // dispatch() leaves _dispatched false if the manager refused the task.
                // NOTE(review): the manager flag presumably defers clearing the interest ops - confirm in SelectorManager.
                if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
                {
                    _key.interestOps(0);
                }
            }
        }
    }
194 
195     /* ------------------------------------------------------------ */
196     public void asyncDispatch()
197     {
198         synchronized(this)
199         {
200             if (_dispatched)
201                 _asyncDispatch=true;
202             else
203                 dispatch();
204         }
205     }
206 
207     /* ------------------------------------------------------------ */
208     public void dispatch()
209     {
210         synchronized(this)
211         {
212             if (!_dispatched)
213             {
214                 _dispatched = true;
215                 boolean dispatched = _manager.dispatch(_handler);
216                 if(!dispatched)
217                 {
218                     _dispatched = false;
219                     LOG.warn("Dispatched Failed! "+this+" to "+_manager);
220                     updateKey();
221                 }
222             }
223         }
224     }
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * Called when a dispatched thread is no longer handling the endpoint.
229      * The selection key operations are updated.
230      * @return If false is returned, the endpoint has been redispatched and
231      * thread must keep handling the endpoint.
232      */
233     protected boolean undispatch()
234     {
235         synchronized (this)
236         {
237             if (_asyncDispatch)
238             {
239                 _asyncDispatch=false;
240                 return false;
241             }
242             _dispatched = false;
243             updateKey();
244         }
245         return true;
246     }
247 
248     /* ------------------------------------------------------------ */
249     public void cancelTimeout(Task task)
250     {
251         getSelectSet().cancelTimeout(task);
252     }
253 
254     /* ------------------------------------------------------------ */
255     public void scheduleTimeout(Task task, long timeoutMs)
256     {
257         getSelectSet().scheduleTimeout(task,timeoutMs);
258     }
259 
260     /* ------------------------------------------------------------ */
261     public void setCheckForIdle(boolean check)
262     {
263         _idleTimestamp=check?System.currentTimeMillis():0;
264     }
265 
266     /* ------------------------------------------------------------ */
267     public boolean isCheckForIdle()
268     {
269         return _idleTimestamp!=0;
270     }
271 
272     /* ------------------------------------------------------------ */
273     protected void notIdle()
274     {
275         if (_idleTimestamp!=0)
276             _idleTimestamp=System.currentTimeMillis();
277     }
278 
279     /* ------------------------------------------------------------ */
280     public void checkIdleTimestamp(long now)
281     {
282         long idleTimestamp=_idleTimestamp;
283 
284         if (idleTimestamp!=0 && _maxIdleTime>0)
285         {
286             final long idleForMs=now-idleTimestamp;
287 
288             if (idleForMs>_maxIdleTime)
289             {
290                 // Don't idle out again until onIdleExpired task completes.
291                 setCheckForIdle(false);
292                 _manager.dispatch(new Runnable()
293                 {
294                     public void run()
295                     {
296                         try
297                         {
298                             onIdleExpired(idleForMs);
299                         }
300                         finally
301                         {
302                             setCheckForIdle(true);
303                         }
304                     }
305                 });
306             }
307         }
308     }
309 
310     /* ------------------------------------------------------------ */
311     public void onIdleExpired(long idleForMs)
312     {
313         _connection.onIdleExpired(idleForMs);
314     }
315 
316     /* ------------------------------------------------------------ */
317     @Override
318     public int fill(Buffer buffer) throws IOException
319     {
320         int fill=super.fill(buffer);
321         if (fill>0)
322             notIdle();
323         return fill;
324     }
325 
326     /* ------------------------------------------------------------ */
327     @Override
328     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
329     {
330         int l = super.flush(header, buffer, trailer);
331 
332         // If there was something to write and it wasn't written, then we are not writable.
333         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
334         {
335             synchronized (this)
336             {
337                 if (_dispatched)
338                     _writable=false;
339             }
340         }
341         else if (l>0)
342         {
343             _writable=true;
344             notIdle();
345         }
346         return l;
347     }
348 
349     /* ------------------------------------------------------------ */
350     /*
351      */
352     @Override
353     public int flush(Buffer buffer) throws IOException
354     {
355         int l = super.flush(buffer);
356 
357         // If there was something to write and it wasn't written, then we are not writable.
358         if (l==0 && buffer!=null && buffer.hasContent())
359         {
360             synchronized (this)
361             {
362                 if (_dispatched)
363                     _writable=false;
364             }
365         }
366         else if (l>0)
367         {
368             _writable=true;
369             notIdle();
370         }
371 
372         return l;
373     }
374 
375     /* ------------------------------------------------------------ */
376     /*
377      * Allows thread to block waiting for further events.
378      */
379     @Override
380     public boolean blockReadable(long timeoutMs) throws IOException
381     {
382         synchronized (this)
383         {
384             if (isInputShutdown())
385                 throw new EofException();
386 
387             long now=_selectSet.getNow();
388             long end=now+timeoutMs;
389             boolean check=isCheckForIdle();
390             setCheckForIdle(true);
391             try
392             {
393                 _readBlocked=true;
394                 while (!isInputShutdown() && _readBlocked)
395                 {
396                     try
397                     {
398                         updateKey();
399                         this.wait(timeoutMs>0?(end-now):10000);
400                     }
401                     catch (InterruptedException e)
402                     {
403                         LOG.warn(e);
404                     }
405                     finally
406                     {
407                         now=_selectSet.getNow();
408                     }
409 
410                     if (_readBlocked && timeoutMs>0 && now>=end)
411                         return false;
412                 }
413             }
414             finally
415             {
416                 _readBlocked=false;
417                 setCheckForIdle(check);
418             }
419         }
420         return true;
421     }
422 
423     /* ------------------------------------------------------------ */
424     /*
425      * Allows thread to block waiting for further events.
426      */
427     @Override
428     public boolean blockWritable(long timeoutMs) throws IOException
429     {
430         synchronized (this)
431         {
432             if (isOutputShutdown())
433                 throw new EofException();
434 
435             long now=_selectSet.getNow();
436             long end=now+timeoutMs;
437             boolean check=isCheckForIdle();
438             setCheckForIdle(true);
439             try
440             {
441                 _writeBlocked=true;
442                 while (_writeBlocked && !isOutputShutdown())
443                 {
444                     try
445                     {
446                         updateKey();
447                         this.wait(timeoutMs>0?(end-now):10000);
448                     }
449                     catch (InterruptedException e)
450                     {
451                         LOG.warn(e);
452                     }
453                     finally
454                     {
455                         now=_selectSet.getNow();
456                     }
457                     if (_writeBlocked && timeoutMs>0 && now>=end)
458                         return false;
459                 }
460             }
461             finally
462             {
463                 _writeBlocked=false;
464                 setCheckForIdle(check);
465             }
466         }
467         return true;
468     }
469 
470     /* ------------------------------------------------------------ */
471     /**
472      * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
473      */
474     public void scheduleWrite()
475     {
476         if (_writable)
477             LOG.debug("Required scheduleWrite {}",this);
478 
479         _writable=false;
480         updateKey();
481     }
482 
483     /* ------------------------------------------------------------ */
484     public boolean isWritable()
485     {
486         return _writable;
487     }
488 
489     /* ------------------------------------------------------------ */
490     public boolean hasProgressed()
491     {
492         return false;
493     }
494 
495     /* ------------------------------------------------------------ */
    /**
     * Recomputes the desired interest ops and, if they differ from the key's current
     * interest ops, queues this endpoint as a change on the select set and wakes it up.
     * <p>
     * The desired ops are recomputed from scratch while holding the endpoint lock; the key
     * itself is not modified here.  The queued change is presumably what leads to
     * {@link #doUpdateKey()} being called by the select set (not visible in this class).
     */
    private void updateKey()
    {
        final boolean changed;
        synchronized (this)
        {
            // -1 stands for "no usable key / unknown"
            int current_ops=-1;
            if (getChannel().isOpen())
            {
                // Read interest: a thread is blocked reading, or nobody is dispatched and the connection is not suspended
                boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
                // Write interest: a thread is blocked writing, or nobody is dispatched and the last write was incomplete
                boolean write_interest= _writeBlocked || (!_dispatched && !_writable);

                // Never ask for an operation on a half of the socket that is already shut down
                _interestOps =
                    ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
                |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
                try
                {
                    current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
                }
                catch(Exception e)
                {
                    // The key could not be queried: forget it; current_ops stays -1
                    _key=null;
                    LOG.ignore(e);
                }
            }
            // With a closed channel current_ops is still -1, so this normally counts as a change
            changed=_interestOps!=current_ops;
        }

        // Notify the select set outside the endpoint lock
        if(changed)
        {
            _selectSet.addChange(this);
            _selectSet.wakeup();
        }
    }
534 
535 
536     /* ------------------------------------------------------------ */
    /**
     * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey.
     * <p>
     * While the channel is open the desired ops are applied to the key, registering the
     * channel with the select set's selector if there is no valid key.  Once the channel
     * is closed the key is cancelled and the endpoint is destroyed via the select set
     * (at most once, guarded by {@code _open}).
     */
    void doUpdateKey()
    {
        synchronized (this)
        {
            if (getChannel().isOpen())
            {
                if (_interestOps>0)
                {
                    // Some interest is wanted but there is no usable key
                    if (_key==null || !_key.isValid())
                    {
                        SelectableChannel sc = (SelectableChannel)getChannel();
                        if (sc.isRegistered())
                        {
                            // Still registered with a selector: do not register again, go through updateKey() instead
                            updateKey();
                        }
                        else
                        {
                            try
                            {
                                // Register with this endpoint as the key's attachment
                                _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
                            }
                            catch (Exception e)
                            {
                                // Registration failed: cancel any key, destroy the endpoint and drop the key
                                LOG.ignore(e);
                                if (_key!=null && _key.isValid())
                                {
                                    _key.cancel();
                                }

                                if (_open)
                                {
                                    _selectSet.destroyEndPoint(this);
                                }
                                _open=false;
                                _key = null;
                            }
                        }
                    }
                    else
                    {
                        // Valid key: apply the desired ops directly
                        _key.interestOps(_interestOps);
                    }
                }
                else
                {
                    // No interest wanted: clear the ops on a valid key, otherwise forget the key
                    if (_key!=null && _key.isValid())
                        _key.interestOps(0);
                    else
                        _key=null;
                }
            }
            else
            {
                // Channel is closed: cancel the key and destroy the endpoint once
                if (_key!=null && _key.isValid())
                    _key.cancel();

                if (_open)
                {
                    _open=false;
                    _selectSet.destroyEndPoint(this);
                }
                _key = null;
            }
        }
    }
605 
606     /* ------------------------------------------------------------ */
    /**
     * Handles the endpoint on a dispatched thread; invoked by the {@code _handler} runnable.
     * <p>
     * Repeatedly calls {@code handle()} on the current connection, installing any
     * replacement connection it returns, until {@link #undispatch()} reports that the
     * endpoint is no longer dispatched.  IO failures and unexpected throwables close the
     * endpoint; a {@link ClosedChannelException} is only logged.
     */
    protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    // Keep handling while the connection keeps replacing itself
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection)_connection.handle();
                        if (next!=_connection)
                        {
                            // A different connection was returned: install it and tell the manager
                            LOG.debug("{} replaced {}",next,_connection);
                            Connection old=_connection;
                            _connection=next;
                            _manager.endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    // Logged only; the endpoint is not closed here
                    LOG.ignore(e);
                }
                catch (EofException e)
                {
                    LOG.debug("EOF", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (IOException e)
                {
                    LOG.warn(e.toString());
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (Throwable e)
                {
                    LOG.warn("handle failed", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                finally
                {
                    // Report input shutdown to the connection once, while the endpoint is still open
                    if (!_ishut && isInputShutdown() && isOpen())
                    {
                        _ishut=true;
                        try
                        {
                            _connection.onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            LOG.warn("onInputShutdown failed", x);
                            try{close();}
                            catch(IOException e2){LOG.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    // undispatch() returns false when an async dispatch is pending, so loop again
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            // Reached with dispatched still true only if the loop above ended abnormally:
            // keep calling undispatch() until the endpoint is really undispatched.
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    LOG.warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }
691 
692     /* ------------------------------------------------------------ */
693     /*
694      * @see org.eclipse.io.nio.ChannelEndPoint#close()
695      */
696     @Override
697     public void close() throws IOException
698     {
699         // On unix systems there is a JVM issue that if you cancel before closing, it can 
700         // cause the selector to block waiting for a channel to close and that channel can 
701         // block waiting for the remote end.  But on windows, if you don't cancel before a 
702         // close, then the selector can block anyway!
703         // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
704         if (WORK_AROUND_JVM_BUG_6346658)
705         {
706             try
707             {
708                 SelectionKey key = _key;
709                 if (key!=null)
710                     key.cancel();
711             }
712             catch (Throwable e)
713             {
714                 LOG.ignore(e);
715             }
716         }
717 
718         try
719         {
720             super.close();
721         }
722         catch (IOException e)
723         {
724             LOG.ignore(e);
725         }
726         finally
727         {
728             updateKey();
729         }
730     }
731 
732     /* ------------------------------------------------------------ */
733     @Override
734     public String toString()
735     {
736         // Do NOT use synchronized (this)
737         // because it's very easy to deadlock when debugging is enabled.
738         // We do a best effort to print the right toString() and that's it.
739         SelectionKey key = _key;
740         String keyString = "";
741         if (key != null)
742         {
743             if (key.isValid())
744             {
745                 if (key.isReadable())
746                     keyString += "r";
747                 if (key.isWritable())
748                     keyString += "w";
749             }
750             else
751             {
752                 keyString += "!";
753             }
754         }
755         else
756         {
757             keyString += "-";
758         }
759         return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
760                 hashCode(),
761                 _socket.getRemoteSocketAddress(),
762                 _socket.getLocalSocketAddress(),
763                 _dispatched,
764                 isOpen(),
765                 isInputShutdown(),
766                 isOutputShutdown(),
767                 _readBlocked,
768                 _writeBlocked,
769                 _writable,
770                 _interestOps,
771                 keyString,
772                 _connection);
773     }
774 
775     /* ------------------------------------------------------------ */
776     public SelectSet getSelectSet()
777     {
778         return _selectSet;
779     }
780 
781     /* ------------------------------------------------------------ */
782     /**
783      * Don't set the SoTimeout
784      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
785      */
786     @Override
787     public void setMaxIdleTime(int timeMs) throws IOException
788     {
789         _maxIdleTime=timeMs;
790     }
791 
792 }