View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   */
35  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
36  {
37      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
38      
39      private final SelectorManager.SelectSet _selectSet;
40      private final SelectorManager _manager;
41      private  SelectionKey _key;
42      private final Runnable _handler = new Runnable()
43          {
44              public void run() { handle(); }
45          };
46  
47      /** The desired value for {@link SelectionKey#interestOps()} */
48      private int _interestOps;
49          
50      /**
51       * The connection instance is the handler for any IO activity on the endpoint.
52       * There is a different type of connection for HTTP, AJP, WebSocket and 
53       * ProxyConnect.   The connection may change for an SCEP as it is upgraded 
54       * from HTTP to proxy connect or websocket.
55       */
56      private volatile Connection _connection;
57      
58      /** true if a thread has been dispatched to handle this endpoint */
59      private boolean _dispatched = false;
60      
61      /** true if a non IO dispatch (eg async resume) is outstanding */
62      private boolean _redispatched = false;
63      
64      /** true if the last write operation succeed and wrote all offered bytes */
65      private volatile boolean _writable = true;
66  
67      
68      /** True if a thread has is blocked in {@link #blockReadable(long)} */
69      private boolean _readBlocked;
70  
71      /** True if a thread has is blocked in {@link #blockWritable(long)} */
72      private boolean _writeBlocked;
73      
74      /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
75      private boolean _open;
76      
77      private volatile long _idleTimestamp;
78      
79      /* ------------------------------------------------------------ */
80      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
81          throws IOException
82      {
83          super(channel, maxIdleTime);
84  
85          _manager = selectSet.getManager();
86          _selectSet = selectSet;
87          _dispatched = false;
88          _redispatched = false;
89          _open=true;
90          _key = key;
91  
92          _connection = _manager.newConnection(channel,this);
93  
94          scheduleIdle();
95      }
96  
97      /* ------------------------------------------------------------ */
98      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
99          throws IOException
100     {
101         super(channel);
102 
103         _manager = selectSet.getManager();
104         _selectSet = selectSet;
105         _dispatched = false;
106         _redispatched = false;
107         _open=true;
108         _key = key;
109 
110         _connection = _manager.newConnection(channel,this);
111 
112         scheduleIdle();
113     }
114     
115     /* ------------------------------------------------------------ */
116     public SelectionKey getSelectionKey()
117     {
118         synchronized (this)
119         {
120             return _key;
121         }
122     }
123 
124     /* ------------------------------------------------------------ */
125     public SelectorManager getSelectManager()
126     {
127         return _manager;
128     }
129 
130     /* ------------------------------------------------------------ */
131     public Connection getConnection()
132     {
133         return _connection;
134     }
135 
136     /* ------------------------------------------------------------ */
137     public void setConnection(Connection connection)
138     {
139         Connection old=_connection;
140         _connection=connection;
141         _manager.endPointUpgraded(this,old);
142     }
143 
144     /* ------------------------------------------------------------ */
145     public long getIdleTimestamp()
146     {
147         return _idleTimestamp;
148     }
149     
150     /* ------------------------------------------------------------ */
151     /** Called by selectSet to schedule handling
152      *
153      */
154     public void schedule()
155     {
156         synchronized (this)
157         {
158             // If there is no key, then do nothing
159             if (_key == null || !_key.isValid())
160             {
161                 _readBlocked=false;
162                 _writeBlocked=false;
163                 this.notifyAll();
164                 return;
165             }
166 
167             // If there are threads dispatched reading and writing
168             if (_readBlocked || _writeBlocked)
169             {
170                 // assert _dispatched;
171                 if (_readBlocked && _key.isReadable())
172                     _readBlocked=false;
173                 if (_writeBlocked && _key.isWritable())
174                     _writeBlocked=false;
175 
176                 // wake them up is as good as a dispatched.
177                 this.notifyAll();
178 
179                 // we are not interested in further selecting
180                 if (_dispatched)
181                     _key.interestOps(0);
182                 return;
183             }
184 
185             // Otherwise if we are still dispatched
186             if (!isReadyForDispatch())
187             {
188                 // we are not interested in further selecting
189                 _key.interestOps(0);
190                 return;
191             }
192 
193             // Remove writeable op
194             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
195             {
196                 // Remove writeable op
197                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
198                 _key.interestOps(_interestOps);
199                 _writable = true; // Once writable is in ops, only removed with dispatch.
200             }
201 
202             // Dispatch if we are not already
203             if (!_dispatched)
204             {
205                 dispatch();
206                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
207                 {
208                     _key.interestOps(0);
209                 }
210             }
211         }
212     }
213 
214     /* ------------------------------------------------------------ */
215     public void dispatch()
216     {
217         synchronized(this)
218         {
219             if (_dispatched)
220             {
221                 _redispatched=true;
222             }
223             else
224             {
225                 _dispatched = true;
226                 boolean dispatched = _manager.dispatch(_handler);
227                 if(!dispatched)
228                 {
229                     _dispatched = false;
230                     LOG.warn("Dispatched Failed! "+this+" to "+_manager);
231                     updateKey();
232                 }
233             }
234         }
235     }
236 
237     /* ------------------------------------------------------------ */
238     /**
239      * Called when a dispatched thread is no longer handling the endpoint.
240      * The selection key operations are updated.
241      * @return If false is returned, the endpoint has been redispatched and
242      * thread must keep handling the endpoint.
243      */
244     protected boolean undispatch()
245     {
246         synchronized (this)
247         {
248             if (_redispatched)
249             {
250                 _redispatched=false;
251                 return false;
252             }
253             _dispatched = false;
254             updateKey();
255         }
256         return true;
257     }
258 
259     /* ------------------------------------------------------------ */
260     public void scheduleIdle()
261     {
262         _idleTimestamp=System.currentTimeMillis();
263     }
264 
265     /* ------------------------------------------------------------ */
266     public void cancelIdle()
267     {
268         _idleTimestamp=0;
269     }
270 
271     /* ------------------------------------------------------------ */
272     public void checkIdleTimestamp(long now)
273     {
274         long idleTimestamp=_idleTimestamp;
275         if (!getChannel().isOpen() || idleTimestamp!=0 && _maxIdleTime>0 && now>(idleTimestamp+_maxIdleTime))
276             idleExpired();
277     }
278 
279     /* ------------------------------------------------------------ */
280     protected void idleExpired()
281     {
282         _connection.idleExpired();
283     }
284 
285     /* ------------------------------------------------------------ */
286     /**
287      * @return True if the endpoint has produced/consumed bytes itself (non application data).
288      */
289     public boolean isProgressing()
290     {
291         return false;
292     }
293     
294     /* ------------------------------------------------------------ */
295     /*
296      */
297     @Override
298     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
299     {
300         int l = super.flush(header, buffer, trailer);
301         
302         // If there was something to write and it wasn't written, then we are not writable.
303         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
304         {
305             synchronized (this)
306             {
307                 _writable=false;
308                 if (!_dispatched)
309                     updateKey();
310             }
311         }
312         else
313             _writable=true;
314         return l;
315     }
316 
317     /* ------------------------------------------------------------ */
318     /*
319      */
320     @Override
321     public int flush(Buffer buffer) throws IOException
322     {
323         int l = super.flush(buffer);
324         
325         // If there was something to write and it wasn't written, then we are not writable.
326         if (l==0 && buffer!=null && buffer.hasContent())
327         {
328             synchronized (this)
329             {
330                 _writable=false;
331                 if (!_dispatched)
332                     updateKey();
333             }
334         }
335         else
336             _writable=true;
337         
338         return l;
339     }
340 
341     /* ------------------------------------------------------------ */
342     public boolean isReadyForDispatch()
343     {
344         synchronized (this)
345         {
346             // Ready if not dispatched and not suspended
347             return !(_dispatched || getConnection().isSuspended());
348         }
349     }
350 
351     /* ------------------------------------------------------------ */
352     /*
353      * Allows thread to block waiting for further events.
354      */
355     @Override
356     public boolean blockReadable(long timeoutMs) throws IOException
357     {
358         synchronized (this)
359         {
360             long now=_selectSet.getNow();
361             long end=now+timeoutMs;
362             try
363             {
364                 _readBlocked=true;
365                 while (isOpen() && _readBlocked)
366                 {
367                     try
368                     {
369                         updateKey();
370                         this.wait(timeoutMs>=0?(end-now):10000);
371                     }
372                     catch (InterruptedException e)
373                     {
374                         LOG.warn(e);
375                     }
376                     finally
377                     {
378                         now=_selectSet.getNow();
379                     }
380 
381                     if (_readBlocked && timeoutMs>0 && now>=end)
382                         return false;
383                 }
384             }
385             finally
386             {
387                 _readBlocked=false;
388             }
389         }
390         return true;
391     }
392 
393     /* ------------------------------------------------------------ */
394     /*
395      * Allows thread to block waiting for further events.
396      */
397     @Override
398     public boolean blockWritable(long timeoutMs) throws IOException
399     {
400         synchronized (this)
401         {
402             if (!isOpen() || isOutputShutdown())
403                 throw new EofException();
404             
405             long now=_selectSet.getNow();
406             long end=now+timeoutMs;
407             try
408             {
409                 _writeBlocked=true;
410                 while (isOpen() && _writeBlocked && !isOutputShutdown())
411                 {
412                     try
413                     {
414                         updateKey();
415                         this.wait(timeoutMs>=0?(end-now):10000);
416                     }
417                     catch (InterruptedException e)
418                     {
419                         LOG.warn(e);
420                     }
421                     finally
422                     {
423                         now=_selectSet.getNow();
424                     }
425                     if (_writeBlocked && timeoutMs>0 && now>=end)
426                         return false;
427                 }
428             }
429             catch(Throwable e)
430             {
431                 // TODO remove this if it finds nothing
432                 LOG.warn(e);
433                 if (e instanceof RuntimeException)
434                     throw (RuntimeException)e;
435                 if (e instanceof Error)
436                     throw (Error)e;
437                 throw new RuntimeException(e);
438             }
439             finally
440             {
441                 _writeBlocked=false;
442                 if (_idleTimestamp!=-1)
443                     scheduleIdle();
444             }
445         }
446         return true;
447     }
448 
449     /* ------------------------------------------------------------ */
450     /* short cut for busyselectChannelServerTest */
451     public void clearWritable()
452     {
453         _writable=false;
454     }
455     
456     /* ------------------------------------------------------------ */
457     public void scheduleWrite()
458     {
459         if (_writable==true)
460             LOG.debug("Required scheduleWrite {}",this);
461         
462         _writable=false;
463         updateKey();
464     }
465 
466     /* ------------------------------------------------------------ */
467     /**
468      * Updates selection key. Adds operations types to the selection key as needed. No operations
469      * are removed as this is only done during dispatch. This method records the new key and
470      * schedules a call to doUpdateKey to do the keyChange
471      */
472     private void updateKey()
473     {
474         synchronized (this)
475         {
476             int ops=-1;
477             if (getChannel().isOpen())
478             {
479                 _interestOps =
480                     ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked))  ? SelectionKey.OP_READ  : 0)
481                 |   ((!_socket.isOutputShutdown()&& (!_writable   || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
482                 try
483                 {
484                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
485                 }
486                 catch(Exception e)
487                 {
488                     _key=null;
489                     LOG.ignore(e);
490                 }
491             }
492 
493             if(_interestOps == ops && getChannel().isOpen())
494                 return;
495         }
496         _selectSet.addChange(this);
497         _selectSet.wakeup();
498     }
499 
500     /* ------------------------------------------------------------ */
501     /**
502      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
503      */
504     void doUpdateKey()
505     {
506         synchronized (this)
507         {
508             if (getChannel().isOpen())
509             {
510                 if (_interestOps>0)
511                 {
512                     if (_key==null || !_key.isValid())
513                     {
514                         SelectableChannel sc = (SelectableChannel)getChannel();
515                         if (sc.isRegistered())
516                         {
517                             updateKey();
518                         }
519                         else
520                         {
521                             try
522                             {
523                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
524                             }
525                             catch (Exception e)
526                             {
527                                 LOG.ignore(e);
528                                 if (_key!=null && _key.isValid())
529                                 {
530                                     _key.cancel();
531                                 }
532                                 cancelIdle();
533 
534                                 if (_open)
535                                 {
536                                     _selectSet.destroyEndPoint(this);
537                                 }
538                                 _open=false;
539                                 _key = null;
540                             }
541                         }
542                     }
543                     else
544                     {
545                         _key.interestOps(_interestOps);
546                     }
547                 }
548                 else
549                 {
550                     if (_key!=null && _key.isValid())
551                         _key.interestOps(0);
552                     else
553                         _key=null;
554                 }
555             }
556             else
557             {
558                 if (_key!=null && _key.isValid())
559                     _key.cancel();
560 
561                 cancelIdle();
562                 if (_open)
563                 {
564                     _open=false;
565                     _selectSet.destroyEndPoint(this);
566                 }
567                 _key = null;
568             }
569         }
570     }
571 
572     /* ------------------------------------------------------------ */
573     /*
574      */
575     protected void handle()
576     {
577         boolean dispatched=true;
578         try
579         {
580             while(dispatched)
581             {
582                 try
583                 {
584                     while(true)
585                     {
586                         final Connection next = _connection.handle();
587                         if (next!=_connection)
588                         {
589                             LOG.debug("{} replaced {}",next,_connection);
590                             _connection=next;
591                             continue;
592                         }
593                         break;
594                     }
595                 }
596                 catch (ClosedChannelException e)
597                 {
598                     LOG.ignore(e);
599                 }
600                 catch (EofException e)
601                 {
602                     LOG.debug("EOF", e);
603                     try{getChannel().close();}
604                     catch(IOException e2){LOG.ignore(e2);}
605                 }
606                 catch (IOException e)
607                 {
608                     LOG.warn(e.toString());
609                     LOG.debug(e);
610                     try{getChannel().close();}
611                     catch(IOException e2){LOG.ignore(e2);}
612                 }
613                 catch (Throwable e)
614                 {
615                     LOG.warn("handle failed", e);
616                     try{getChannel().close();}
617                     catch(IOException e2){LOG.ignore(e2);}
618                 }
619                 dispatched=!undispatch();
620             }
621         }
622         finally
623         {
624             if (dispatched)
625             {
626                 dispatched=!undispatch();
627                 while (dispatched)
628                 {
629                     LOG.warn("SCEP.run() finally DISPATCHED");
630                     dispatched=!undispatch();
631                 }
632             }
633         }
634     }
635 
636     /* ------------------------------------------------------------ */
637     /*
638      * @see org.eclipse.io.nio.ChannelEndPoint#close()
639      */
640     @Override
641     public void close() throws IOException
642     {
643         try
644         {
645             super.close();
646         }
647         catch (IOException e)
648         {
649             LOG.ignore(e);
650         }
651         finally
652         {
653             updateKey();
654         }
655     }
656 
657     /* ------------------------------------------------------------ */
658     @Override
659     public String toString()
660     {
661         synchronized(this)
662         {
663             return "SCEP@" + hashCode() + _channel+            
664             "[o="+isOpen()+" d=" + _dispatched + ",io=" + _interestOps+
665             ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
666         }
667     }
668 
669     /* ------------------------------------------------------------ */
670     public SelectSet getSelectSet()
671     {
672         return _selectSet;
673     }
674 
675     /* ------------------------------------------------------------ */
676     /**
677      * Don't set the SoTimeout
678      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
679      */
680     @Override
681     public void setMaxIdleTime(int timeMs) throws IOException
682     {
683         _maxIdleTime=timeMs;
684     }
685 
686 }