View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io.nio;
20  
21  import java.io.IOException;
22  import java.nio.channels.ClosedChannelException;
23  import java.nio.channels.SelectableChannel;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.Locale;
27  
28  import org.eclipse.jetty.io.AsyncEndPoint;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.ConnectedEndPoint;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.io.EofException;
33  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.util.thread.Timeout.Task;
37  
38  /* ------------------------------------------------------------ */
39  /**
40   * An Endpoint that can be scheduled by {@link SelectorManager}.
41   */
42  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
43  {
44      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
45  
46      private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
47      private final SelectorManager.SelectSet _selectSet;
48      private final SelectorManager _manager;
49      private  SelectionKey _key;
50      private final Runnable _handler = new Runnable()
51          {
52              public void run() { handle(); }
53          };
54  
55      /** The desired value for {@link SelectionKey#interestOps()} */
56      private int _interestOps;
57  
58      /**
59       * The connection instance is the handler for any IO activity on the endpoint.
60       * There is a different type of connection for HTTP, AJP, WebSocket and
61       * ProxyConnect.   The connection may change for an SCEP as it is upgraded
62       * from HTTP to proxy connect or websocket.
63       */
64      private volatile AsyncConnection _connection;
65  
66      private static final int STATE_NEEDS_DISPATCH=-1;
67      private static final int STATE_UNDISPATCHED=0;
68      private static final int STATE_DISPATCHED=1;
69      private static final int STATE_ASYNC=2;
70      private int _state;
71      
72      private boolean _onIdle;
73  
74      /** true if the last write operation succeed and wrote all offered bytes */
75      private volatile boolean _writable = true;
76  
77  
78      /** True if a thread has is blocked in {@link #blockReadable(long)} */
79      private boolean _readBlocked;
80  
81      /** True if a thread has is blocked in {@link #blockWritable(long)} */
82      private boolean _writeBlocked;
83  
84      /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
85      private boolean _open;
86  
87      private volatile long _idleTimestamp;
88      private volatile boolean _checkIdle;
89  
90      private boolean _ishut;
91  
92      /* ------------------------------------------------------------ */
93      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
94          throws IOException
95      {
96          super(channel, maxIdleTime);
97  
98          _manager = selectSet.getManager();
99          _selectSet = selectSet;
100         _state=STATE_UNDISPATCHED;
101         _onIdle=false;
102         _open=true;
103         _key = key;
104 
105         setCheckForIdle(true);
106     }
107 
108     /* ------------------------------------------------------------ */
109     public SelectionKey getSelectionKey()
110     {
111         synchronized (this)
112         {
113             return _key;
114         }
115     }
116 
117     /* ------------------------------------------------------------ */
118     public SelectorManager getSelectManager()
119     {
120         return _manager;
121     }
122 
123     /* ------------------------------------------------------------ */
124     public Connection getConnection()
125     {
126         return _connection;
127     }
128 
129     /* ------------------------------------------------------------ */
130     public void setConnection(Connection connection)
131     {
132         Connection old=_connection;
133         _connection=(AsyncConnection)connection;
134         if (old!=null && old!=_connection)
135             _manager.endPointUpgraded(this,old);
136     }
137 
138     /* ------------------------------------------------------------ */
139     public long getIdleTimestamp()
140     {
141         return _idleTimestamp;
142     }
143 
144     /* ------------------------------------------------------------ */
    /**
     * Called by selectSet to schedule handling.
     * <p>
     * Holding this endpoint's monitor, the method either wakes threads blocked in
     * {@link #blockReadable(long)} / {@link #blockWritable(long)}, or dispatches the
     * endpoint for handling, and adjusts the interest ops of the selection key.
     */
    public void schedule()
    {
        synchronized (this)
        {
            // If there is no usable key, release any blocked threads and do nothing else
            if (_key == null || !_key.isValid())
            {
                _readBlocked=false;
                _writeBlocked=false;
                this.notifyAll();
                return;
            }

            // If there are threads blocked reading and/or writing
            if (_readBlocked || _writeBlocked)
            {
                // assert _dispatched;
                // Clear only the blocked flags whose operation the key reports as ready
                if (_readBlocked && _key.isReadable())
                    _readBlocked=false;
                if (_writeBlocked && _key.isWritable())
                    _writeBlocked=false;

                // waking them up is as good as a dispatch.
                this.notifyAll();

                // we are not interested in further selecting
                _key.interestOps(0);
                // when no thread is dispatched, recompute the interest ops
                if (_state<STATE_DISPATCHED)
                    updateKey();
                return;
            }

            // Write is both ready and was of interest
            if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
            {
                // Remove writeable op
                _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
                _key.interestOps(_interestOps);
                _writable = true; // Once writable is in ops, only removed with dispatch.
            }

            // If dispatched, then deregister interest
            if (_state>=STATE_DISPATCHED)
                _key.interestOps(0);
            else
            {
                // otherwise do the dispatch
                dispatch();
                // deregister interest once dispatched, unless the manager defers setting interest ops to 0
                if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
                {
                    _key.interestOps(0);
                }
            }
        }
    }
203 
204     /* ------------------------------------------------------------ */
205     public void asyncDispatch()
206     {
207         synchronized(this)
208         {
209             switch(_state)
210             {
211                 case STATE_NEEDS_DISPATCH:
212                 case STATE_UNDISPATCHED:
213                     dispatch();
214                     break;
215                     
216                 case STATE_DISPATCHED:
217                 case STATE_ASYNC:
218                     _state=STATE_ASYNC;
219                     break;
220             }
221         }
222     }
223 
224     /* ------------------------------------------------------------ */
225     public void dispatch()
226     {
227         synchronized(this)
228         {
229             if (_state<=STATE_UNDISPATCHED)
230             {
231                 if (_onIdle)
232                     _state = STATE_NEEDS_DISPATCH;
233                 else
234                 {
235                     _state = STATE_DISPATCHED;
236                     boolean dispatched = _manager.dispatch(_handler);
237                     if(!dispatched)
238                     {
239                         _state = STATE_NEEDS_DISPATCH;
240                         LOG.warn("Dispatched Failed! "+this+" to "+_manager);
241                         updateKey();
242                     }
243                 }
244             }
245         }
246     }
247 
248     /* ------------------------------------------------------------ */
249     /**
250      * Called when a dispatched thread is no longer handling the endpoint.
251      * The selection key operations are updated.
252      * @return If false is returned, the endpoint has been redispatched and
253      * thread must keep handling the endpoint.
254      */
255     protected boolean undispatch()
256     {
257         synchronized (this)
258         {
259             switch(_state)
260             {
261                 case STATE_ASYNC:
262                     _state=STATE_DISPATCHED;
263                     return false;
264 
265                 default:
266                     _state=STATE_UNDISPATCHED;
267                     updateKey();
268                     return true;
269             }
270         }
271     }
272 
273     /* ------------------------------------------------------------ */
274     public void cancelTimeout(Task task)
275     {
276         getSelectSet().cancelTimeout(task);
277     }
278 
279     /* ------------------------------------------------------------ */
280     public void scheduleTimeout(Task task, long timeoutMs)
281     {
282         getSelectSet().scheduleTimeout(task,timeoutMs);
283     }
284 
285     /* ------------------------------------------------------------ */
286     public void setCheckForIdle(boolean check)
287     {
288         if (check)
289         {
290             _idleTimestamp=System.currentTimeMillis();
291             _checkIdle=true;
292         }
293         else
294             _checkIdle=false;
295     }
296 
297     /* ------------------------------------------------------------ */
298     public boolean isCheckForIdle()
299     {
300         return _checkIdle;
301     }
302 
303     /* ------------------------------------------------------------ */
304     protected void notIdle()
305     {
306         _idleTimestamp=System.currentTimeMillis();
307     }
308 
309     /* ------------------------------------------------------------ */
310     public void checkIdleTimestamp(long now)
311     {
312         if (isCheckForIdle() && _maxIdleTime>0)
313         {
314             final long idleForMs=now-_idleTimestamp;
315 
316             if (idleForMs>_maxIdleTime)
317             {
318                 // Don't idle out again until onIdleExpired task completes.
319                 setCheckForIdle(false);
320                 _manager.dispatch(new Runnable()
321                 {
322                     public void run()
323                     {
324                         try
325                         {
326                             onIdleExpired(idleForMs);
327                         }
328                         finally
329                         {
330                             setCheckForIdle(true);
331                         }
332                     }
333                 });
334             }
335         }
336     }
337 
338     /* ------------------------------------------------------------ */
339     public void onIdleExpired(long idleForMs)
340     {
341         try
342         {
343             synchronized (this)
344             {
345                 _onIdle=true;
346             }
347 
348             if (_maxIdleTime>0 && (System.currentTimeMillis()-_idleTimestamp)>_maxIdleTime)
349                 _connection.onIdleExpired(idleForMs);
350         }
351         finally
352         {
353             synchronized (this)
354             {
355                 _onIdle=false;
356                 if (_state==STATE_NEEDS_DISPATCH)
357                     dispatch();
358             }
359         }
360     }
361 
362     /* ------------------------------------------------------------ */
363     @Override
364     public int fill(Buffer buffer) throws IOException
365     {
366         int fill=super.fill(buffer);
367         if (fill>0)
368             notIdle();
369         return fill;
370     }
371 
372     /* ------------------------------------------------------------ */
373     @Override
374     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
375     {
376         int l = super.flush(header, buffer, trailer);
377 
378         // If there was something to write and it wasn't written, then we are not writable.
379         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
380         {
381             synchronized (this)
382             {   
383                 _writable=false;
384                 if (_state<STATE_DISPATCHED)
385                     updateKey();
386             }
387         }
388         else if (l>0)
389         {
390             _writable=true;
391             notIdle();
392         }
393         return l;
394     }
395 
396     /* ------------------------------------------------------------ */
397     /*
398      */
399     @Override
400     public int flush(Buffer buffer) throws IOException
401     {
402         int l = super.flush(buffer);
403 
404         // If there was something to write and it wasn't written, then we are not writable.
405         if (l==0 && buffer!=null && buffer.hasContent())
406         {
407             synchronized (this)
408             {   
409                 _writable=false;
410                 if (_state<STATE_DISPATCHED)
411                     updateKey();
412             }
413         }
414         else if (l>0)
415         {
416             _writable=true;
417             notIdle();
418         }
419 
420         return l;
421     }
422 
423     /* ------------------------------------------------------------ */
424     /*
425      * Allows thread to block waiting for further events.
426      */
427     @Override
428     public boolean blockReadable(long timeoutMs) throws IOException
429     {
430         synchronized (this)
431         {
432             if (isInputShutdown())
433                 throw new EofException();
434 
435             long now=_selectSet.getNow();
436             long end=now+timeoutMs;
437             boolean check=isCheckForIdle();
438             setCheckForIdle(true);
439             try
440             {
441                 _readBlocked=true;
442                 while (!isInputShutdown() && _readBlocked)
443                 {
444                     try
445                     {
446                         updateKey();
447                         this.wait(timeoutMs>0?(end-now):10000);
448                     }
449                     catch (InterruptedException e)
450                     {
451                         LOG.warn(e);
452                     }
453                     finally
454                     {
455                         now=_selectSet.getNow();
456                     }
457 
458                     if (_readBlocked && timeoutMs>0 && now>=end)
459                         return false;
460                 }
461             }
462             finally
463             {
464                 _readBlocked=false;
465                 setCheckForIdle(check);
466             }
467         }
468         return true;
469     }
470 
471     /* ------------------------------------------------------------ */
472     /*
473      * Allows thread to block waiting for further events.
474      */
475     @Override
476     public boolean blockWritable(long timeoutMs) throws IOException
477     {
478         synchronized (this)
479         {
480             if (isOutputShutdown())
481                 throw new EofException();
482 
483             long now=_selectSet.getNow();
484             long end=now+timeoutMs;
485             boolean check=isCheckForIdle();
486             setCheckForIdle(true);
487             try
488             {
489                 _writeBlocked=true;
490                 while (_writeBlocked && !isOutputShutdown())
491                 {
492                     try
493                     {
494                         updateKey();
495                         this.wait(timeoutMs>0?(end-now):10000);
496                     }
497                     catch (InterruptedException e)
498                     {
499                         LOG.warn(e);
500                     }
501                     finally
502                     {
503                         now=_selectSet.getNow();
504                     }
505                     if (_writeBlocked && timeoutMs>0 && now>=end)
506                         return false;
507                 }
508             }
509             finally
510             {
511                 _writeBlocked=false;
512                 setCheckForIdle(check);
513             }
514         }
515         return true;
516     }
517 
518     /* ------------------------------------------------------------ */
519     /**
520      * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
521      */
522     public void scheduleWrite()
523     {
524         if (_writable)
525             LOG.debug("Required scheduleWrite {}",this);
526 
527         _writable=false;
528         updateKey();
529     }
530 
531     /* ------------------------------------------------------------ */
532     public boolean isWritable()
533     {
534         return _writable;
535     }
536 
537     /* ------------------------------------------------------------ */
    /**
     * @return always false in this implementation
     */
    public boolean hasProgressed()
    {
        return false;
    }
542 
543     /* ------------------------------------------------------------ */
544     /**
545      * Updates selection key. Adds operations types to the selection key as needed. No operations
546      * are removed as this is only done during dispatch. This method records the new key and
547      * schedules a call to doUpdateKey to do the keyChange
548      */
549     private void updateKey()
550     {
551         final boolean changed;
552         synchronized (this)
553         {
554             int current_ops=-1;
555             if (getChannel().isOpen())
556             {
557                 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
558                 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
559 
560                 _interestOps =
561                     ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
562                 |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
563                 try
564                 {
565                     current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
566                 }
567                 catch(Exception e)
568                 {
569                     _key=null;
570                     LOG.ignore(e);
571                 }
572             }
573             changed=_interestOps!=current_ops;
574         }
575 
576         if(changed)
577         {
578             _selectSet.addChange(this);
579             _selectSet.wakeup();
580         }
581     }
582 
583 
584     /* ------------------------------------------------------------ */
    /**
     * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey.
     * <p>
     * For an open channel the desired ops are applied to the key, registering the
     * channel with the selector first if there is no valid key. For a closed
     * channel the key is cancelled and the endpoint is destroyed (once).
     */
    void doUpdateKey()
    {
        synchronized (this)
        {
            if (getChannel().isOpen())
            {
                if (_interestOps>0)
                {
                    if (_key==null || !_key.isValid())
                    {
                        SelectableChannel sc = (SelectableChannel)getChannel();
                        if (sc.isRegistered())
                        {
                            // Still registered although we hold no valid key: queue another update.
                            // NOTE(review): presumably the old registration has not been flushed
                            // by the selector yet - confirm.
                            updateKey();
                        }
                        else
                        {
                            try
                            {
                                // Register afresh with the desired ops, attaching this endpoint
                                _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
                            }
                            catch (Exception e)
                            {
                                // Registration failed: drop the key and destroy the endpoint
                                LOG.ignore(e);
                                if (_key!=null && _key.isValid())
                                {
                                    _key.cancel();
                                }

                                if (_open)
                                {
                                    _selectSet.destroyEndPoint(this);
                                }
                                _open=false;
                                _key = null;
                            }
                        }
                    }
                    else
                    {
                        // Valid key: just apply the desired ops
                        _key.interestOps(_interestOps);
                    }
                }
                else
                {
                    // No interest wanted: clear the ops on a valid key, forget an invalid one
                    if (_key!=null && _key.isValid())
                        _key.interestOps(0);
                    else
                        _key=null;
                }
            }
            else
            {
                // Channel is closed: cancel the key and destroy the endpoint if not already done
                if (_key!=null && _key.isValid())
                    _key.cancel();

                if (_open)
                {
                    _open=false;
                    _selectSet.destroyEndPoint(this);
                }
                _key = null;
            }
        }
    }
653 
654     /* ------------------------------------------------------------ */
    /**
     * Runs the connection's handling for this endpoint; invoked from the handler
     * job submitted by {@link #dispatch()}.
     * <p>
     * The connection is handled repeatedly until {@link #undispatch()} reports that
     * the endpoint has not been redispatched. A connection returned by
     * <code>handle()</code> that differs from the current one replaces it and is
     * handled in turn.
     */
    protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection)_connection.handle();
                        if (next!=_connection)
                        {
                            // The connection was upgraded: switch over and handle the new one
                            LOG.debug("{} replaced {}",next,_connection);
                            Connection old=_connection;
                            _connection=next;
                            _manager.endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    LOG.ignore(e);
                }
                catch (EofException e)
                {
                    LOG.debug("EOF", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (IOException e)
                {
                    LOG.warn(e.toString());
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (Throwable e)
                {
                    LOG.warn("handle failed", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                finally
                {
                    // Tell the connection about input shutdown once, while the endpoint is still open
                    if (!_ishut && isInputShutdown() && isOpen())
                    {
                        _ishut=true;
                        try
                        {
                            _connection.onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            LOG.warn("onInputShutdown failed", x);
                            try{close();}
                            catch(IOException e2){LOG.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    // undispatch() returns false if redispatched, in which case loop again
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            // Only reached with dispatched still true if the loop was left abnormally:
            // keep undispatching until the endpoint is really undispatched.
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    LOG.warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }
739 
740     /* ------------------------------------------------------------ */
741     /*
742      * @see org.eclipse.io.nio.ChannelEndPoint#close()
743      */
744     @Override
745     public void close() throws IOException
746     {
747         // On unix systems there is a JVM issue that if you cancel before closing, it can 
748         // cause the selector to block waiting for a channel to close and that channel can 
749         // block waiting for the remote end.  But on windows, if you don't cancel before a 
750         // close, then the selector can block anyway!
751         // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
752         if (WORK_AROUND_JVM_BUG_6346658)
753         {
754             try
755             {
756                 SelectionKey key = _key;
757                 if (key!=null)
758                     key.cancel();
759             }
760             catch (Throwable e)
761             {
762                 LOG.ignore(e);
763             }
764         }
765 
766         try
767         {
768             super.close();
769         }
770         catch (IOException e)
771         {
772             LOG.ignore(e);
773         }
774         finally
775         {
776             updateKey();
777         }
778     }
779 
780     /* ------------------------------------------------------------ */
781     @Override
782     public String toString()
783     {
784         // Do NOT use synchronized (this)
785         // because it's very easy to deadlock when debugging is enabled.
786         // We do a best effort to print the right toString() and that's it.
787         SelectionKey key = _key;
788         String keyString = "";
789         if (key != null)
790         {
791             if (key.isValid())
792             {
793                 if (key.isReadable())
794                     keyString += "r";
795                 if (key.isWritable())
796                     keyString += "w";
797             }
798             else
799             {
800                 keyString += "!";
801             }
802         }
803         else
804         {
805             keyString += "-";
806         }
807         return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
808                 hashCode(),
809                 _socket.getRemoteSocketAddress(),
810                 _socket.getLocalSocketAddress(),
811                 _state,
812                 isOpen(),
813                 isInputShutdown(),
814                 isOutputShutdown(),
815                 _readBlocked,
816                 _writeBlocked,
817                 _writable,
818                 _interestOps,
819                 keyString,
820                 _connection);
821     }
822 
823     /* ------------------------------------------------------------ */
824     public SelectSet getSelectSet()
825     {
826         return _selectSet;
827     }
828 
829     /* ------------------------------------------------------------ */
830     /**
831      * Don't set the SoTimeout
832      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
833      */
834     @Override
835     public void setMaxIdleTime(int timeMs) throws IOException
836     {
837         _maxIdleTime=timeMs;
838     }
839 
840 }