View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.util.thread.Timeout.Task;
31  
32  /* ------------------------------------------------------------ */
33  /**
34   * An Endpoint that can be scheduled by {@link SelectorManager}.
35   */
36  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37  {
38      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39  
40      private final SelectorManager.SelectSet _selectSet;
41      private final SelectorManager _manager;
42      private  SelectionKey _key;
43      private final Runnable _handler = new Runnable()
44          {
45              public void run() { handle(); }
46          };
47  
48      /** The desired value for {@link SelectionKey#interestOps()} */
49      private int _interestOps;
50  
51      /**
52       * The connection instance is the handler for any IO activity on the endpoint.
53       * There is a different type of connection for HTTP, AJP, WebSocket and
54       * ProxyConnect.   The connection may change for an SCEP as it is upgraded
55       * from HTTP to proxy connect or websocket.
56       */
57      private volatile AsyncConnection _connection;
58  
59      /** true if a thread has been dispatched to handle this endpoint */
60      private boolean _dispatched = false;
61  
62      /** true if a non IO dispatch (eg async resume) is outstanding */
63      private boolean _asyncDispatch = false;
64  
65      /** true if the last write operation succeed and wrote all offered bytes */
66      private volatile boolean _writable = true;
67  
68  
69      /** True if a thread has is blocked in {@link #blockReadable(long)} */
70      private boolean _readBlocked;
71  
72      /** True if a thread has is blocked in {@link #blockWritable(long)} */
73      private boolean _writeBlocked;
74  
75      /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
76      private boolean _open;
77  
78      private volatile long _idleTimestamp;
79  
80      private boolean _ishut;
81  
82      /* ------------------------------------------------------------ */
83      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
84          throws IOException
85      {
86          super(channel, maxIdleTime);
87  
88          _manager = selectSet.getManager();
89          _selectSet = selectSet;
90          _dispatched = false;
91          _asyncDispatch = false;
92          _open=true;
93          _key = key;
94  
95          setCheckForIdle(true);
96      }
97  
98      /* ------------------------------------------------------------ */
99      public SelectionKey getSelectionKey()
100     {
101         synchronized (this)
102         {
103             return _key;
104         }
105     }
106 
107     /* ------------------------------------------------------------ */
108     public SelectorManager getSelectManager()
109     {
110         return _manager;
111     }
112 
113     /* ------------------------------------------------------------ */
114     public Connection getConnection()
115     {
116         return _connection;
117     }
118 
119     /* ------------------------------------------------------------ */
120     public void setConnection(Connection connection)
121     {
122         Connection old=_connection;
123         _connection=(AsyncConnection)connection;
124         if (old!=null && old!=_connection)
125             _manager.endPointUpgraded(this,old);
126     }
127 
128     /* ------------------------------------------------------------ */
129     public long getIdleTimestamp()
130     {
131         return _idleTimestamp;
132     }
133 
134     /* ------------------------------------------------------------ */
135     /** Called by selectSet to schedule handling
136      *
137      */
138     public void schedule()
139     {
140         synchronized (this)
141         {
142             // If there is no key, then do nothing
143             if (_key == null || !_key.isValid())
144             {
145                 _readBlocked=false;
146                 _writeBlocked=false;
147                 this.notifyAll();
148                 return;
149             }
150 
151             // If there are threads dispatched reading and writing
152             if (_readBlocked || _writeBlocked)
153             {
154                 // assert _dispatched;
155                 if (_readBlocked && _key.isReadable())
156                     _readBlocked=false;
157                 if (_writeBlocked && _key.isWritable())
158                     _writeBlocked=false;
159 
160                 // wake them up is as good as a dispatched.
161                 this.notifyAll();
162 
163                 // we are not interested in further selecting
164                 _key.interestOps(0);
165                 if (!_dispatched)
166                     updateKey();
167                 return;
168             }
169 
170             // Remove writeable op
171             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
172             {
173                 // Remove writeable op
174                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
175                 _key.interestOps(_interestOps);
176                 _writable = true; // Once writable is in ops, only removed with dispatch.
177             }
178 
179             // Dispatch if we are not already
180             if (!_dispatched)
181             {
182                 dispatch();
183                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
184                 {
185                     _key.interestOps(0);
186                 }
187             }
188         }
189     }
190 
191     /* ------------------------------------------------------------ */
192     public void asyncDispatch()
193     {
194         synchronized(this)
195         {
196             if (_dispatched)
197                 _asyncDispatch=true;
198             else
199                 dispatch();
200         }
201     }
202 
203     /* ------------------------------------------------------------ */
204     public void dispatch()
205     {
206         synchronized(this)
207         {
208             if (_dispatched)
209             {
210                 throw new IllegalStateException("dispatched");
211             }
212             else
213             {
214                 _dispatched = true;
215                 boolean dispatched = _manager.dispatch(_handler);
216                 if(!dispatched)
217                 {
218                     _dispatched = false;
219                     LOG.warn("Dispatched Failed! "+this+" to "+_manager);
220                     updateKey();
221                 }
222             }
223         }
224     }
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * Called when a dispatched thread is no longer handling the endpoint.
229      * The selection key operations are updated.
230      * @return If false is returned, the endpoint has been redispatched and
231      * thread must keep handling the endpoint.
232      */
233     protected boolean undispatch()
234     {
235         synchronized (this)
236         {
237             if (_asyncDispatch)
238             {
239                 _asyncDispatch=false;
240                 return false;
241             }
242             _dispatched = false;
243             updateKey();
244         }
245         return true;
246     }
247 
248     /* ------------------------------------------------------------ */
249     public void cancelTimeout(Task task)
250     {
251         getSelectSet().cancelTimeout(task);
252     }
253 
254     /* ------------------------------------------------------------ */
255     public void scheduleTimeout(Task task, long timeoutMs)
256     {
257         getSelectSet().scheduleTimeout(task,timeoutMs);
258     }
259 
260     /* ------------------------------------------------------------ */
261     public void setCheckForIdle(boolean check)
262     {
263         _idleTimestamp=check?System.currentTimeMillis():0;
264     }
265 
266     /* ------------------------------------------------------------ */
267     public boolean isCheckForIdle()
268     {
269         return _idleTimestamp!=0;
270     }
271 
272     /* ------------------------------------------------------------ */
273     protected void notIdle()
274     {
275         if (_idleTimestamp!=0)
276             _idleTimestamp=System.currentTimeMillis();
277     }
278 
279     /* ------------------------------------------------------------ */
280     public void checkIdleTimestamp(long now)
281     {
282         long idleTimestamp=_idleTimestamp;
283                 
284         if (idleTimestamp!=0 && _maxIdleTime>0)
285         {
286             long idleForMs=now-idleTimestamp;
287 
288             if (idleForMs>_maxIdleTime)
289             {
290                 onIdleExpired(idleForMs);
291                 _idleTimestamp=now;
292             }
293         }
294     }
295 
296     /* ------------------------------------------------------------ */
297     public void onIdleExpired(long idleForMs)
298     {
299         _connection.onIdleExpired(idleForMs);
300     }
301 
302     /* ------------------------------------------------------------ */
303     @Override
304     public int fill(Buffer buffer) throws IOException
305     {
306         int fill=super.fill(buffer);
307         if (fill>0)
308             notIdle();
309         return fill;
310     }
311 
312     /* ------------------------------------------------------------ */
313     @Override
314     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
315     {
316         int l = super.flush(header, buffer, trailer);
317 
318         // If there was something to write and it wasn't written, then we are not writable.
319         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
320         {
321             synchronized (this)
322             {
323                 _writable=false;
324                 if (!_dispatched)
325                     updateKey();
326             }
327         }
328         else if (l>0)
329         {
330             _writable=true;
331             notIdle();
332         }
333         return l;
334     }
335 
336     /* ------------------------------------------------------------ */
337     /*
338      */
339     @Override
340     public int flush(Buffer buffer) throws IOException
341     {
342         int l = super.flush(buffer);
343 
344         // If there was something to write and it wasn't written, then we are not writable.
345         if (l==0 && buffer!=null && buffer.hasContent())
346         {
347             synchronized (this)
348             {
349                 _writable=false;
350                 if (!_dispatched)
351                     updateKey();
352             }
353         }
354         else if (l>0)
355         {
356             _writable=true;
357             notIdle();
358         }
359 
360         return l;
361     }
362 
363     /* ------------------------------------------------------------ */
364     /*
365      * Allows thread to block waiting for further events.
366      */
367     @Override
368     public boolean blockReadable(long timeoutMs) throws IOException
369     {
370         synchronized (this)
371         {
372             if (isInputShutdown())
373                 throw new EofException();
374 
375             long now=_selectSet.getNow();
376             long end=now+timeoutMs;
377             boolean check=isCheckForIdle();
378             setCheckForIdle(true);
379             try
380             {
381                 _readBlocked=true;
382                 while (!isInputShutdown() && _readBlocked)
383                 {
384                     try
385                     {
386                         updateKey();
387                         this.wait(timeoutMs>=0?(end-now):10000);
388                     }
389                     catch (InterruptedException e)
390                     {
391                         LOG.warn(e);
392                     }
393                     finally
394                     {
395                         now=_selectSet.getNow();
396                     }
397 
398                     if (_readBlocked && timeoutMs>0 && now>=end)
399                         return false;
400                 }
401             }
402             finally
403             {
404                 _readBlocked=false;
405                 setCheckForIdle(check);
406             }
407         }
408         return true;
409     }
410 
411     /* ------------------------------------------------------------ */
412     /*
413      * Allows thread to block waiting for further events.
414      */
415     @Override
416     public boolean blockWritable(long timeoutMs) throws IOException
417     {
418         synchronized (this)
419         {
420             if (isOutputShutdown())
421                 throw new EofException();
422 
423             long now=_selectSet.getNow();
424             long end=now+timeoutMs;
425             boolean check=isCheckForIdle();
426             setCheckForIdle(true);
427             try
428             {
429                 _writeBlocked=true;
430                 while (_writeBlocked && !isOutputShutdown())
431                 {
432                     try
433                     {
434                         updateKey();
435                         this.wait(timeoutMs>=0?(end-now):10000);
436                     }
437                     catch (InterruptedException e)
438                     {
439                         LOG.warn(e);
440                     }
441                     finally
442                     {
443                         now=_selectSet.getNow();
444                     }
445                     if (_writeBlocked && timeoutMs>0 && now>=end)
446                         return false;
447                 }
448             }
449             finally
450             {
451                 _writeBlocked=false;
452                 setCheckForIdle(check);
453             }
454         }
455         return true;
456     }
457 
458     /* ------------------------------------------------------------ */
459     /* short cut for busyselectChannelServerTest */
460     public void clearWritable()
461     {
462         _writable=false;
463     }
464 
465     /* ------------------------------------------------------------ */
466     /**
467      * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
468      */
469     public void scheduleWrite()
470     {
471         if (_writable==true)
472             LOG.debug("Required scheduleWrite {}",this);
473 
474         _writable=false;
475         updateKey();
476     }
477 
478     /* ------------------------------------------------------------ */
479     public boolean isWritable()
480     {
481         return _writable;
482     }
483 
484     /* ------------------------------------------------------------ */
485     public boolean hasProgressed()
486     {
487         return false;
488     }
489 
490     /* ------------------------------------------------------------ */
491     /**
492      * Updates selection key. Adds operations types to the selection key as needed. No operations
493      * are removed as this is only done during dispatch. This method records the new key and
494      * schedules a call to doUpdateKey to do the keyChange
495      */
496     private void updateKey()
497     {
498         final boolean changed;
499         synchronized (this)
500         {
501             int current_ops=-1;
502             if (getChannel().isOpen())
503             {
504                 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
505                 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
506 
507                 _interestOps =
508                     ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
509                 |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
510                 try
511                 {
512                     current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
513                 }
514                 catch(Exception e)
515                 {
516                     _key=null;
517                     LOG.ignore(e);
518                 }
519             }
520             changed=_interestOps!=current_ops;
521         }
522 
523         if(changed)
524         {
525             _selectSet.addChange(this);
526             _selectSet.wakeup();
527         }
528     }
529 
530 
531     /* ------------------------------------------------------------ */
532     /**
533      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
534      */
535     void doUpdateKey()
536     {
537         synchronized (this)
538         {
539             if (getChannel().isOpen())
540             {
541                 if (_interestOps>0)
542                 {
543                     if (_key==null || !_key.isValid())
544                     {
545                         SelectableChannel sc = (SelectableChannel)getChannel();
546                         if (sc.isRegistered())
547                         {
548                             updateKey();
549                         }
550                         else
551                         {
552                             try
553                             {
554                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
555                             }
556                             catch (Exception e)
557                             {
558                                 LOG.ignore(e);
559                                 if (_key!=null && _key.isValid())
560                                 {
561                                     _key.cancel();
562                                 }
563 
564                                 if (_open)
565                                 {
566                                     _selectSet.destroyEndPoint(this);
567                                 }
568                                 _open=false;
569                                 _key = null;
570                             }
571                         }
572                     }
573                     else
574                     {
575                         _key.interestOps(_interestOps);
576                     }
577                 }
578                 else
579                 {
580                     if (_key!=null && _key.isValid())
581                         _key.interestOps(0);
582                     else
583                         _key=null;
584                 }
585             }
586             else
587             {
588                 if (_key!=null && _key.isValid())
589                     _key.cancel();
590 
591                 if (_open)
592                 {
593                     _open=false;
594                     _selectSet.destroyEndPoint(this);
595                 }
596                 _key = null;
597             }
598         }
599     }
600 
601     /* ------------------------------------------------------------ */
602     /*
603      */
604     protected void handle()
605     {
606         boolean dispatched=true;
607         try
608         {
609             while(dispatched)
610             {
611                 try
612                 {
613                     while(true)
614                     {
615                         final AsyncConnection next = (AsyncConnection)_connection.handle();
616                         if (next!=_connection)
617                         {
618                             LOG.debug("{} replaced {}",next,_connection);
619                             Connection old=_connection;
620                             _connection=next;
621                             _manager.endPointUpgraded(this,old);
622                             continue;
623                         }
624                         break;
625                     }
626                 }
627                 catch (ClosedChannelException e)
628                 {
629                     LOG.ignore(e);
630                 }
631                 catch (EofException e)
632                 {
633                     LOG.debug("EOF", e);
634                     try{close();}
635                     catch(IOException e2){LOG.ignore(e2);}
636                 }
637                 catch (IOException e)
638                 {
639                     LOG.warn(e.toString());
640                     LOG.debug(e);
641                     try{close();}
642                     catch(IOException e2){LOG.ignore(e2);}
643                 }
644                 catch (Throwable e)
645                 {
646                     LOG.warn("handle failed", e);
647                     try{close();}
648                     catch(IOException e2){LOG.ignore(e2);}
649                 }
650                 finally
651                 {
652                     if (!_ishut && isInputShutdown() && isOpen())
653                     {
654                         _ishut=true;
655                         try
656                         {
657                             _connection.onInputShutdown();
658                         }
659                         catch(Throwable x)
660                         {
661                             LOG.warn("onInputShutdown failed", x);
662                             try{close();}
663                             catch(IOException e2){LOG.ignore(e2);}
664                         }
665                         finally
666                         {
667                             updateKey();
668                         }
669                     }
670                     dispatched=!undispatch();
671                 }
672             }
673         }
674         finally
675         {
676             if (dispatched)
677             {
678                 dispatched=!undispatch();
679                 while (dispatched)
680                 {
681                     LOG.warn("SCEP.run() finally DISPATCHED");
682                     dispatched=!undispatch();
683                 }
684             }
685         }
686     }
687 
688     /* ------------------------------------------------------------ */
689     /*
690      * @see org.eclipse.io.nio.ChannelEndPoint#close()
691      */
692     @Override
693     public void close() throws IOException
694     {
695         try
696         {
697             super.close();
698         }
699         catch (IOException e)
700         {
701             LOG.ignore(e);
702         }
703         finally
704         {
705             updateKey();
706         }
707     }
708 
709     /* ------------------------------------------------------------ */
710     @Override
711     public String toString()
712     {
713         // Do NOT use synchronized (this)
714         // because it's very easy to deadlock when debugging is enabled.
715         // We do a best effort to print the right toString() and that's it.
716         SelectionKey key = _key;
717         String keyString = "";
718         if (key != null)
719         {
720             if (key.isValid())
721             {
722                 if (key.isReadable())
723                     keyString += "r";
724                 if (key.isWritable())
725                     keyString += "w";
726             }
727             else
728             {
729                 keyString += "!";
730             }
731         }
732         else
733         {
734             keyString += "-";
735         }
736         return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
737                 hashCode(),
738                 _socket.getRemoteSocketAddress(),
739                 _socket.getLocalSocketAddress(),
740                 _dispatched,
741                 isOpen(),
742                 isInputShutdown(),
743                 isOutputShutdown(),
744                 _readBlocked,
745                 _writeBlocked,
746                 _writable,
747                 _interestOps,
748                 keyString,
749                 _connection);
750     }
751 
752     /* ------------------------------------------------------------ */
753     public SelectSet getSelectSet()
754     {
755         return _selectSet;
756     }
757 
758     /* ------------------------------------------------------------ */
759     /**
760      * Don't set the SoTimeout
761      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
762      */
763     @Override
764     public void setMaxIdleTime(int timeMs) throws IOException
765     {
766         _maxIdleTime=timeMs;
767     }
768 
769 }