View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  import java.util.concurrent.atomic.AtomicInteger;
22  
23  import org.eclipse.jetty.io.AsyncEndPoint;
24  import org.eclipse.jetty.io.Buffer;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.thread.Timeout;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   * 
35   * 
36   *
37   */
38  public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint
39  {
40      private final SelectorManager.SelectSet _selectSet;
41      private final Connection _connection;
42      private final SelectorManager _manager;
43      private boolean _dispatched = false;
44      private boolean _redispatched = false;
45      private volatile boolean _writable = true; 
46      
47      private  SelectionKey _key;
48      private int _interestOps;
49      private boolean _readBlocked;
50      private boolean _writeBlocked;
51      private boolean _open;
52      private final Timeout.Task _idleTask = new IdleTask();
53  
54      /* ------------------------------------------------------------ */
55      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
56      {
57          super(channel);
58  
59          _manager = selectSet.getManager();
60          _selectSet = selectSet;
61          _connection = _manager.newConnection(channel,this);
62          _dispatched = false;
63          _redispatched = false;
64          _open=true;
65          _manager.endPointOpened(this);
66          
67          _key = key;
68          scheduleIdle();
69      }
70      
71      /* ------------------------------------------------------------ */
72      public Connection getConnection()
73      {
74          return _connection;
75      }
76      
77      /* ------------------------------------------------------------ */
78      public SelectorManager getSelectManager()
79      {
80          return _manager;
81      }
82  
83      /* ------------------------------------------------------------ */
84      /** Called by selectSet to schedule handling
85       * 
86       */
87      public void schedule() 
88      {
89          // If threads are blocked on this
90          synchronized (this)
91          {
92              // If there is no key, then do nothing
93              if (_key == null || !_key.isValid())
94              {
95                  _readBlocked=false;
96                  _writeBlocked=false;
97                  this.notifyAll();
98                  return;
99              }
100             
101             // If there are threads dispatched reading and writing
102             if (_readBlocked || _writeBlocked)
103             {
104                 // assert _dispatched;
105                 if (_readBlocked && _key.isReadable())
106                     _readBlocked=false;
107                 if (_writeBlocked && _key.isWritable())
108                     _writeBlocked=false;
109 
110                 // wake them up is as good as a dispatched.
111                 this.notifyAll();
112                 
113                 // we are not interested in further selecting
114                 _key.interestOps(0);
115                 return;
116             }
117 
118             // Otherwise if we are still dispatched
119             if (!isReadyForDispatch())
120             {
121                 // we are not interested in further selecting
122                 _key.interestOps(0);
123                 return;
124             }
125 
126 
127             // Remove writeable op
128             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
129             {
130                 // Remove writeable op
131                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
132                 _key.interestOps(_interestOps);
133                 _writable = true; // Once writable is in ops, only removed with dispatch.
134             }
135 
136             if (_dispatched)
137                 _key.interestOps(0);
138             else
139                 dispatch();
140         }
141     }
142     
143     /* ------------------------------------------------------------ */
144     public void dispatch() 
145     {
146         synchronized(this)
147         {
148             if (_dispatched)
149                 _redispatched=true;
150             else
151             {
152                 _dispatched = _manager.dispatch(this);
153                 if(!_dispatched)
154                 {
155                     Log.warn("Dispatched Failed!");
156                     updateKey();
157                 }
158             }
159         }
160     }
161 
162     /* ------------------------------------------------------------ */
163     /**
164      * Called when a dispatched thread is no longer handling the endpoint. 
165      * The selection key operations are updated.
166      * @return If false is returned, the endpoint has been redispatched and 
167      * thread must keep handling the endpoint.
168      */
169     private boolean undispatch()
170     {
171         synchronized (this)
172         {
173             if (_redispatched)
174             {
175                 _redispatched=false;
176                 return false;
177             }
178             _dispatched = false;
179             updateKey();
180         }
181         return true;
182     }
183 
184     /* ------------------------------------------------------------ */
185     public void scheduleIdle()
186     {
187         _selectSet.scheduleIdle(_idleTask);
188     }
189 
190     /* ------------------------------------------------------------ */
191     public void cancelIdle()
192     {
193         _selectSet.cancelIdle(_idleTask);
194     }
195 
196     /* ------------------------------------------------------------ */
197     protected void idleExpired()
198     {
199         try
200         {
201             close();
202         }
203         catch (IOException e)
204         {
205             Log.ignore(e);
206         }
207     }
208 
209     /* ------------------------------------------------------------ */
210     /*
211      */
212     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
213     {
214         int l = super.flush(header, buffer, trailer);
215         _writable = l!=0;
216         return l;
217     }
218 
219     /* ------------------------------------------------------------ */
220     /*
221      */
222     public int flush(Buffer buffer) throws IOException
223     {
224         int l = super.flush(buffer);
225         _writable = l!=0;
226         return l;
227     }
228 
229     /* ------------------------------------------------------------ */
230     public boolean isReadyForDispatch()
231     {
232         synchronized (this)
233         {
234             return !(_dispatched || getConnection().isSuspended());
235         }
236     }
237     
238     /* ------------------------------------------------------------ */
239     /*
240      * Allows thread to block waiting for further events.
241      */
242     public boolean blockReadable(long timeoutMs) throws IOException
243     {
244         synchronized (this)
245         {
246             long start=_selectSet.getNow();
247             try
248             {   
249                 _readBlocked=true;
250                 while (isOpen() && _readBlocked)
251                 {
252                     try
253                     {
254                         updateKey();
255                         this.wait(timeoutMs);
256 
257                         if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
258                             return false;
259                     }
260                     catch (InterruptedException e)
261                     {
262                         Log.warn(e);
263                     }
264                 }
265             }
266             finally
267             {
268                 _readBlocked=false;
269             }
270         }
271         return true;
272     }
273 
274     /* ------------------------------------------------------------ */
275     /*
276      * Allows thread to block waiting for further events.
277      */
278     public boolean blockWritable(long timeoutMs) throws IOException
279     {
280         synchronized (this)
281         {
282             long start=_selectSet.getNow();
283             try
284             {   
285                 _writeBlocked=true;
286                 while (isOpen() && _writeBlocked)
287                 {
288                     try
289                     {
290                         updateKey();
291                         this.wait(timeoutMs);
292 
293                         if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
294                             return false;
295                     }
296                     catch (InterruptedException e)
297                     {
298                         Log.warn(e);
299                     }
300                 }
301             }
302             finally
303             {
304                 _writeBlocked=false;
305             }
306         }
307         return true;
308     }
309 
310     /* ------------------------------------------------------------ */
311     public void setWritable(boolean writable)
312     {
313         _writable=writable;
314     }
315     
316     /* ------------------------------------------------------------ */
317     public void scheduleWrite()
318     {
319         _writable=false;
320         updateKey();
321     }
322     
323     /* ------------------------------------------------------------ */
324     /**
325      * Updates selection key. Adds operations types to the selection key as needed. No operations
326      * are removed as this is only done during dispatch. This method records the new key and
327      * schedules a call to doUpdateKey to do the keyChange
328      */
329     private void updateKey()
330     {
331         synchronized (this)
332         {
333             int ops=-1;
334             if (getChannel().isOpen())
335             {
336                 _interestOps = 
337                     ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0) 
338                 |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
339                 try
340                 {
341                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
342                 }
343                 catch(Exception e)
344                 {
345                     _key=null;
346                     Log.ignore(e);
347                 }
348             }
349 
350             if(_interestOps == ops && getChannel().isOpen())
351                 return;
352             
353         }
354         _selectSet.addChange(this);
355         _selectSet.wakeup();
356     }
357     
358     /* ------------------------------------------------------------ */
359     /**
360      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
361      */
362     void doUpdateKey()
363     {
364         synchronized (this)
365         {
366             if (getChannel().isOpen())
367             {
368                 if (_interestOps>0)
369                 {
370                     if (_key==null || !_key.isValid())
371                     {
372                         SelectableChannel sc = (SelectableChannel)getChannel();
373                         if (sc.isRegistered())
374                         {
375                             updateKey();   
376                         }
377                         else
378                         {
379                             try
380                             {
381                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
382                             }
383                             catch (Exception e)
384                             {
385                                 Log.ignore(e);
386                                 if (_key!=null && _key.isValid())
387                                 {
388                                     _key.cancel();
389                                 }
390                                 cancelIdle();
391 
392                                 if (_open)
393                                     _manager.endPointClosed(this);
394                                 _open=false;
395                                 _key = null;
396                             }
397                         }
398                     }
399                     else
400                     {
401                         _key.interestOps(_interestOps);
402                     }
403                 }
404                 else
405                 {
406                     if (_key.isValid())
407                         _key.interestOps(0);
408                     else
409                         _key=null;
410                 }
411             }
412             else    
413             {
414                 if (_key!=null && _key.isValid())
415                     _key.cancel(); 
416                 
417                 cancelIdle();
418                 if (_open)
419                     _manager.endPointClosed(this);
420                 _open=false;
421                 _key = null;
422             }
423         }
424     }
425 
426     /* ------------------------------------------------------------ */
427     /* 
428      */
429     public void run()
430     {
431         boolean dispatched=true;
432         try
433         {
434             while(dispatched)
435             {
436                 try
437                 {
438                     _connection.handle();
439                 }
440                 catch (ClosedChannelException e)
441                 {
442                     Log.ignore(e);
443                 }
444                 catch (EofException e)
445                 {
446                     Log.debug("EOF", e);
447                     try{close();}
448                     catch(IOException e2){Log.ignore(e2);}
449                 }
450                 catch (IOException e)
451                 {
452                     Log.warn(e.toString());
453                     Log.debug(e);
454                     try{close();}
455                     catch(IOException e2){Log.ignore(e2);}
456                 }
457                 catch (Throwable e)
458                 {
459                     Log.warn("handle failed", e);
460                     try{close();}
461                     catch(IOException e2){Log.ignore(e2);}
462                 }
463                 dispatched=!undispatch();
464             }
465         }
466         finally
467         {
468             if (dispatched)
469             {
470                 dispatched=!undispatch();
471                 while (dispatched)
472                 {
473                     Log.warn("SCEP.run() finally DISPATCHED");
474                     dispatched=!undispatch();
475                 }
476             }
477         }
478     }
479 
480     /* ------------------------------------------------------------ */
481     /*
482      * @see org.eclipse.io.nio.ChannelEndPoint#close()
483      */
484     public void close() throws IOException
485     {
486         try
487         {
488             super.close();
489         }
490         catch (IOException e)
491         {
492             Log.ignore(e);
493         }   
494         finally
495         {
496             updateKey();
497         }
498     }
499     
500     /* ------------------------------------------------------------ */
501     public String toString()
502     {
503         synchronized(this)
504         {
505             return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
506             ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
507         }
508     }
509 
510     /* ------------------------------------------------------------ */
511     public Timeout.Task getTimeoutTask()
512     {
513         return _idleTask;
514     }
515 
516     /* ------------------------------------------------------------ */
517     public SelectSet getSelectSet()
518     {
519         return _selectSet;
520     }
521 
522     /* ------------------------------------------------------------ */
523     /* ------------------------------------------------------------ */
524     /* ------------------------------------------------------------ */
525     private class IdleTask extends Timeout.Task
526     {
527         /* ------------------------------------------------------------ */
528         /*
529          * @see org.eclipse.thread.Timeout.Task#expire()
530          */
531         public void expired()
532         {
533             idleExpired();
534         }
535 
536         public String toString()
537         {
538             return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
539         }
540 
541     }
542 
543 }