View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  
30  /* ------------------------------------------------------------ */
31  /**
32   * An Endpoint that can be scheduled by {@link SelectorManager}.
33   */
34  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
35  {
36      private final SelectorManager.SelectSet _selectSet;
37      private final SelectorManager _manager;
38      private final Runnable _handler = new Runnable()
39          {
40              public void run() { handle(); }
41          };
42  
43      private volatile Connection _connection;
44      private boolean _dispatched = false;
45      private boolean _redispatched = false;
46      private volatile boolean _writable = true;
47  
48      private  SelectionKey _key;
49      private int _interestOps;
50      private boolean _readBlocked;
51      private boolean _writeBlocked;
52      private boolean _open;
53      private volatile long _idleTimestamp;
54  
55      /* ------------------------------------------------------------ */
56      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
57          throws IOException
58      {
59          super(channel, maxIdleTime);
60  
61          _manager = selectSet.getManager();
62          _selectSet = selectSet;
63          _dispatched = false;
64          _redispatched = false;
65          _open=true;
66          _key = key;
67  
68          _connection = _manager.newConnection(channel,this);
69  
70          scheduleIdle();
71      }
72  
73      /* ------------------------------------------------------------ */
74      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
75          throws IOException
76      {
77          super(channel);
78  
79          _manager = selectSet.getManager();
80          _selectSet = selectSet;
81          _dispatched = false;
82          _redispatched = false;
83          _open=true;
84          _key = key;
85  
86          _connection = _manager.newConnection(channel,this);
87  
88          scheduleIdle();
89      }
90      /* ------------------------------------------------------------ */
91      public SelectionKey getSelectionKey()
92      {
93          synchronized (this)
94          {
95              return _key;
96          }
97      }
98  
99      /* ------------------------------------------------------------ */
100     public SelectorManager getSelectManager()
101     {
102         return _manager;
103     }
104 
105     /* ------------------------------------------------------------ */
106     public Connection getConnection()
107     {
108         return _connection;
109     }
110 
111     /* ------------------------------------------------------------ */
112     public void setConnection(Connection connection)
113     {
114         Connection old=_connection;
115         _connection=connection;
116         _manager.endPointUpgraded(this,old);
117     }
118 
119     /* ------------------------------------------------------------ */
120     /** Called by selectSet to schedule handling
121      *
122      */
123     public void schedule()
124     {
125         synchronized (this)
126         {
127             // If there is no key, then do nothing
128             if (_key == null || !_key.isValid())
129             {
130                 _readBlocked=false;
131                 _writeBlocked=false;
132                 this.notifyAll();
133                 return;
134             }
135 
136             // If there are threads dispatched reading and writing
137             if (_readBlocked || _writeBlocked)
138             {
139                 // assert _dispatched;
140                 if (_readBlocked && _key.isReadable())
141                     _readBlocked=false;
142                 if (_writeBlocked && _key.isWritable())
143                     _writeBlocked=false;
144 
145                 // wake them up is as good as a dispatched.
146                 this.notifyAll();
147 
148                 // we are not interested in further selecting
149                 if (_dispatched)
150                     _key.interestOps(0);
151                 return;
152             }
153 
154             // Otherwise if we are still dispatched
155             if (!isReadyForDispatch())
156             {
157                 // we are not interested in further selecting
158                 _key.interestOps(0);
159                 return;
160             }
161 
162             // Remove writeable op
163             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
164             {
165                 // Remove writeable op
166                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
167                 _key.interestOps(_interestOps);
168                 _writable = true; // Once writable is in ops, only removed with dispatch.
169             }
170 
171             // Dispatch if we are not already
172             if (!_dispatched)
173             {
174                 dispatch();
175                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
176                 {
177                     _key.interestOps(0);
178                 }
179             }
180         }
181     }
182 
183     /* ------------------------------------------------------------ */
184     public void dispatch()
185     {
186         synchronized(this)
187         {
188             if (_dispatched)
189             {
190                 _redispatched=true;
191             }
192             else
193             {
194                 _dispatched = true;
195                 boolean dispatched = _manager.dispatch(_handler);
196                 if(!dispatched)
197                 {
198                     _dispatched = false;
199                     Log.warn("Dispatched Failed! "+this+" to "+_manager);
200                     updateKey();
201                 }
202             }
203         }
204     }
205 
206     /* ------------------------------------------------------------ */
207     /**
208      * Called when a dispatched thread is no longer handling the endpoint.
209      * The selection key operations are updated.
210      * @return If false is returned, the endpoint has been redispatched and
211      * thread must keep handling the endpoint.
212      */
213     protected boolean undispatch()
214     {
215         synchronized (this)
216         {
217             if (_redispatched)
218             {
219                 _redispatched=false;
220                 return false;
221             }
222             _dispatched = false;
223             updateKey();
224         }
225         return true;
226     }
227 
228     /* ------------------------------------------------------------ */
229     public void scheduleIdle()
230     {
231         _idleTimestamp=System.currentTimeMillis();
232     }
233 
234     /* ------------------------------------------------------------ */
235     public void cancelIdle()
236     {
237         _idleTimestamp=0;
238     }
239 
240     /* ------------------------------------------------------------ */
241     public void checkIdleTimestamp(long now)
242     {
243         if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
244             idleExpired();
245     }
246 
247     /* ------------------------------------------------------------ */
248     protected void idleExpired()
249     {
250         _connection.idleExpired();
251     }
252 
253     /* ------------------------------------------------------------ */
254     /*
255      */
256     @Override
257     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
258     {
259         int l = super.flush(header, buffer, trailer);
260         
261         // If there was something to write and it wasn't written, then we are not writable.
262         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
263         {
264             synchronized (this)
265             {
266                 _writable=false;
267                 if (!_dispatched)
268                     updateKey();
269             }
270         }
271         else
272             _writable=true;
273         return l;
274     }
275 
276     /* ------------------------------------------------------------ */
277     /*
278      */
279     @Override
280     public int flush(Buffer buffer) throws IOException
281     {
282         int l = super.flush(buffer);
283         
284         // If there was something to write and it wasn't written, then we are not writable.
285         if (l==0 && buffer!=null && buffer.hasContent())
286         {
287             synchronized (this)
288             {
289                 _writable=false;
290                 if (!_dispatched)
291                     updateKey();
292             }
293         }
294         else
295             _writable=true;
296         
297         return l;
298     }
299 
300     /* ------------------------------------------------------------ */
301     public boolean isReadyForDispatch()
302     {
303         synchronized (this)
304         {
305             // Ready if not dispatched and not suspended
306             return !(_dispatched || getConnection().isSuspended());
307         }
308     }
309 
310     /* ------------------------------------------------------------ */
311     /*
312      * Allows thread to block waiting for further events.
313      */
314     @Override
315     public boolean blockReadable(long timeoutMs) throws IOException
316     {
317         synchronized (this)
318         {
319             long start=_selectSet.getNow();
320             try
321             {
322                 _readBlocked=true;
323                 while (isOpen() && _readBlocked)
324                 {
325                     try
326                     {
327                         updateKey();
328                         this.wait(timeoutMs);
329                     }
330                     catch (InterruptedException e)
331                     {
332                         Log.warn(e);
333                     }
334 
335                     timeoutMs -= _selectSet.getNow()-start;
336                     if (_readBlocked && timeoutMs<=0)
337                         return false;
338                 }
339             }
340             finally
341             {
342                 _readBlocked=false;
343             }
344         }
345         return true;
346     }
347 
348     /* ------------------------------------------------------------ */
349     /*
350      * Allows thread to block waiting for further events.
351      */
352     @Override
353     public boolean blockWritable(long timeoutMs) throws IOException
354     {
355         synchronized (this)
356         {
357             long start=_selectSet.getNow();
358             try
359             {
360                 _writeBlocked=true;
361                 while (isOpen() && _writeBlocked)
362                 {
363                     try
364                     {
365                         updateKey();
366                         this.wait(timeoutMs);
367                     }
368                     catch (InterruptedException e)
369                     {
370                         Log.warn(e);
371                     }
372 
373                     timeoutMs -= _selectSet.getNow()-start;
374                     if (_writeBlocked && timeoutMs<=0)
375                         return false;
376                 }
377             }
378             finally
379             {
380                 _writeBlocked=false;
381                 if (_idleTimestamp!=-1)
382                     scheduleIdle();
383             }
384         }
385         return true;
386     }
387     
388     /* ------------------------------------------------------------ */
389     public void scheduleWrite()
390     {
391         _writable=false;
392         updateKey();
393     }
394 
395     /* ------------------------------------------------------------ */
396     /**
397      * Updates selection key. Adds operations types to the selection key as needed. No operations
398      * are removed as this is only done during dispatch. This method records the new key and
399      * schedules a call to doUpdateKey to do the keyChange
400      */
401     private void updateKey()
402     {
403         synchronized (this)
404         {
405             int ops=-1;
406             if (getChannel().isOpen())
407             {
408                 _interestOps =
409                     ((!_socket.isInputShutdown() && (!_dispatched || _readBlocked))  ? SelectionKey.OP_READ  : 0)
410                 |   ((!_socket.isOutputShutdown()&& (!_writable   || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
411                 try
412                 {
413                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
414                 }
415                 catch(Exception e)
416                 {
417                     _key=null;
418                     Log.ignore(e);
419                 }
420             }
421 
422             if(_interestOps == ops && getChannel().isOpen())
423                 return;
424         }
425         _selectSet.addChange(this);
426         _selectSet.wakeup();
427     }
428 
429     /* ------------------------------------------------------------ */
430     /**
431      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
432      */
433     void doUpdateKey()
434     {
435         synchronized (this)
436         {
437             if (getChannel().isOpen())
438             {
439                 if (_interestOps>0)
440                 {
441                     if (_key==null || !_key.isValid())
442                     {
443                         SelectableChannel sc = (SelectableChannel)getChannel();
444                         if (sc.isRegistered())
445                         {
446                             updateKey();
447                         }
448                         else
449                         {
450                             try
451                             {
452                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
453                             }
454                             catch (Exception e)
455                             {
456                                 Log.ignore(e);
457                                 if (_key!=null && _key.isValid())
458                                 {
459                                     _key.cancel();
460                                 }
461                                 cancelIdle();
462 
463                                 if (_open)
464                                 {
465                                     _selectSet.destroyEndPoint(this);
466                                 }
467                                 _open=false;
468                                 _key = null;
469                             }
470                         }
471                     }
472                     else
473                     {
474                         _key.interestOps(_interestOps);
475                     }
476                 }
477                 else
478                 {
479                     if (_key!=null && _key.isValid())
480                         _key.interestOps(0);
481                     else
482                         _key=null;
483                 }
484             }
485             else
486             {
487                 if (_key!=null && _key.isValid())
488                     _key.cancel();
489 
490                 cancelIdle();
491                 if (_open)
492                 {
493                     _selectSet.destroyEndPoint(this);
494                 }
495                 _open=false;
496                 _key = null;
497             }
498         }
499     }
500 
501     /* ------------------------------------------------------------ */
502     /*
503      */
504     protected void handle()
505     {
506         boolean dispatched=true;
507         try
508         {
509             while(dispatched)
510             {
511                 try
512                 {
513                     while(true)
514                     {
515                         final Connection next = _connection.handle();
516                         if (next!=_connection)
517                         {
518                             Log.debug("{} replaced {}",next,_connection);
519                             _connection=next;
520                             continue;
521                         }
522                         break;
523                     }
524                 }
525                 catch (ClosedChannelException e)
526                 {
527                     Log.ignore(e);
528                 }
529                 catch (EofException e)
530                 {
531                     Log.debug("EOF", e);
532                     try{close();}
533                     catch(IOException e2){Log.ignore(e2);}
534                 }
535                 catch (IOException e)
536                 {
537                     Log.warn(e.toString());
538                     Log.debug(e);
539                     try{close();}
540                     catch(IOException e2){Log.ignore(e2);}
541                 }
542                 catch (Throwable e)
543                 {
544                     Log.warn("handle failed", e);
545                     try{close();}
546                     catch(IOException e2){Log.ignore(e2);}
547                 }
548                 dispatched=!undispatch();
549             }
550         }
551         finally
552         {
553             if (dispatched)
554             {
555                 dispatched=!undispatch();
556                 while (dispatched)
557                 {
558                     Log.warn("SCEP.run() finally DISPATCHED");
559                     dispatched=!undispatch();
560                 }
561             }
562         }
563     }
564 
565     /* ------------------------------------------------------------ */
566     /*
567      * @see org.eclipse.io.nio.ChannelEndPoint#close()
568      */
569     @Override
570     public void close() throws IOException
571     {
572         try
573         {
574             super.close();
575         }
576         catch (IOException e)
577         {
578             Log.ignore(e);
579         }
580         finally
581         {
582             updateKey();
583         }
584     }
585 
586     /* ------------------------------------------------------------ */
587     @Override
588     public String toString()
589     {
590         synchronized(this)
591         {
592             return "SCEP@" + hashCode() + _channel+            
593             "[d=" + _dispatched + ",io=" + _interestOps+
594             ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
595         }
596     }
597 
598     /* ------------------------------------------------------------ */
599     public SelectSet getSelectSet()
600     {
601         return _selectSet;
602     }
603 
604     /* ------------------------------------------------------------ */
605     /**
606      * Don't set the SoTimeout
607      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
608      */
609     @Override
610     public void setMaxIdleTime(int timeMs) throws IOException
611     {
612         _maxIdleTime=timeMs;
613     }
614 
615 }