View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  
30  /* ------------------------------------------------------------ */
31  /**
32   * An Endpoint that can be scheduled by {@link SelectorManager}.
33   */
34  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
35  {
36      private final SelectorManager.SelectSet _selectSet;
37      private final SelectorManager _manager;
38      private final Runnable _handler = new Runnable()
39          {
40              public void run() { handle(); }
41          };
42  
43      private volatile Connection _connection;
44      private boolean _dispatched = false;
45      private boolean _redispatched = false;
46      private volatile boolean _writable = true;
47  
48      private  SelectionKey _key;
49      private int _interestOps;
50      private boolean _readBlocked;
51      private boolean _writeBlocked;
52      private boolean _open;
53      private volatile long _idleTimestamp;
54  
55      /* ------------------------------------------------------------ */
56      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
57          throws IOException
58      {
59          super(channel, maxIdleTime);
60  
61          _manager = selectSet.getManager();
62          _selectSet = selectSet;
63          _dispatched = false;
64          _redispatched = false;
65          _open=true;
66          _key = key;
67  
68          _connection = _manager.newConnection(channel,this);
69  
70          scheduleIdle();
71      }
72  
73      /* ------------------------------------------------------------ */
74      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
75          throws IOException
76      {
77          super(channel);
78  
79          _manager = selectSet.getManager();
80          _selectSet = selectSet;
81          _dispatched = false;
82          _redispatched = false;
83          _open=true;
84          _key = key;
85  
86          _connection = _manager.newConnection(channel,this);
87  
88          scheduleIdle();
89      }
90      /* ------------------------------------------------------------ */
91      public SelectionKey getSelectionKey()
92      {
93          synchronized (this)
94          {
95              return _key;
96          }
97      }
98  
99      /* ------------------------------------------------------------ */
100     public SelectorManager getSelectManager()
101     {
102         return _manager;
103     }
104 
105     /* ------------------------------------------------------------ */
106     public Connection getConnection()
107     {
108         return _connection;
109     }
110 
111     /* ------------------------------------------------------------ */
112     public void setConnection(Connection connection)
113     {
114         Connection old=_connection;
115         _connection=connection;
116         _manager.endPointUpgraded(this,old);
117     }
118 
119     /* ------------------------------------------------------------ */
120     /** Called by selectSet to schedule handling
121      *
122      */
123     public void schedule()
124     {
125         synchronized (this)
126         {
127             // If there is no key, then do nothing
128             if (_key == null || !_key.isValid())
129             {
130                 _readBlocked=false;
131                 _writeBlocked=false;
132                 this.notifyAll();
133                 return;
134             }
135 
136             // If there are threads dispatched reading and writing
137             if (_readBlocked || _writeBlocked)
138             {
139                 // assert _dispatched;
140                 if (_readBlocked && _key.isReadable())
141                     _readBlocked=false;
142                 if (_writeBlocked && _key.isWritable())
143                     _writeBlocked=false;
144 
145                 // wake them up is as good as a dispatched.
146                 this.notifyAll();
147 
148                 // we are not interested in further selecting
149                 if (_dispatched)
150                     _key.interestOps(0);
151                 return;
152             }
153 
154             // Otherwise if we are still dispatched
155             if (!isReadyForDispatch())
156             {
157                 // we are not interested in further selecting
158                 _key.interestOps(0);
159                 return;
160             }
161 
162             // Remove writeable op
163             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
164             {
165                 // Remove writeable op
166                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
167                 _key.interestOps(_interestOps);
168                 _writable = true; // Once writable is in ops, only removed with dispatch.
169             }
170 
171             // Dispatch if we are not already
172             if (!_dispatched)
173             {
174                 dispatch();
175                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
176                 {
177                     _key.interestOps(0);
178                 }
179             }
180         }
181     }
182 
183     /* ------------------------------------------------------------ */
184     public void dispatch()
185     {
186         synchronized(this)
187         {
188             if (_dispatched)
189             {
190                 _redispatched=true;
191             }
192             else
193             {
194                 _dispatched = true;
195                 boolean dispatched = _manager.dispatch(_handler);
196                 if(!dispatched)
197                 {
198                     _dispatched = false;
199                     Log.warn("Dispatched Failed! "+this+" to "+_manager);
200                     updateKey();
201                 }
202             }
203         }
204     }
205 
206     /* ------------------------------------------------------------ */
207     /**
208      * Called when a dispatched thread is no longer handling the endpoint.
209      * The selection key operations are updated.
210      * @return If false is returned, the endpoint has been redispatched and
211      * thread must keep handling the endpoint.
212      */
213     protected boolean undispatch()
214     {
215         synchronized (this)
216         {
217             if (_redispatched)
218             {
219                 _redispatched=false;
220                 return false;
221             }
222             _dispatched = false;
223             updateKey();
224         }
225         return true;
226     }
227 
228     /* ------------------------------------------------------------ */
229     public void scheduleIdle()
230     {
231         _idleTimestamp=System.currentTimeMillis();
232     }
233 
234     /* ------------------------------------------------------------ */
235     public void cancelIdle()
236     {
237         _idleTimestamp=0;
238     }
239 
240     /* ------------------------------------------------------------ */
241     public void checkIdleTimestamp(long now)
242     {
243         if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
244             idleExpired();
245     }
246 
247     /* ------------------------------------------------------------ */
248     protected void idleExpired()
249     {
250         _connection.idleExpired();
251     }
252 
253     /* ------------------------------------------------------------ */
254     /*
255      */
256     @Override
257     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
258     {
259         int l = super.flush(header, buffer, trailer);
260         
261         // If there was something to write and it wasn't written, then we are not writable.
262         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
263         {
264             synchronized (this)
265             {
266                 _writable=false;
267                 if (!_dispatched)
268                     updateKey();
269             }
270         }
271         else
272             _writable=true;
273         return l;
274     }
275 
276     /* ------------------------------------------------------------ */
277     /*
278      */
279     @Override
280     public int flush(Buffer buffer) throws IOException
281     {
282         int l = super.flush(buffer);
283         
284         // If there was something to write and it wasn't written, then we are not writable.
285         if (l==0 && buffer!=null && buffer.hasContent())
286         {
287             synchronized (this)
288             {
289                 _writable=false;
290                 if (!_dispatched)
291                     updateKey();
292             }
293         }
294         else
295             _writable=true;
296         
297         return l;
298     }
299 
300     /* ------------------------------------------------------------ */
301     public boolean isReadyForDispatch()
302     {
303         synchronized (this)
304         {
305             // Ready if not dispatched and not suspended
306             return !(_dispatched || getConnection().isSuspended());
307         }
308     }
309 
310     /* ------------------------------------------------------------ */
311     /*
312      * Allows thread to block waiting for further events.
313      */
314     @Override
315     public boolean blockReadable(long timeoutMs) throws IOException
316     {
317         synchronized (this)
318         {
319             long now=_selectSet.getNow();
320             long end=now+timeoutMs;
321             try
322             {
323                 _readBlocked=true;
324                 while (isOpen() && _readBlocked)
325                 {
326                     try
327                     {
328                         updateKey();
329                         this.wait(timeoutMs>=0?(end-now):10000);
330                     }
331                     catch (InterruptedException e)
332                     {
333                         Log.warn(e);
334                     }
335                     finally
336                     {
337                         now=_selectSet.getNow();
338                     }
339 
340                     if (_readBlocked && timeoutMs>0 && now>=end)
341                         return false;
342                 }
343             }
344             finally
345             {
346                 _readBlocked=false;
347             }
348         }
349         return true;
350     }
351 
352     /* ------------------------------------------------------------ */
353     /*
354      * Allows thread to block waiting for further events.
355      */
356     @Override
357     public boolean blockWritable(long timeoutMs) throws IOException
358     {
359         synchronized (this)
360         {
361             if (!isOpen() || isOutputShutdown())
362                 throw new EofException();
363             
364             long now=_selectSet.getNow();
365             long end=now+timeoutMs;
366             try
367             {
368                 _writeBlocked=true;
369                 while (isOpen() && _writeBlocked && !isOutputShutdown())
370                 {
371                     try
372                     {
373                         updateKey();
374                         this.wait(timeoutMs>=0?(end-now):10000);
375                     }
376                     catch (InterruptedException e)
377                     {
378                         Log.warn(e);
379                     }
380                     finally
381                     {
382                         now=_selectSet.getNow();
383                     }
384                     if (_writeBlocked && timeoutMs>0 && now>=end)
385                         return false;
386                 }
387             }
388             catch(Throwable e)
389             {
390                 // TODO remove this if it finds nothing
391                 Log.warn(e);
392                 if (e instanceof RuntimeException)
393                     throw (RuntimeException)e;
394                 if (e instanceof Error)
395                     throw (Error)e;
396                 throw new RuntimeException(e);
397             }
398             finally
399             {
400                 _writeBlocked=false;
401                 if (_idleTimestamp!=-1)
402                     scheduleIdle();
403             }
404         }
405         return true;
406     }
407     
408     /* ------------------------------------------------------------ */
409     public void scheduleWrite()
410     {
411         _writable=false;
412         updateKey();
413     }
414 
415     /* ------------------------------------------------------------ */
416     /**
417      * Updates selection key. Adds operations types to the selection key as needed. No operations
418      * are removed as this is only done during dispatch. This method records the new key and
419      * schedules a call to doUpdateKey to do the keyChange
420      */
421     private void updateKey()
422     {
423         synchronized (this)
424         {
425             int ops=-1;
426             if (getChannel().isOpen())
427             {
428                 _interestOps =
429                     ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked))  ? SelectionKey.OP_READ  : 0)
430                 |   ((!_socket.isOutputShutdown()&& (!_writable   || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
431                 try
432                 {
433                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
434                 }
435                 catch(Exception e)
436                 {
437                     _key=null;
438                     Log.ignore(e);
439                 }
440             }
441 
442             if(_interestOps == ops && getChannel().isOpen())
443                 return;
444         }
445         _selectSet.addChange(this);
446         _selectSet.wakeup();
447     }
448 
449     /* ------------------------------------------------------------ */
450     /**
451      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
452      */
453     void doUpdateKey()
454     {
455         synchronized (this)
456         {
457             if (getChannel().isOpen())
458             {
459                 if (_interestOps>0)
460                 {
461                     if (_key==null || !_key.isValid())
462                     {
463                         SelectableChannel sc = (SelectableChannel)getChannel();
464                         if (sc.isRegistered())
465                         {
466                             updateKey();
467                         }
468                         else
469                         {
470                             try
471                             {
472                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
473                             }
474                             catch (Exception e)
475                             {
476                                 Log.ignore(e);
477                                 if (_key!=null && _key.isValid())
478                                 {
479                                     _key.cancel();
480                                 }
481                                 cancelIdle();
482 
483                                 if (_open)
484                                 {
485                                     _selectSet.destroyEndPoint(this);
486                                 }
487                                 _open=false;
488                                 _key = null;
489                             }
490                         }
491                     }
492                     else
493                     {
494                         _key.interestOps(_interestOps);
495                     }
496                 }
497                 else
498                 {
499                     if (_key!=null && _key.isValid())
500                         _key.interestOps(0);
501                     else
502                         _key=null;
503                 }
504             }
505             else
506             {
507                 if (_key!=null && _key.isValid())
508                     _key.cancel();
509 
510                 cancelIdle();
511                 if (_open)
512                 {
513                     _selectSet.destroyEndPoint(this);
514                 }
515                 _open=false;
516                 _key = null;
517             }
518         }
519     }
520 
521     /* ------------------------------------------------------------ */
522     /*
523      */
524     protected void handle()
525     {
526         boolean dispatched=true;
527         try
528         {
529             while(dispatched)
530             {
531                 try
532                 {
533                     while(true)
534                     {
535                         final Connection next = _connection.handle();
536                         if (next!=_connection)
537                         {
538                             Log.debug("{} replaced {}",next,_connection);
539                             _connection=next;
540                             continue;
541                         }
542                         break;
543                     }
544                 }
545                 catch (ClosedChannelException e)
546                 {
547                     Log.ignore(e);
548                 }
549                 catch (EofException e)
550                 {
551                     Log.debug("EOF", e);
552                     try{close();}
553                     catch(IOException e2){Log.ignore(e2);}
554                 }
555                 catch (IOException e)
556                 {
557                     Log.warn(e.toString());
558                     Log.debug(e);
559                     try{close();}
560                     catch(IOException e2){Log.ignore(e2);}
561                 }
562                 catch (Throwable e)
563                 {
564                     Log.warn("handle failed", e);
565                     try{close();}
566                     catch(IOException e2){Log.ignore(e2);}
567                 }
568                 dispatched=!undispatch();
569             }
570         }
571         finally
572         {
573             if (dispatched)
574             {
575                 dispatched=!undispatch();
576                 while (dispatched)
577                 {
578                     Log.warn("SCEP.run() finally DISPATCHED");
579                     dispatched=!undispatch();
580                 }
581             }
582         }
583     }
584 
585     /* ------------------------------------------------------------ */
586     /*
587      * @see org.eclipse.io.nio.ChannelEndPoint#close()
588      */
589     @Override
590     public void close() throws IOException
591     {
592         try
593         {
594             super.close();
595         }
596         catch (IOException e)
597         {
598             Log.ignore(e);
599         }
600         finally
601         {
602             updateKey();
603         }
604     }
605 
606     /* ------------------------------------------------------------ */
607     @Override
608     public String toString()
609     {
610         synchronized(this)
611         {
612             return "SCEP@" + hashCode() + _channel+            
613             "[d=" + _dispatched + ",io=" + _interestOps+
614             ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
615         }
616     }
617 
618     /* ------------------------------------------------------------ */
619     public SelectSet getSelectSet()
620     {
621         return _selectSet;
622     }
623 
624     /* ------------------------------------------------------------ */
625     /**
626      * Don't set the SoTimeout
627      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
628      */
629     @Override
630     public void setMaxIdleTime(int timeMs) throws IOException
631     {
632         _maxIdleTime=timeMs;
633     }
634 
635 }