View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  import java.util.concurrent.atomic.AtomicInteger;
22  
23  import org.eclipse.jetty.io.AsyncEndPoint;
24  import org.eclipse.jetty.io.Buffer;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.thread.Timeout;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   * 
35   * 
36   *
37   */
38  public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint
39  {
40      private final SelectorManager.SelectSet _selectSet;
41      private final Connection _connection;
42      private final SelectorManager _manager;
43      private boolean _dispatched = false;
44      private boolean _redispatched = false;
45      private volatile boolean _writable = true; 
46      
47      private  SelectionKey _key;
48      private int _interestOps;
49      private boolean _readBlocked;
50      private boolean _writeBlocked;
51      private boolean _open;
52      private final Timeout.Task _idleTask = new IdleTask();
53  
54      /* ------------------------------------------------------------ */
55      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
56      {
57          super(channel);
58  
59          _manager = selectSet.getManager();
60          _selectSet = selectSet;
61          _connection = _manager.newConnection(channel,this);
62          _dispatched = false;
63          _redispatched = false;
64          _open=true;
65          _manager.endPointOpened(this);
66          
67          _key = key;
68          scheduleIdle();
69      }
70      
71      /* ------------------------------------------------------------ */
72      public Connection getConnection()
73      {
74          return _connection;
75      }
76      
77      /* ------------------------------------------------------------ */
78      public SelectorManager getSelectManager()
79      {
80          return _manager;
81      }
82  
83      /* ------------------------------------------------------------ */
84      /** Called by selectSet to schedule handling
85       * 
86       */
87      public void schedule() 
88      {
89          synchronized (this)
90          {
91              // If there is no key, then do nothing
92              if (_key == null || !_key.isValid())
93              {
94                  _readBlocked=false;
95                  _writeBlocked=false;
96                  this.notifyAll();
97                  return;
98              }
99              
100             // If there are threads dispatched reading and writing
101             if (_readBlocked || _writeBlocked)
102             {
103                 // assert _dispatched;
104                 if (_readBlocked && _key.isReadable())
105                     _readBlocked=false;
106                 if (_writeBlocked && _key.isWritable())
107                     _writeBlocked=false;
108 
109                 // wake them up is as good as a dispatched.
110                 this.notifyAll();
111                 
112                 // we are not interested in further selecting
113                 _key.interestOps(0);
114                 return;
115             }
116 
117             // Otherwise if we are still dispatched
118             if (!isReadyForDispatch())
119             {
120                 // we are not interested in further selecting
121                 _key.interestOps(0);
122                 return;
123             }
124 
125 
126             // Remove writeable op
127             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
128             {
129                 // Remove writeable op
130                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
131                 _key.interestOps(_interestOps);
132                 _writable = true; // Once writable is in ops, only removed with dispatch.
133             }
134 
135             if (_dispatched)
136                 _key.interestOps(0);
137             else
138                 dispatch();
139         }
140     }
141     
142     /* ------------------------------------------------------------ */
143     public void dispatch() 
144     {
145         synchronized(this)
146         {
147             if (_dispatched)
148                 _redispatched=true;
149             else
150             {
151                 _dispatched = _manager.dispatch(this);
152                 if(!_dispatched)
153                 {
154                     Log.warn("Dispatched Failed!");
155                     updateKey();
156                 }
157             }
158         }
159     }
160 
161     /* ------------------------------------------------------------ */
162     /**
163      * Called when a dispatched thread is no longer handling the endpoint. 
164      * The selection key operations are updated.
165      * @return If false is returned, the endpoint has been redispatched and 
166      * thread must keep handling the endpoint.
167      */
168     private boolean undispatch()
169     {
170         synchronized (this)
171         {
172             if (_redispatched)
173             {
174                 _redispatched=false;
175                 return false;
176             }
177             _dispatched = false;
178             updateKey();
179         }
180         return true;
181     }
182 
183     /* ------------------------------------------------------------ */
184     public void scheduleIdle()
185     {
186         _selectSet.scheduleIdle(_idleTask);
187     }
188 
189     /* ------------------------------------------------------------ */
190     public void cancelIdle()
191     {
192         _selectSet.cancelIdle(_idleTask);
193     }
194 
195     /* ------------------------------------------------------------ */
196     protected void idleExpired()
197     {
198         try
199         {
200             close();
201         }
202         catch (IOException e)
203         {
204             Log.ignore(e);
205         }
206     }
207 
208     /* ------------------------------------------------------------ */
209     /*
210      */
211     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
212     {
213         int l = super.flush(header, buffer, trailer);
214         _writable = l!=0;
215         return l;
216     }
217 
218     /* ------------------------------------------------------------ */
219     /*
220      */
221     public int flush(Buffer buffer) throws IOException
222     {
223         int l = super.flush(buffer);
224         _writable = l!=0;
225         return l;
226     }
227 
228     /* ------------------------------------------------------------ */
229     public boolean isReadyForDispatch()
230     {
231         synchronized (this)
232         {
233             return !(_dispatched || getConnection().isSuspended());
234         }
235     }
236     
237     /* ------------------------------------------------------------ */
238     /*
239      * Allows thread to block waiting for further events.
240      */
241     public boolean blockReadable(long timeoutMs) throws IOException
242     {
243         synchronized (this)
244         {
245             long start=_selectSet.getNow();
246             try
247             {   
248                 _readBlocked=true;
249                 while (isOpen() && _readBlocked)
250                 {
251                     try
252                     {
253                         updateKey();
254                         this.wait(timeoutMs);
255 
256                         if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
257                             return false;
258                     }
259                     catch (InterruptedException e)
260                     {
261                         Log.warn(e);
262                     }
263                 }
264             }
265             finally
266             {
267                 _readBlocked=false;
268             }
269         }
270         return true;
271     }
272 
273     /* ------------------------------------------------------------ */
274     /*
275      * Allows thread to block waiting for further events.
276      */
277     public boolean blockWritable(long timeoutMs) throws IOException
278     {
279         synchronized (this)
280         {
281             long start=_selectSet.getNow();
282             try
283             {   
284                 _writeBlocked=true;
285                 while (isOpen() && _writeBlocked)
286                 {
287                     try
288                     {
289                         updateKey();
290                         this.wait(timeoutMs);
291 
292                         if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
293                             return false;
294                     }
295                     catch (InterruptedException e)
296                     {
297                         Log.warn(e);
298                     }
299                 }
300             }
301             finally
302             {
303                 _writeBlocked=false;
304             }
305         }
306         return true;
307     }
308 
309     /* ------------------------------------------------------------ */
310     public void setWritable(boolean writable)
311     {
312         _writable=writable;
313     }
314     
315     /* ------------------------------------------------------------ */
316     public void scheduleWrite()
317     {
318         _writable=false;
319         updateKey();
320     }
321     
322     /* ------------------------------------------------------------ */
323     /**
324      * Updates selection key. Adds operations types to the selection key as needed. No operations
325      * are removed as this is only done during dispatch. This method records the new key and
326      * schedules a call to doUpdateKey to do the keyChange
327      */
328     private void updateKey()
329     {
330         synchronized (this)
331         {
332             int ops=-1;
333             if (getChannel().isOpen())
334             {
335                 _interestOps = 
336                     ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0) 
337                 |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
338                 try
339                 {
340                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
341                 }
342                 catch(Exception e)
343                 {
344                     _key=null;
345                     Log.ignore(e);
346                 }
347             }
348 
349             if(_interestOps == ops && getChannel().isOpen())
350                 return;
351             
352         }
353         _selectSet.addChange(this);
354         _selectSet.wakeup();
355     }
356     
357     /* ------------------------------------------------------------ */
358     /**
359      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
360      */
361     void doUpdateKey()
362     {
363         synchronized (this)
364         {
365             if (getChannel().isOpen())
366             {
367                 if (_interestOps>0)
368                 {
369                     if (_key==null || !_key.isValid())
370                     {
371                         SelectableChannel sc = (SelectableChannel)getChannel();
372                         if (sc.isRegistered())
373                         {
374                             updateKey();   
375                         }
376                         else
377                         {
378                             try
379                             {
380                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
381                             }
382                             catch (Exception e)
383                             {
384                                 Log.ignore(e);
385                                 if (_key!=null && _key.isValid())
386                                 {
387                                     _key.cancel();
388                                 }
389                                 cancelIdle();
390 
391                                 if (_open)
392                                     _manager.endPointClosed(this);
393                                 _open=false;
394                                 _key = null;
395                             }
396                         }
397                     }
398                     else
399                     {
400                         _key.interestOps(_interestOps);
401                     }
402                 }
403                 else
404                 {
405                     if (_key.isValid())
406                         _key.interestOps(0);
407                     else
408                         _key=null;
409                 }
410             }
411             else    
412             {
413                 if (_key!=null && _key.isValid())
414                     _key.cancel(); 
415                 
416                 cancelIdle();
417                 if (_open)
418                     _manager.endPointClosed(this);
419                 _open=false;
420                 _key = null;
421             }
422         }
423     }
424 
425     /* ------------------------------------------------------------ */
426     /* 
427      */
428     public void run()
429     {
430         boolean dispatched=true;
431         try
432         {
433             while(dispatched)
434             {
435                 try
436                 {
437                     _connection.handle();
438                 }
439                 catch (ClosedChannelException e)
440                 {
441                     Log.ignore(e);
442                 }
443                 catch (EofException e)
444                 {
445                     Log.debug("EOF", e);
446                     try{close();}
447                     catch(IOException e2){Log.ignore(e2);}
448                 }
449                 catch (IOException e)
450                 {
451                     Log.warn(e.toString());
452                     Log.debug(e);
453                     try{close();}
454                     catch(IOException e2){Log.ignore(e2);}
455                 }
456                 catch (Throwable e)
457                 {
458                     Log.warn("handle failed", e);
459                     try{close();}
460                     catch(IOException e2){Log.ignore(e2);}
461                 }
462                 dispatched=!undispatch();
463             }
464         }
465         finally
466         {
467             if (dispatched)
468             {
469                 dispatched=!undispatch();
470                 while (dispatched)
471                 {
472                     Log.warn("SCEP.run() finally DISPATCHED");
473                     dispatched=!undispatch();
474                 }
475             }
476         }
477     }
478 
479     /* ------------------------------------------------------------ */
480     /*
481      * @see org.eclipse.io.nio.ChannelEndPoint#close()
482      */
483     public void close() throws IOException
484     {
485         try
486         {
487             super.close();
488         }
489         catch (IOException e)
490         {
491             Log.ignore(e);
492         }   
493         finally
494         {
495             updateKey();
496         }
497     }
498     
499     /* ------------------------------------------------------------ */
500     public String toString()
501     {
502         synchronized(this)
503         {
504             return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
505             ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
506         }
507     }
508 
509     /* ------------------------------------------------------------ */
510     public Timeout.Task getTimeoutTask()
511     {
512         return _idleTask;
513     }
514 
515     /* ------------------------------------------------------------ */
516     public SelectSet getSelectSet()
517     {
518         return _selectSet;
519     }
520 
521     /* ------------------------------------------------------------ */
522     /* ------------------------------------------------------------ */
523     /* ------------------------------------------------------------ */
524     private class IdleTask extends Timeout.Task
525     {
526         /* ------------------------------------------------------------ */
527         /*
528          * @see org.eclipse.thread.Timeout.Task#expire()
529          */
530         public void expired()
531         {
532             idleExpired();
533         }
534 
535         public String toString()
536         {
537             return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
538         }
539 
540     }
541 
542 }