View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.Selector;
21  import java.nio.channels.ServerSocketChannel;
22  import java.nio.channels.SocketChannel;
23  import java.util.ArrayList;
24  import java.util.Date;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.component.AbstractLifeCycle;
31  import org.eclipse.jetty.util.log.Log;
32  import org.eclipse.jetty.util.thread.Timeout;
33  
34  
35  /* ------------------------------------------------------------ */
/**
 * The Selector Manager manages a number of SelectSets to allow
 * NIO scheduling to scale to large numbers of connections.
 */
43  public abstract class SelectorManager extends AbstractLifeCycle
44  {
    /**
     * Base count of consecutive suspicious selects used by the JVM bug work-around in
     * {@link SelectSet#doSelect()}. Read from the system property
     * {@code org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD}, defaulting to 128; a value
     * that is not positive disables the work-around.
     */
    private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",128);
    /** Count of consecutive suspicious selects at which the selector is recreated (twice the base threshold). */
    private static final int __JVMBUG_THRESHHOLD2=__JVMBUG_THRESHHOLD*2;
    /** Midpoint between the two thresholds; like the base threshold it triggers cancelling keys with no interest ops. */
    private static final int __JVMBUG_THRESHHOLD1=(__JVMBUG_THRESHHOLD2+__JVMBUG_THRESHHOLD)/2;
    /** Idle timeout, in milliseconds, applied when a select set is not in the low resources state. */
    private long _maxIdleTime;
    /** Low resources connection threshold, stored PER select set (the public accessors convert to and from the total). */
    private long _lowResourcesConnections;
    /** Idle timeout applied when a select set has more keys than the per-set low resources threshold. */
    private long _lowResourcesMaxIdleTime;
    /** The select sets; created in doStart() and set to null in doStop(). */
    private transient SelectSet[] _selectSet;
    /** Configured number of select sets; used to size the array in doStart(). */
    private int _selectSets=1;
    /** Counter used by the register methods to pick a select set in rotation. */
    private volatile int _set;
    /** Set once the "cancelling interestOps==0" message has been logged at info level; later occurrences log at debug. */
    private boolean _jvmBug0;
    /** Set once the "recreating selector" message has been logged at info level; later occurrences log at debug. */
    private boolean _jvmBug1;
56      
57  
58      /* ------------------------------------------------------------ */
59      /**
60       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
61       * @see {@link #setLowResourcesMaxIdleTime(long)}
62       */
63      public void setMaxIdleTime(long maxIdleTime)
64      {
65          _maxIdleTime=maxIdleTime;
66      }
67      
68      /* ------------------------------------------------------------ */
69      /**
70       * @param selectSets number of select sets to create
71       */
72      public void setSelectSets(int selectSets)
73      {
74          long lrc = _lowResourcesConnections * _selectSets; 
75          _selectSets=selectSets;
76          _lowResourcesConnections=lrc/_selectSets;
77      }
78      
79      /* ------------------------------------------------------------ */
80      /**
81       * @return
82       */
83      public long getMaxIdleTime()
84      {
85          return _maxIdleTime;
86      }
87      
88      /* ------------------------------------------------------------ */
89      /**
90       * @return
91       */
92      public int getSelectSets()
93      {
94          return _selectSets;
95      }
96      
97      /* ------------------------------------------------------------ */
98      /** Register a channel
99       * @param channel
100      * @param att Attached Object
101      * @throws IOException
102      */
103     public void register(SocketChannel channel, Object att)
104     {
105         int s=_set++; 
106         s=s%_selectSets;
107         SelectSet[] sets=_selectSet;
108         if (sets!=null)
109         {
110             SelectSet set=sets[s];
111             set.addChange(channel,att);
112             set.wakeup();
113         }
114     }
115     
116     /* ------------------------------------------------------------ */
117     /** Register a serverchannel
118      * @param acceptChannel
119      * @return
120      * @throws IOException
121      */
122     public void register(ServerSocketChannel acceptChannel)
123     {
124         int s=_set++; 
125         s=s%_selectSets;
126         SelectSet set=_selectSet[s];
127         set.addChange(acceptChannel);
128         set.wakeup();
129     }
130 
131     /* ------------------------------------------------------------ */
132     /**
133      * @return the lowResourcesConnections
134      */
135     public long getLowResourcesConnections()
136     {
137         return _lowResourcesConnections*_selectSets;
138     }
139 
140     /* ------------------------------------------------------------ */
141     /**
142      * Set the number of connections, which if exceeded places this manager in low resources state.
143      * This is not an exact measure as the connection count is averaged over the select sets.
144      * @param lowResourcesConnections the number of connections
145      * @see {@link #setLowResourcesMaxIdleTime(long)}
146      */
147     public void setLowResourcesConnections(long lowResourcesConnections)
148     {
149         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
150     }
151 
152     /* ------------------------------------------------------------ */
153     /**
154      * @return the lowResourcesMaxIdleTime
155      */
156     public long getLowResourcesMaxIdleTime()
157     {
158         return _lowResourcesMaxIdleTime;
159     }
160 
161     /* ------------------------------------------------------------ */
162     /**
163      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
164      * @see {@link #setMaxIdleTime(long)}
165      */
166     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
167     {
168         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
169     }
170     
171     /* ------------------------------------------------------------ */
172     /**
173      * @param acceptorID
174      * @throws IOException
175      */
176     public void doSelect(int acceptorID) throws IOException
177     {
178         SelectSet[] sets= _selectSet;
179         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
180             sets[acceptorID].doSelect();
181     }
182 
183     /* ------------------------------------------------------------ */
    /**
     * Accepts a connection for a selection key that is ready to accept.
     * Called from {@link SelectSet#doSelect()}; a {@code null} return makes the
     * select set skip the key for this cycle.
     *
     * @param key the acceptable selection key
     * @return the accepted channel, or {@code null} if there is nothing to accept
     * @throws IOException if the accept fails
     */
    protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
190 
191     /* ------------------------------------------------------------------------------- */
    /**
     * Hands a task to the implementation for execution. Within this class it is called
     * from {@link SelectSet#doSelect()} for queued changes that are {@link Runnable};
     * that caller ignores the result.
     *
     * @param task the task to run
     * @return presumably whether the task was accepted for execution - TODO confirm against implementations
     */
    public abstract boolean dispatch(Runnable task);
193 
194     /* ------------------------------------------------------------ */
195     /* (non-Javadoc)
196      * @see org.eclipse.component.AbstractLifeCycle#doStart()
197      */
198     protected void doStart() throws Exception
199     {
200         _selectSet = new SelectSet[_selectSets];
201         for (int i=0;i<_selectSet.length;i++)
202             _selectSet[i]= new SelectSet(i);
203 
204         super.doStart();
205     }
206 
207 
208     /* ------------------------------------------------------------------------------- */
209     protected void doStop() throws Exception
210     {
211         SelectSet[] sets= _selectSet;
212         _selectSet=null;
213         if (sets!=null)
214         {
215             for (SelectSet set : sets)
216             {
217                 if (set!=null)
218                     set.stop();
219             }
220         }
221         super.doStop();
222     }
223 
224     /* ------------------------------------------------------------ */
    /**
     * Callback for a closed endpoint. It is not invoked anywhere in this class;
     * presumably the endpoint implementation calls it when it closes - TODO confirm.
     *
     * @param endpoint the endpoint that was closed
     */
    protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
229 
230     /* ------------------------------------------------------------ */
    /**
     * Callback for a newly opened endpoint. It is not invoked anywhere in this class;
     * presumably the endpoint implementation calls it when it is created - TODO confirm.
     *
     * @param endpoint the endpoint that was opened
     */
    protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
235 
236     /* ------------------------------------------------------------------------------- */
    /**
     * Factory for the {@link Connection} associated with a channel and its endpoint.
     * It is not invoked anywhere in this class; presumably endpoint implementations
     * call it - TODO confirm.
     *
     * @param channel the socket channel
     * @param endpoint the endpoint wrapping the channel
     * @return the connection for the endpoint
     */
    protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
238 
239     /* ------------------------------------------------------------ */
    /**
     * Factory for the endpoint that wraps a channel registered with a select set.
     * {@link SelectSet#doSelect()} calls this after registering or connecting a channel,
     * attaches the returned endpoint to the selection key and schedules it.
     *
     * @param channel the socket channel to wrap
     * @param selectSet the select set whose selector the channel is registered with
     * @param sKey the selection key of the registration
     * @return the new endpoint
     * @throws IOException if the endpoint cannot be created
     */
    protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
248 
249     /* ------------------------------------------------------------------------------- */
250     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
251     {
252         Log.warn(ex+","+channel+","+attachment);
253         Log.debug(ex);
254     }
255 
256     /* ------------------------------------------------------------------------------- */
257     public void dump()
258     {
259         for (final SelectSet set :_selectSet)
260         {
261             Thread selecting = set._selecting;
262             Log.info("SelectSet "+set._setID+" : "+selecting);
263             if (selecting!=null)
264             {
265                 StackTraceElement[] trace =selecting.getStackTrace();
266                 if (trace!=null)
267                 {
268                     for (StackTraceElement e : trace)
269                     {
270                         Log.info("\tat "+e.toString());
271                     }
272                 }
273             }
274                 
275             set.addChange(new ChangeTask(){
276                 public void run()
277                 {
278                     set.dump();
279                 }
280             });
281         }
282     }
283     
284     
285     /* ------------------------------------------------------------------------------- */
286     /* ------------------------------------------------------------------------------- */
287     /* ------------------------------------------------------------------------------- */
288     public class SelectSet 
289     {
        /** Index of this select set within the manager's array. */
        private final int _setID;
        /** Timer for idle tasks; its duration is switched between the normal and low resources idle times in doSelect(). */
        private final Timeout _idleTimeout;
        /** Timer for tasks scheduled with an explicit timeout via scheduleTimeout(). */
        private final Timeout _timeout;
        /** Two change lists: one receives additions while the other is drained by doSelect(). Guarded by synchronizing on the array. */
        private final List<Object>[] _changes;

        /** Index (0 or 1) of the list in _changes that currently receives additions. */
        private int _change;
        /** Rotating index of the select set that receives the next accepted channel. */
        private int _nextSet;
        /** The current selector; replaced by the JVM bug work-around and set to null by stop(). */
        private Selector _selector;
        /** Count of consecutive suspicious selects seen by the JVM bug work-around. */
        private int _jvmBug;
        /** The thread currently inside doSelect(), or null when none is. */
        private volatile Thread _selecting;
        /** Time (ms since epoch) at which the selector was last recreated by the JVM bug work-around. */
        private long _lastJVMBug;
301         
302         /* ------------------------------------------------------------ */
303         SelectSet(int acceptorID) throws Exception
304         {
305             _setID=acceptorID;
306 
307             _idleTimeout = new Timeout(this);
308             _idleTimeout.setDuration(getMaxIdleTime());
309             _timeout = new Timeout(this);
310             _timeout.setDuration(0L);
311             _changes = new List[] {new ArrayList(),new ArrayList()};
312 
313             // create a selector;
314             _selector = Selector.open();
315             _change=0;
316         }
317         
318         /* ------------------------------------------------------------ */
319         public void addChange(Object point)
320         {
321             synchronized (_changes)
322             {
323                 _changes[_change].add(point);
324             }
325         }
326         
327         /* ------------------------------------------------------------ */
328         public void addChange(SelectableChannel channel, Object att)
329         {   
330             if (att==null)
331                 addChange(channel);
332             else if (att instanceof EndPoint)
333                 addChange(att);
334             else
335                 addChange(new ChangeSelectableChannel(channel,att));
336         }
337         
338         /* ------------------------------------------------------------ */
339         public void cancelIdle(Timeout.Task task)
340         {
341             task.cancel();
342         }
343 
344         /* ------------------------------------------------------------ */
        /**
         * Runs one select cycle: applies the queued changes, waits on the selector,
         * handles the selected keys and ticks the timers.
         * <p>
         * The calling thread is recorded in {@code _selecting} for the duration of the
         * call. A {@link CancelledKeyException} escaping the cycle is ignored.
         *
         * @throws IOException if registering a channel, accepting, or selecting fails
         */
        public void doSelect() throws IOException
        {
            try
            {
                _selecting=Thread.currentThread();
                List<?> changes;
                final Selector selector;
                // Swap the change lists under the lock: this thread drains the list that was
                // receiving additions, while addChange() now appends to the other one.
                synchronized (_changes)
                {
                    changes=_changes[_change];
                    _change=_change==0?1:0;
                    selector=_selector;
                }
                

                // Make any key changes required
                // NOTE(review): only CancelledKeyException is caught per change. Any other
                // exception (e.g. an IOException from register) leaves this method before
                // changes.clear() below, so the entries stay in the list - confirm whether
                // re-processing them on a later cycle is acceptable.
                final int size=changes.size();
                for (int i = 0; i < size; i++)
                {
                    try
                    {
                        Object o = changes.get(i);
                        
                        if (o instanceof EndPoint)
                        {
                            // Update the operations for a key.
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
                            endpoint.doUpdateKey();
                        }
                        else if (o instanceof Runnable)
                        {
                            dispatch((Runnable)o);
                        }
                        else if (o instanceof ChangeSelectableChannel)
                        {
                            // finish accepting/connecting this connection
                            final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
                            final SelectableChannel channel=asc._channel;
                            final Object att = asc._attachment;

                            if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
                            {
                                // Already connected: register for reads and wrap in an endpoint.
                                SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
                                SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
                                key.attach(endpoint);
                                endpoint.schedule();
                            }
                            else if (channel.isOpen())
                            {
                                // Not connected yet: wait for the connect to become ready.
                                channel.register(selector,SelectionKey.OP_CONNECT,att);
                            }
                        }
                        else if (o instanceof SocketChannel)
                        {
                            // Same as above, for a channel queued without an attachment.
                            final SocketChannel channel=(SocketChannel)o;

                            if (channel.isConnected())
                            {
                                SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
                                SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
                                key.attach(endpoint);
                                endpoint.schedule();
                            }
                            else if (channel.isOpen())
                            {
                                channel.register(selector,SelectionKey.OP_CONNECT,null);
                            }
                        }
                        else if (o instanceof ServerSocketChannel)
                        {
                            // NOTE(review): this reads the _selector field via getSelector() rather
                            // than the local 'selector' snapshot used by the other branches.
                            ServerSocketChannel channel = (ServerSocketChannel)o;
                            channel.register(getSelector(),SelectionKey.OP_ACCEPT);
                        }
                        else if (o instanceof ChangeTask)
                        {
                            // Run on this (the selecting) thread.
                            ((ChangeTask)o).run();
                        }
                        else
                            throw new IllegalArgumentException(o.toString());
                    }
                    catch (CancelledKeyException e)
                    {
                        if (isRunning())
                            Log.warn(e);
                        else
                            Log.debug(e);
                    }
                }
                changes.clear();

                long idle_next;
                long retry_next;
                long now=System.currentTimeMillis();
                synchronized (this)
                {
                    _idleTimeout.setNow(now);
                    _timeout.setNow(now);
                    
                    // Use the shorter low resources idle time while this set holds more keys
                    // than its per-set threshold (a threshold of 0 or less disables the check).
                    if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
                        _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
                    else 
                        _idleTimeout.setDuration(_maxIdleTime);
                    idle_next=_idleTimeout.getTimeToNext();
                    retry_next=_timeout.getTimeToNext();
                }

                // Work out how long to wait in select: at most one second, less if a
                // timer is due sooner.
                long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
                if (idle_next >= 0 && wait > idle_next)
                    wait = idle_next;
                if (wait > 0 && retry_next >= 0 && wait > retry_next)
                    wait = retry_next;
    
                // Do the select.
                if (wait > 0) 
                {
                    long before=now;
                    int selected=selector.select(wait);
                    now = System.currentTimeMillis();
                    _idleTimeout.setNow(now);
                    _timeout.setNow(now);

                    // Look for JVM bugs
                    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
                    // A select is counted as suspicious when it returned no keys in less than
                    // half of the requested wait. NOTE(review): 'wait' (milliseconds) is compared
                    // against __JVMBUG_THRESHHOLD, which is otherwise used as a count - confirm intent.
                    if (__JVMBUG_THRESHHOLD>0 && selected==0 && wait>__JVMBUG_THRESHHOLD && (now-before)<(wait/2) )
                    {
                        _jvmBug++;
                        if (_jvmBug>=(__JVMBUG_THRESHHOLD2))
                        {
                            synchronized (this)
                            {
                                _lastJVMBug=now;
                                // Log at info level the first time only, debug afterwards.
                                if (_jvmBug1)
                                    Log.debug("seeing JVM BUG(s) - recreating selector");
                                else
                                {
                                    _jvmBug1=true;
                                    Log.info("seeing JVM BUG(s) - recreating selector");
                                }
                                // Work-around: open a fresh selector and queue every valid key with
                                // interest ops as a change so it is re-registered on the next cycle.
                                final Selector new_selector = Selector.open();
                                for (SelectionKey k: selector.keys())
                                {
                                    if (!k.isValid() || k.interestOps()==0)
                                        continue;
                                    
                                    final SelectableChannel channel = k.channel();
                                    final Object attachment = k.attachment();
                                    
                                    if (attachment==null)
                                        addChange(channel);
                                    else
                                        addChange(channel,attachment);
                                }
                                _selector.close();
                                _selector=new_selector;
                                _jvmBug=0;
                                return;
                            }
                        }
                        else if (_jvmBug==__JVMBUG_THRESHHOLD || _jvmBug==__JVMBUG_THRESHHOLD1)
                        {
                            // Cancel keys with 0 interested ops
                            // (logged at info level the first time only, debug afterwards)
                            if (_jvmBug0)
                                Log.debug("seeing JVM BUG(s) - cancelling interestOps==0");
                            else
                            {
                                _jvmBug0=true;
                                Log.info("seeing JVM BUG(s) - cancelling interestOps==0");
                            }
                            for (SelectionKey k: selector.keys())
                            {
                                if (k.isValid()&&k.interestOps()==0)
                                {
                                    k.cancel();
                                }
                            }
                            return;
                        }
                    }
                    else
                        _jvmBug=0;
                }
                else 
                {
                    // A timer is already due: poll without blocking.
                    selector.selectNow();
                    _jvmBug=0;
                }

                // have we been destroyed while sleeping
                if (_selector==null || !selector.isOpen())
                    return;

                // Look for things to do
                for (SelectionKey key: selector.selectedKeys())
                {   
                    try
                    {
                        if (!key.isValid())
                        {
                            key.cancel();
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                            if (endpoint != null)
                                endpoint.doUpdateKey();
                            continue;
                        }

                        Object att = key.attachment();
                        if (att instanceof SelectChannelEndPoint)
                        {
                            // Key already has an endpoint: let it handle the ready ops.
                            ((SelectChannelEndPoint)att).schedule();
                        }
                        else if (key.isAcceptable())
                        {
                            SocketChannel channel = acceptChannel(key);
                            if (channel==null)
                                continue;

                            channel.configureBlocking(false);

                            // TODO make it reluctant to leave 0
                            // NOTE(review): reads the manager's _selectSet field, which doStop()
                            // sets to null; an NPE here would be caught by the Exception handler below.
                            _nextSet=++_nextSet%_selectSet.length;

                            // Is this for this selectset
                            if (_nextSet==_setID)
                            {
                                // bind connections to this select set.
                                SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
                                SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
                                cKey.attach(endpoint);
                                if (endpoint != null)
                                    endpoint.schedule();
                            }
                            else
                            {
                                // nope - give it to another.
                                _selectSet[_nextSet].addChange(channel);
                                _selectSet[_nextSet].wakeup();
                            }
                        }
                        else if (key.isConnectable())
                        {
                            // Complete a connection of a registered channel
                            SocketChannel channel = (SocketChannel)key.channel();
                            boolean connected=false;
                            try
                            {
                                connected=channel.finishConnect();
                            }
                            catch(Exception e)
                            {
                                connectionFailed(channel,e,att);
                            }
                            finally
                            {
                                if (connected)
                                {
                                    // Connected: switch the key to reads and wrap it in an endpoint.
                                    key.interestOps(SelectionKey.OP_READ);
                                    SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
                                    key.attach(endpoint);
                                    endpoint.schedule();
                                }
                                else
                                {
                                    key.cancel();
                                }
                            }
                        }
                        else
                        {
                            // Wrap readable registered channel in an endpoint
                            SocketChannel channel = (SocketChannel)key.channel();
                            SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
                            key.attach(endpoint);
                            if (key.isReadable())
                                endpoint.schedule();                           
                        }
                        // Handled successfully: clear the variable so the Exception handler
                        // below has no key to cancel.
                        key = null;
                    }
                    catch (CancelledKeyException e)
                    {
                        Log.ignore(e);
                    }
                    catch (Exception e)
                    {
                        if (isRunning())
                            Log.warn(e);
                        else
                            Log.ignore(e);

                        // Cancel the failing key, but never the key of a server (accepting) channel.
                        if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                            key.cancel();
                    }
                }
                
                // Everything always handled
                selector.selectedKeys().clear();
                
                // tick over the timers
                _idleTimeout.tick(now);
                _timeout.tick(now);
            }
            catch (CancelledKeyException e)
            {
                Log.ignore(e);
            }
            finally
            {
                _selecting=null;
            }
        }
661 
662         /* ------------------------------------------------------------ */
663         public SelectorManager getManager()
664         {
665             return SelectorManager.this;
666         }
667 
668         /* ------------------------------------------------------------ */
669         public long getNow()
670         {
671             return _idleTimeout.getNow();
672         }
673         
674         /* ------------------------------------------------------------ */
675         public void scheduleIdle(Timeout.Task task)
676         {
677             if (_idleTimeout.getDuration() <= 0)
678                 return;
679             _idleTimeout.schedule(task);
680         }
681 
682         /* ------------------------------------------------------------ */
683         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
684         {
685             _timeout.schedule(task, timeoutMs);
686         }
687         
688         /* ------------------------------------------------------------ */
689         public void cancelTimeout(Timeout.Task task)
690         {
691             task.cancel();
692         }
693 
694         /* ------------------------------------------------------------ */
695         public void wakeup()
696         {
697             Selector selector = _selector;
698             if (selector!=null)
699                 selector.wakeup();
700         }
701 
702         /* ------------------------------------------------------------ */
703         Selector getSelector()
704         {
705             return _selector;
706         }
707 
708         /* ------------------------------------------------------------ */
709         void stop() throws Exception
710         {
711             boolean selecting=true;
712             while(selecting)
713             {
714                 wakeup();
715                 selecting=_selecting!=null;
716             }
717 
718             for (SelectionKey key:_selector.keys())
719             {
720                 if (key==null)
721                     continue;
722                 Object att=key.attachment();
723                 if (att instanceof EndPoint)
724                 {
725                     EndPoint endpoint = (EndPoint)att;
726                     try
727                     {
728                         endpoint.close();
729                     }
730                     catch(IOException e)
731                     {
732                         Log.ignore(e);
733                     }
734                 }
735             }
736             
737             synchronized (this)
738             {
739                 selecting=_selecting!=null;
740                 while(selecting)
741                 {
742                     wakeup();
743                     selecting=_selecting!=null;
744                 }
745                 
746                 _idleTimeout.cancelAll();
747                 _timeout.cancelAll();
748                 try
749                 {
750                     if (_selector != null)
751                         _selector.close();
752                 }
753                 catch (IOException e)
754                 {
755                     Log.ignore(e);
756                 } 
757                 _selector=null;
758             }
759         }
760         
761         public void dump()
762         {
763             synchronized (System.err)
764             {
765                 Selector selector=_selector;
766                 Log.info("SelectSet "+_setID+" "+selector.keys().size()+" lastJVMBug="+new Date(_lastJVMBug));
767                 for (SelectionKey key: selector.keys())
768                 {
769                     if (key.isValid())
770                         Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
771                     else
772                         Log.info(key.channel()+" - - "+key.attachment());
773                 }
774             }
775         }
776     }
777 
778     /* ------------------------------------------------------------ */
779     private static class ChangeSelectableChannel
780     {
781         final SelectableChannel _channel;
782         final Object _attachment;
783         
784         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
785         {
786             super();
787             _channel = channel;
788             _attachment = attachment;
789         }
790     }
791 
792     /* ------------------------------------------------------------ */
793     private interface ChangeTask
794     {
795         public void run();
796     }
797 }