View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.util.thread.Timeout.Task;
31  
32  /* ------------------------------------------------------------ */
33  /**
34   * An Endpoint that can be scheduled by {@link SelectorManager}.
35   */
36  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37  {
38      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39  
40      private final SelectorManager.SelectSet _selectSet;
41      private final SelectorManager _manager;
42      private  SelectionKey _key;
43      private final Runnable _handler = new Runnable()
44          {
45              public void run() { handle(); }
46          };
47  
48      /** The desired value for {@link SelectionKey#interestOps()} */
49      private int _interestOps;
50  
51      /**
52       * The connection instance is the handler for any IO activity on the endpoint.
53       * There is a different type of connection for HTTP, AJP, WebSocket and
54       * ProxyConnect.   The connection may change for an SCEP as it is upgraded
55       * from HTTP to proxy connect or websocket.
56       */
57      private volatile AsyncConnection _connection;
58  
59      /** true if a thread has been dispatched to handle this endpoint */
60      private boolean _dispatched = false;
61  
62      /** true if a non IO dispatch (eg async resume) is outstanding */
63      private boolean _asyncDispatch = false;
64  
65      /** true if the last write operation succeed and wrote all offered bytes */
66      private volatile boolean _writable = true;
67  
68  
69      /** True if a thread has is blocked in {@link #blockReadable(long)} */
70      private boolean _readBlocked;
71  
72      /** True if a thread has is blocked in {@link #blockWritable(long)} */
73      private boolean _writeBlocked;
74  
75      /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
76      private boolean _open;
77  
78      private volatile long _idleTimestamp;
79  
80      private boolean _ishut;
81  
82      /* ------------------------------------------------------------ */
83      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
84          throws IOException
85      {
86          super(channel, maxIdleTime);
87  
88          _manager = selectSet.getManager();
89          _selectSet = selectSet;
90          _dispatched = false;
91          _asyncDispatch = false;
92          _open=true;
93          _key = key;
94  
95          setCheckForIdle(true);
96      }
97  
98      /* ------------------------------------------------------------ */
99      public SelectionKey getSelectionKey()
100     {
101         synchronized (this)
102         {
103             return _key;
104         }
105     }
106 
107     /* ------------------------------------------------------------ */
108     public SelectorManager getSelectManager()
109     {
110         return _manager;
111     }
112 
113     /* ------------------------------------------------------------ */
114     public Connection getConnection()
115     {
116         return _connection;
117     }
118 
119     /* ------------------------------------------------------------ */
120     public void setConnection(Connection connection)
121     {
122         Connection old=_connection;
123         _connection=(AsyncConnection)connection;
124         if (old!=null && old!=_connection)
125             _manager.endPointUpgraded(this,old);
126     }
127 
128     /* ------------------------------------------------------------ */
129     public long getIdleTimestamp()
130     {
131         return _idleTimestamp;
132     }
133 
134     /* ------------------------------------------------------------ */
135     /** Called by selectSet to schedule handling
136      *
137      */
138     public void schedule()
139     {
140         synchronized (this)
141         {
142             // If there is no key, then do nothing
143             if (_key == null || !_key.isValid())
144             {
145                 _readBlocked=false;
146                 _writeBlocked=false;
147                 this.notifyAll();
148                 return;
149             }
150 
151             // If there are threads dispatched reading and writing
152             if (_readBlocked || _writeBlocked)
153             {
154                 // assert _dispatched;
155                 if (_readBlocked && _key.isReadable())
156                     _readBlocked=false;
157                 if (_writeBlocked && _key.isWritable())
158                     _writeBlocked=false;
159 
160                 // wake them up is as good as a dispatched.
161                 this.notifyAll();
162 
163                 // we are not interested in further selecting
164                 _key.interestOps(0);
165                 if (!_dispatched)
166                     updateKey();
167                 return;
168             }
169 
170             // Remove writeable op
171             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
172             {
173                 // Remove writeable op
174                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
175                 _key.interestOps(_interestOps);
176                 _writable = true; // Once writable is in ops, only removed with dispatch.
177             }
178 
179             // If dispatched, then deregister interest
180             if (_dispatched)
181                 _key.interestOps(0);
182             else
183             {
184                 // other wise do the dispatch
185                 dispatch();
186                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
187                 {
188                     _key.interestOps(0);
189                 }
190             }
191         }
192     }
193 
194     /* ------------------------------------------------------------ */
195     public void asyncDispatch()
196     {
197         synchronized(this)
198         {
199             if (_dispatched)
200                 _asyncDispatch=true;
201             else
202                 dispatch();
203         }
204     }
205 
206     /* ------------------------------------------------------------ */
207     public void dispatch()
208     {
209         synchronized(this)
210         {
211             if (_dispatched)
212             {
213                 throw new IllegalStateException("dispatched");
214             }
215             else
216             {
217                 _dispatched = true;
218                 boolean dispatched = _manager.dispatch(_handler);
219                 if(!dispatched)
220                 {
221                     _dispatched = false;
222                     LOG.warn("Dispatched Failed! "+this+" to "+_manager);
223                     updateKey();
224                 }
225             }
226         }
227     }
228 
229     /* ------------------------------------------------------------ */
230     /**
231      * Called when a dispatched thread is no longer handling the endpoint.
232      * The selection key operations are updated.
233      * @return If false is returned, the endpoint has been redispatched and
234      * thread must keep handling the endpoint.
235      */
236     protected boolean undispatch()
237     {
238         synchronized (this)
239         {
240             if (_asyncDispatch)
241             {
242                 _asyncDispatch=false;
243                 return false;
244             }
245             _dispatched = false;
246             updateKey();
247         }
248         return true;
249     }
250 
251     /* ------------------------------------------------------------ */
252     public void cancelTimeout(Task task)
253     {
254         getSelectSet().cancelTimeout(task);
255     }
256 
257     /* ------------------------------------------------------------ */
258     public void scheduleTimeout(Task task, long timeoutMs)
259     {
260         getSelectSet().scheduleTimeout(task,timeoutMs);
261     }
262 
263     /* ------------------------------------------------------------ */
264     public void setCheckForIdle(boolean check)
265     {
266         _idleTimestamp=check?System.currentTimeMillis():0;
267     }
268 
269     /* ------------------------------------------------------------ */
270     public boolean isCheckForIdle()
271     {
272         return _idleTimestamp!=0;
273     }
274 
275     /* ------------------------------------------------------------ */
276     protected void notIdle()
277     {
278         if (_idleTimestamp!=0)
279             _idleTimestamp=System.currentTimeMillis();
280     }
281 
282     /* ------------------------------------------------------------ */
283     public void checkIdleTimestamp(long now)
284     {
285         long idleTimestamp=_idleTimestamp;
286 
287         if (idleTimestamp!=0 && _maxIdleTime>0)
288         {
289             long idleForMs=now-idleTimestamp;
290 
291             if (idleForMs>_maxIdleTime)
292             {
293                 onIdleExpired(idleForMs);
294                 _idleTimestamp=now;
295             }
296         }
297     }
298 
299     /* ------------------------------------------------------------ */
300     public void onIdleExpired(long idleForMs)
301     {
302         _connection.onIdleExpired(idleForMs);
303     }
304 
305     /* ------------------------------------------------------------ */
306     @Override
307     public int fill(Buffer buffer) throws IOException
308     {
309         int fill=super.fill(buffer);
310         if (fill>0)
311             notIdle();
312         return fill;
313     }
314 
315     /* ------------------------------------------------------------ */
316     @Override
317     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
318     {
319         int l = super.flush(header, buffer, trailer);
320 
321         // If there was something to write and it wasn't written, then we are not writable.
322         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
323         {
324             synchronized (this)
325             {
326                 if (_dispatched)
327                     _writable=false;
328             }
329         }
330         else if (l>0)
331         {
332             _writable=true;
333             notIdle();
334         }
335         return l;
336     }
337 
338     /* ------------------------------------------------------------ */
339     /*
340      */
341     @Override
342     public int flush(Buffer buffer) throws IOException
343     {
344         int l = super.flush(buffer);
345 
346         // If there was something to write and it wasn't written, then we are not writable.
347         if (l==0 && buffer!=null && buffer.hasContent())
348         {
349             synchronized (this)
350             {
351                 if (_dispatched)
352                     _writable=false;
353             }
354         }
355         else if (l>0)
356         {
357             _writable=true;
358             notIdle();
359         }
360 
361         return l;
362     }
363 
364     /* ------------------------------------------------------------ */
365     /*
366      * Allows thread to block waiting for further events.
367      */
368     @Override
369     public boolean blockReadable(long timeoutMs) throws IOException
370     {
371         synchronized (this)
372         {
373             if (isInputShutdown())
374                 throw new EofException();
375 
376             long now=_selectSet.getNow();
377             long end=now+timeoutMs;
378             boolean check=isCheckForIdle();
379             setCheckForIdle(true);
380             try
381             {
382                 _readBlocked=true;
383                 while (!isInputShutdown() && _readBlocked)
384                 {
385                     try
386                     {
387                         updateKey();
388                         this.wait(timeoutMs>=0?(end-now):10000);
389                     }
390                     catch (InterruptedException e)
391                     {
392                         LOG.warn(e);
393                     }
394                     finally
395                     {
396                         now=_selectSet.getNow();
397                     }
398 
399                     if (_readBlocked && timeoutMs>0 && now>=end)
400                         return false;
401                 }
402             }
403             finally
404             {
405                 _readBlocked=false;
406                 setCheckForIdle(check);
407             }
408         }
409         return true;
410     }
411 
412     /* ------------------------------------------------------------ */
413     /*
414      * Allows thread to block waiting for further events.
415      */
416     @Override
417     public boolean blockWritable(long timeoutMs) throws IOException
418     {
419         synchronized (this)
420         {
421             if (isOutputShutdown())
422                 throw new EofException();
423 
424             long now=_selectSet.getNow();
425             long end=now+timeoutMs;
426             boolean check=isCheckForIdle();
427             setCheckForIdle(true);
428             try
429             {
430                 _writeBlocked=true;
431                 while (_writeBlocked && !isOutputShutdown())
432                 {
433                     try
434                     {
435                         updateKey();
436                         this.wait(timeoutMs>=0?(end-now):10000);
437                     }
438                     catch (InterruptedException e)
439                     {
440                         LOG.warn(e);
441                     }
442                     finally
443                     {
444                         now=_selectSet.getNow();
445                     }
446                     if (_writeBlocked && timeoutMs>0 && now>=end)
447                         return false;
448                 }
449             }
450             finally
451             {
452                 _writeBlocked=false;
453                 setCheckForIdle(check);
454             }
455         }
456         return true;
457     }
458     
459     /* ------------------------------------------------------------ */
460     /**
461      * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
462      */
463     public void scheduleWrite()
464     {
465         if (_writable==true)
466             LOG.debug("Required scheduleWrite {}",this);
467 
468         _writable=false;
469         updateKey();
470     }
471 
472     /* ------------------------------------------------------------ */
473     public boolean isWritable()
474     {
475         return _writable;
476     }
477 
478     /* ------------------------------------------------------------ */
479     public boolean hasProgressed()
480     {
481         return false;
482     }
483 
484     /* ------------------------------------------------------------ */
485     /**
486      * Updates selection key. Adds operations types to the selection key as needed. No operations
487      * are removed as this is only done during dispatch. This method records the new key and
488      * schedules a call to doUpdateKey to do the keyChange
489      */
490     private void updateKey()
491     {
492         final boolean changed;
493         synchronized (this)
494         {
495             int current_ops=-1;
496             if (getChannel().isOpen())
497             {
498                 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
499                 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
500 
501                 _interestOps =
502                     ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
503                 |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
504                 try
505                 {
506                     current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
507                 }
508                 catch(Exception e)
509                 {
510                     _key=null;
511                     LOG.ignore(e);
512                 }
513             }
514             changed=_interestOps!=current_ops;
515         }
516 
517         if(changed)
518         {
519             _selectSet.addChange(this);
520             _selectSet.wakeup();
521         }
522     }
523 
524 
525     /* ------------------------------------------------------------ */
526     /**
527      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
528      */
529     void doUpdateKey()
530     {
531         synchronized (this)
532         {
533             if (getChannel().isOpen())
534             {
535                 if (_interestOps>0)
536                 {
537                     if (_key==null || !_key.isValid())
538                     {
539                         SelectableChannel sc = (SelectableChannel)getChannel();
540                         if (sc.isRegistered())
541                         {
542                             updateKey();
543                         }
544                         else
545                         {
546                             try
547                             {
548                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
549                             }
550                             catch (Exception e)
551                             {
552                                 LOG.ignore(e);
553                                 if (_key!=null && _key.isValid())
554                                 {
555                                     _key.cancel();
556                                 }
557 
558                                 if (_open)
559                                 {
560                                     _selectSet.destroyEndPoint(this);
561                                 }
562                                 _open=false;
563                                 _key = null;
564                             }
565                         }
566                     }
567                     else
568                     {
569                         _key.interestOps(_interestOps);
570                     }
571                 }
572                 else
573                 {
574                     if (_key!=null && _key.isValid())
575                         _key.interestOps(0);
576                     else
577                         _key=null;
578                 }
579             }
580             else
581             {
582                 if (_key!=null && _key.isValid())
583                     _key.cancel();
584 
585                 if (_open)
586                 {
587                     _open=false;
588                     _selectSet.destroyEndPoint(this);
589                 }
590                 _key = null;
591             }
592         }
593     }
594 
595     /* ------------------------------------------------------------ */
596     /*
597      */
598     protected void handle()
599     {
600         boolean dispatched=true;
601         try
602         {
603             while(dispatched)
604             {
605                 try
606                 {
607                     while(true)
608                     {
609                         final AsyncConnection next = (AsyncConnection)_connection.handle();
610                         if (next!=_connection)
611                         {
612                             LOG.debug("{} replaced {}",next,_connection);
613                             Connection old=_connection;
614                             _connection=next;
615                             _manager.endPointUpgraded(this,old);
616                             continue;
617                         }
618                         break;
619                     }
620                 }
621                 catch (ClosedChannelException e)
622                 {
623                     LOG.ignore(e);
624                 }
625                 catch (EofException e)
626                 {
627                     LOG.debug("EOF", e);
628                     try{close();}
629                     catch(IOException e2){LOG.ignore(e2);}
630                 }
631                 catch (IOException e)
632                 {
633                     LOG.warn(e.toString());
634                     try{close();}
635                     catch(IOException e2){LOG.ignore(e2);}
636                 }
637                 catch (Throwable e)
638                 {
639                     LOG.warn("handle failed", e);
640                     try{close();}
641                     catch(IOException e2){LOG.ignore(e2);}
642                 }
643                 finally
644                 {
645                     if (!_ishut && isInputShutdown() && isOpen())
646                     {
647                         _ishut=true;
648                         try
649                         {
650                             _connection.onInputShutdown();
651                         }
652                         catch(Throwable x)
653                         {
654                             LOG.warn("onInputShutdown failed", x);
655                             try{close();}
656                             catch(IOException e2){LOG.ignore(e2);}
657                         }
658                         finally
659                         {
660                             updateKey();
661                         }
662                     }
663                     dispatched=!undispatch();
664                 }
665             }
666         }
667         finally
668         {
669             if (dispatched)
670             {
671                 dispatched=!undispatch();
672                 while (dispatched)
673                 {
674                     LOG.warn("SCEP.run() finally DISPATCHED");
675                     dispatched=!undispatch();
676                 }
677             }
678         }
679     }
680 
681     /* ------------------------------------------------------------ */
682     /*
683      * @see org.eclipse.io.nio.ChannelEndPoint#close()
684      */
685     @Override
686     public void close() throws IOException
687     {
688         try
689         {
690             super.close();
691         }
692         catch (IOException e)
693         {
694             LOG.ignore(e);
695         }
696         finally
697         {
698             updateKey();
699         }
700     }
701 
702     /* ------------------------------------------------------------ */
703     @Override
704     public String toString()
705     {
706         // Do NOT use synchronized (this)
707         // because it's very easy to deadlock when debugging is enabled.
708         // We do a best effort to print the right toString() and that's it.
709         SelectionKey key = _key;
710         String keyString = "";
711         if (key != null)
712         {
713             if (key.isValid())
714             {
715                 if (key.isReadable())
716                     keyString += "r";
717                 if (key.isWritable())
718                     keyString += "w";
719             }
720             else
721             {
722                 keyString += "!";
723             }
724         }
725         else
726         {
727             keyString += "-";
728         }
729         return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
730                 hashCode(),
731                 _socket.getRemoteSocketAddress(),
732                 _socket.getLocalSocketAddress(),
733                 _dispatched,
734                 isOpen(),
735                 isInputShutdown(),
736                 isOutputShutdown(),
737                 _readBlocked,
738                 _writeBlocked,
739                 _writable,
740                 _interestOps,
741                 keyString,
742                 _connection);
743     }
744 
745     /* ------------------------------------------------------------ */
746     public SelectSet getSelectSet()
747     {
748         return _selectSet;
749     }
750 
751     /* ------------------------------------------------------------ */
752     /**
753      * Don't set the SoTimeout
754      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
755      */
756     @Override
757     public void setMaxIdleTime(int timeMs) throws IOException
758     {
759         _maxIdleTime=timeMs;
760     }
761 
762 }