View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.util.thread.Timeout.Task;
31  
32  /* ------------------------------------------------------------ */
33  /**
34   * An Endpoint that can be scheduled by {@link SelectorManager}.
35   */
36  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37  {
    public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");

    /** The select set supplied at construction; used for timeouts, the current time and key-change requests. */
    private final SelectorManager.SelectSet _selectSet;
    /** The manager obtained from the select set; used to dispatch {@link #_handler} and to report connection upgrades. */
    private final SelectorManager _manager;
    /** The selection key for this endpoint's channel; set to null when it is found unusable or after it is cancelled. */
    private  SelectionKey _key;
    /** The task handed to the manager by {@link #dispatch()}; it simply runs {@link #handle()}. */
    private final Runnable _handler = new Runnable()
        {
            public void run() { handle(); }
        };

    /** The desired value for {@link SelectionKey#interestOps()} */
    private int _interestOps;

    /**
     * The connection instance is the handler for any IO activity on the endpoint.
     * There is a different type of connection for HTTP, AJP, WebSocket and
     * ProxyConnect.   The connection may change for an SCEP as it is upgraded
     * from HTTP to proxy connect or websocket.
     */
    private volatile AsyncConnection _connection;

    /** true if a thread has been dispatched to handle this endpoint */
    private boolean _dispatched = false;

    /** true if a non IO dispatch (eg async resume) is outstanding */
    private boolean _asyncDispatch = false;

    /** true if the last write operation succeeded and wrote all offered bytes */
    private volatile boolean _writable = true;


    /** True if a thread is blocked in {@link #blockReadable(long)} */
    private boolean _readBlocked;

    /** True if a thread is blocked in {@link #blockWritable(long)} */
    private boolean _writeBlocked;

    /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
    private boolean _open;

    /**
     * Time in ms (from {@link System#currentTimeMillis()} or the value passed to
     * {@link #checkIdleTimestamp(long)}) of the last recorded activity, or 0 when
     * idle checking is disabled.
     */
    private volatile long _idleTimestamp;

    /** true once {@link #handle()} has seen the input shut down and notified the connection */
    private boolean _ishut;
81  
82      /* ------------------------------------------------------------ */
83      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
84          throws IOException
85      {
86          super(channel, maxIdleTime);
87  
88          _manager = selectSet.getManager();
89          _selectSet = selectSet;
90          _dispatched = false;
91          _asyncDispatch = false;
92          _open=true;
93          _key = key;
94  
95          setCheckForIdle(true);
96      }
97  
98      /* ------------------------------------------------------------ */
99      public SelectionKey getSelectionKey()
100     {
101         synchronized (this)
102         {
103             return _key;
104         }
105     }
106 
107     /* ------------------------------------------------------------ */
    /**
     * @return the manager obtained from the select set at construction
     */
    public SelectorManager getSelectManager()
    {
        return _manager;
    }
112 
113     /* ------------------------------------------------------------ */
    /**
     * @return the connection currently handling IO for this endpoint; null if none has been set yet
     */
    public Connection getConnection()
    {
        return _connection;
    }
118 
119     /* ------------------------------------------------------------ */
120     public void setConnection(Connection connection)
121     {
122         Connection old=_connection;
123         _connection=(AsyncConnection)connection;
124         if (old!=null && old!=_connection)
125             _manager.endPointUpgraded(this,old);
126     }
127 
128     /* ------------------------------------------------------------ */
    /**
     * @return the last recorded activity time in ms, or 0 when idle checking is disabled
     */
    public long getIdleTimestamp()
    {
        return _idleTimestamp;
    }
133 
134     /* ------------------------------------------------------------ */
    /**
     * Called by selectSet to schedule handling of this endpoint.
     * <p>
     * Runs entirely under this endpoint's monitor and takes one of three paths:
     * <ul>
     * <li>no usable key: release any blocked threads and return;</li>
     * <li>a thread is blocked in {@link #blockReadable(long)} or
     *     {@link #blockWritable(long)}: clear the flag matching the key's
     *     readiness and wake the waiters;</li>
     * <li>otherwise: dispatch {@link #handle()} via {@link #dispatch()} unless a
     *     thread is already dispatched.</li>
     * </ul>
     */
    public void schedule()
    {
        synchronized (this)
        {
            // If there is no key, then do nothing
            // (beyond releasing blocked threads so they can notice the state change).
            if (_key == null || !_key.isValid())
            {
                _readBlocked=false;
                _writeBlocked=false;
                this.notifyAll();
                return;
            }

            // If there are threads dispatched reading and writing
            if (_readBlocked || _writeBlocked)
            {
                // assert _dispatched;
                // Only clear the flag whose operation the key reports as ready.
                if (_readBlocked && _key.isReadable())
                    _readBlocked=false;
                if (_writeBlocked && _key.isWritable())
                    _writeBlocked=false;

                // wake them up is as good as a dispatched.
                this.notifyAll();

                // we are not interested in further selecting
                _key.interestOps(0);
                // With no dispatched thread, recompute the interest ops straight away.
                if (!_dispatched)
                    updateKey();
                return;
            }

            // Remove writeable op
            if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
            {
                // Remove writeable op
                _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
                _key.interestOps(_interestOps);
                _writable = true; // Once writable is in ops, only removed with dispatch.
            }

            // If dispatched, then deregister interest
            if (_dispatched)
                _key.interestOps(0);
            else
            {
                // other wise do the dispatch
                dispatch();
                // dispatch() leaves _dispatched false if the manager refused the task.
                // Interest is cleared unless the manager reports isDeferringInterestedOps0().
                if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
                {
                    _key.interestOps(0);
                }
            }
        }
    }
193 
194     /* ------------------------------------------------------------ */
195     public void asyncDispatch()
196     {
197         synchronized(this)
198         {
199             if (_dispatched)
200                 _asyncDispatch=true;
201             else
202                 dispatch();
203         }
204     }
205 
206     /* ------------------------------------------------------------ */
207     public void dispatch()
208     {
209         synchronized(this)
210         {
211             if (_dispatched)
212             {
213                 throw new IllegalStateException("dispatched");
214             }
215             else
216             {
217                 _dispatched = true;
218                 boolean dispatched = _manager.dispatch(_handler);
219                 if(!dispatched)
220                 {
221                     _dispatched = false;
222                     LOG.warn("Dispatched Failed! "+this+" to "+_manager);
223                     updateKey();
224                 }
225             }
226         }
227     }
228 
229     /* ------------------------------------------------------------ */
230     /**
231      * Called when a dispatched thread is no longer handling the endpoint.
232      * The selection key operations are updated.
233      * @return If false is returned, the endpoint has been redispatched and
234      * thread must keep handling the endpoint.
235      */
236     protected boolean undispatch()
237     {
238         synchronized (this)
239         {
240             if (_asyncDispatch)
241             {
242                 _asyncDispatch=false;
243                 return false;
244             }
245             _dispatched = false;
246             updateKey();
247         }
248         return true;
249     }
250 
251     /* ------------------------------------------------------------ */
    /**
     * Cancels a timeout task by delegating to the select set returned by {@link #getSelectSet()}.
     *
     * @param task the task to cancel
     */
    public void cancelTimeout(Task task)
    {
        getSelectSet().cancelTimeout(task);
    }
256 
257     /* ------------------------------------------------------------ */
    /**
     * Schedules a timeout task by delegating to the select set returned by {@link #getSelectSet()}.
     *
     * @param task the task to schedule
     * @param timeoutMs the timeout in ms, passed through unchanged
     */
    public void scheduleTimeout(Task task, long timeoutMs)
    {
        getSelectSet().scheduleTimeout(task,timeoutMs);
    }
262 
263     /* ------------------------------------------------------------ */
264     public void setCheckForIdle(boolean check)
265     {
266         _idleTimestamp=check?System.currentTimeMillis():0;
267     }
268 
269     /* ------------------------------------------------------------ */
    /**
     * @return true if idle checking is enabled, i.e. the idle timestamp is non-zero
     */
    public boolean isCheckForIdle()
    {
        return _idleTimestamp!=0;
    }
274 
275     /* ------------------------------------------------------------ */
276     protected void notIdle()
277     {
278         if (_idleTimestamp!=0)
279             _idleTimestamp=System.currentTimeMillis();
280     }
281 
282     /* ------------------------------------------------------------ */
283     public void checkIdleTimestamp(long now)
284     {
285         long idleTimestamp=_idleTimestamp;
286 
287         if (idleTimestamp!=0 && _maxIdleTime>0)
288         {
289             long idleForMs=now-idleTimestamp;
290 
291             if (idleForMs>_maxIdleTime)
292             {
293                 onIdleExpired(idleForMs);
294                 _idleTimestamp=now;
295             }
296         }
297     }
298 
299     /* ------------------------------------------------------------ */
    /**
     * Forwards an idle expiry to the current connection.
     *
     * @param idleForMs how long the endpoint has been idle, in ms
     */
    public void onIdleExpired(long idleForMs)
    {
        // NOTE(review): assumes a connection has been set before idle checks run - confirm with callers.
        _connection.onIdleExpired(idleForMs);
    }
304 
305     /* ------------------------------------------------------------ */
306     @Override
307     public int fill(Buffer buffer) throws IOException
308     {
309         int fill=super.fill(buffer);
310         if (fill>0)
311             notIdle();
312         return fill;
313     }
314 
315     /* ------------------------------------------------------------ */
316     @Override
317     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
318     {
319         int l = super.flush(header, buffer, trailer);
320 
321         // If there was something to write and it wasn't written, then we are not writable.
322         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
323         {
324             synchronized (this)
325             {
326                 _writable=false;
327                 if (!_dispatched)
328                     updateKey();
329             }
330         }
331         else if (l>0)
332         {
333             _writable=true;
334             notIdle();
335         }
336         return l;
337     }
338 
339     /* ------------------------------------------------------------ */
340     /*
341      */
342     @Override
343     public int flush(Buffer buffer) throws IOException
344     {
345         int l = super.flush(buffer);
346 
347         // If there was something to write and it wasn't written, then we are not writable.
348         if (l==0 && buffer!=null && buffer.hasContent())
349         {
350             synchronized (this)
351             {
352                 _writable=false;
353                 if (!_dispatched)
354                     updateKey();
355             }
356         }
357         else if (l>0)
358         {
359             _writable=true;
360             notIdle();
361         }
362 
363         return l;
364     }
365 
366     /* ------------------------------------------------------------ */
367     /*
368      * Allows thread to block waiting for further events.
369      */
370     @Override
371     public boolean blockReadable(long timeoutMs) throws IOException
372     {
373         synchronized (this)
374         {
375             if (isInputShutdown())
376                 throw new EofException();
377 
378             long now=_selectSet.getNow();
379             long end=now+timeoutMs;
380             boolean check=isCheckForIdle();
381             setCheckForIdle(true);
382             try
383             {
384                 _readBlocked=true;
385                 while (!isInputShutdown() && _readBlocked)
386                 {
387                     try
388                     {
389                         updateKey();
390                         this.wait(timeoutMs>=0?(end-now):10000);
391                     }
392                     catch (InterruptedException e)
393                     {
394                         LOG.warn(e);
395                     }
396                     finally
397                     {
398                         now=_selectSet.getNow();
399                     }
400 
401                     if (_readBlocked && timeoutMs>0 && now>=end)
402                         return false;
403                 }
404             }
405             finally
406             {
407                 _readBlocked=false;
408                 setCheckForIdle(check);
409             }
410         }
411         return true;
412     }
413 
414     /* ------------------------------------------------------------ */
415     /*
416      * Allows thread to block waiting for further events.
417      */
418     @Override
419     public boolean blockWritable(long timeoutMs) throws IOException
420     {
421         synchronized (this)
422         {
423             if (isOutputShutdown())
424                 throw new EofException();
425 
426             long now=_selectSet.getNow();
427             long end=now+timeoutMs;
428             boolean check=isCheckForIdle();
429             setCheckForIdle(true);
430             try
431             {
432                 _writeBlocked=true;
433                 while (_writeBlocked && !isOutputShutdown())
434                 {
435                     try
436                     {
437                         updateKey();
438                         this.wait(timeoutMs>=0?(end-now):10000);
439                     }
440                     catch (InterruptedException e)
441                     {
442                         LOG.warn(e);
443                     }
444                     finally
445                     {
446                         now=_selectSet.getNow();
447                     }
448                     if (_writeBlocked && timeoutMs>0 && now>=end)
449                         return false;
450                 }
451             }
452             finally
453             {
454                 _writeBlocked=false;
455                 setCheckForIdle(check);
456             }
457         }
458         return true;
459     }
460     
461     /* ------------------------------------------------------------ */
462     /**
463      * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
464      */
465     public void scheduleWrite()
466     {
467         if (_writable==true)
468             LOG.debug("Required scheduleWrite {}",this);
469 
470         _writable=false;
471         updateKey();
472     }
473 
474     /* ------------------------------------------------------------ */
    /**
     * @return the current value of the writable flag, which is maintained by the
     * flush methods, {@link #schedule()} and {@link #scheduleWrite()}
     */
    public boolean isWritable()
    {
        return _writable;
    }
479 
480     /* ------------------------------------------------------------ */
    /**
     * @return always false in this implementation
     */
    public boolean hasProgressed()
    {
        return false;
    }
485 
486     /* ------------------------------------------------------------ */
    /**
     * Recomputes the desired selection key interest operations and, if they
     * differ from what the key currently has, queues this endpoint as a change
     * on the select set and wakes the selector. The key itself is not modified
     * here; that is done in {@link #doUpdateKey()}, whose own documentation says
     * it is scheduled by this method.
     */
    private void updateKey()
    {
        final boolean changed;
        synchronized (this)
        {
            // -1 stands for "no usable key" and never equals a real ops value.
            int current_ops=-1;
            if (getChannel().isOpen())
            {
                // A blocked thread always wants its operation; otherwise interest is only
                // taken while no thread is dispatched.
                boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
                boolean write_interest= _writeBlocked || (!_dispatched && !_writable);

                // Never select for a direction the socket has already shut down.
                _interestOps =
                    ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
                |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
                try
                {
                    current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
                }
                catch(Exception e)
                {
                    // The key became unusable while it was being read: forget it.
                    _key=null;
                    LOG.ignore(e);
                }
            }
            // When the channel is closed _interestOps is left as it was and current_ops stays -1.
            changed=_interestOps!=current_ops;
        }

        // Done outside the monitor.
        if(changed)
        {
            _selectSet.addChange(this);
            _selectSet.wakeup();
        }
    }
525 
526 
527     /* ------------------------------------------------------------ */
    /**
     * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey.
     * <p>
     * For an open channel the key's interest ops are set to the desired value,
     * registering the channel with the select set's selector if there is no
     * usable key. For a closed channel the key is cancelled and the endpoint is
     * destroyed (once).
     */
    void doUpdateKey()
    {
        synchronized (this)
        {
            if (getChannel().isOpen())
            {
                if (_interestOps>0)
                {
                    if (_key==null || !_key.isValid())
                    {
                        SelectableChannel sc = (SelectableChannel)getChannel();
                        if (sc.isRegistered())
                        {
                            // Still registered but our key is unusable: queue another change
                            // rather than registering again now.
                            updateKey();
                        }
                        else
                        {
                            try
                            {
                                _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
                            }
                            catch (Exception e)
                            {
                                // Registration failed: drop the key and destroy the endpoint.
                                LOG.ignore(e);
                                if (_key!=null && _key.isValid())
                                {
                                    _key.cancel();
                                }

                                if (_open)
                                {
                                    _selectSet.destroyEndPoint(this);
                                }
                                _open=false;
                                _key = null;
                            }
                        }
                    }
                    else
                    {
                        _key.interestOps(_interestOps);
                    }
                }
                else
                {
                    // No interest wanted: clear it on a valid key, otherwise forget the key.
                    if (_key!=null && _key.isValid())
                        _key.interestOps(0);
                    else
                        _key=null;
                }
            }
            else
            {
                // Channel is closed: cancel the key and destroy the endpoint if not already done.
                if (_key!=null && _key.isValid())
                    _key.cancel();

                if (_open)
                {
                    _open=false;
                    _selectSet.destroyEndPoint(this);
                }
                _key = null;
            }
        }
    }
596 
597     /* ------------------------------------------------------------ */
    /**
     * Body of the dispatched task: lets the connection handle the endpoint
     * until {@link #undispatch()} reports that the endpoint is no longer dispatched.
     * <p>
     * Each pass calls the connection's {@code handle()} repeatedly while it
     * returns a replacement connection, installing each replacement and
     * reporting it to the manager. A {@link ClosedChannelException} is ignored;
     * any other exception is logged and the endpoint is closed. After each pass
     * the connection is told once about an input shutdown.
     */
    protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection)_connection.handle();
                        if (next!=_connection)
                        {
                            // The connection was replaced (upgraded): install it and handle again.
                            LOG.debug("{} replaced {}",next,_connection);
                            Connection old=_connection;
                            _connection=next;
                            _manager.endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    LOG.ignore(e);
                }
                catch (EofException e)
                {
                    LOG.debug("EOF", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (IOException e)
                {
                    // Logged by message only, without the stack trace.
                    LOG.warn(e.toString());
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (Throwable e)
                {
                    LOG.warn("handle failed", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                finally
                {
                    // Deliver onInputShutdown at most once, and only while still open.
                    if (!_ishut && isInputShutdown() && isOpen())
                    {
                        _ishut=true;
                        try
                        {
                            _connection.onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            LOG.warn("onInputShutdown failed", x);
                            try{close();}
                            catch(IOException e2){LOG.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    // undispatch() returns false when an async dispatch is pending: go round again.
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            // Only reached with dispatched==true if the loop was left abnormally;
            // keep undispatching until the endpoint is really released.
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    LOG.warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }
682 
683     /* ------------------------------------------------------------ */
    /**
     * Closes the endpoint via the superclass and then updates the selection key.
     * An {@link IOException} from the superclass close is logged at ignore level
     * and not rethrown; the key update happens in all cases.
     *
     * @see org.eclipse.jetty.io.nio.ChannelEndPoint#close()
     */
    @Override
    public void close() throws IOException
    {
        try
        {
            super.close();
        }
        catch (IOException e)
        {
            LOG.ignore(e);
        }
        finally
        {
            updateKey();
        }
    }
703 
704     /* ------------------------------------------------------------ */
705     @Override
706     public String toString()
707     {
708         // Do NOT use synchronized (this)
709         // because it's very easy to deadlock when debugging is enabled.
710         // We do a best effort to print the right toString() and that's it.
711         SelectionKey key = _key;
712         String keyString = "";
713         if (key != null)
714         {
715             if (key.isValid())
716             {
717                 if (key.isReadable())
718                     keyString += "r";
719                 if (key.isWritable())
720                     keyString += "w";
721             }
722             else
723             {
724                 keyString += "!";
725             }
726         }
727         else
728         {
729             keyString += "-";
730         }
731         return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
732                 hashCode(),
733                 _socket.getRemoteSocketAddress(),
734                 _socket.getLocalSocketAddress(),
735                 _dispatched,
736                 isOpen(),
737                 isInputShutdown(),
738                 isOutputShutdown(),
739                 _readBlocked,
740                 _writeBlocked,
741                 _writable,
742                 _interestOps,
743                 keyString,
744                 _connection);
745     }
746 
747     /* ------------------------------------------------------------ */
    /**
     * @return the select set supplied at construction
     */
    public SelectSet getSelectSet()
    {
        return _selectSet;
    }
752 
753     /* ------------------------------------------------------------ */
    /**
     * Records the max idle time without setting the SoTimeout on the socket.
     * The value is used by {@link #checkIdleTimestamp(long)}.
     *
     * @param timeMs the max idle time in ms; idle checks are skipped when it is not positive
     * @throws IOException declared by the overridden method; not thrown by this implementation
     * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
     */
    @Override
    public void setMaxIdleTime(int timeMs) throws IOException
    {
        _maxIdleTime=timeMs;
    }
763 
764 }