View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.util.thread;
15  
16  import java.util.concurrent.ArrayBlockingQueue;
17  import java.util.concurrent.BlockingQueue;
18  import java.util.concurrent.ExecutorService;
19  import java.util.concurrent.LinkedBlockingQueue;
20  import java.util.concurrent.RejectedExecutionException;
21  import java.util.concurrent.SynchronousQueue;
22  import java.util.concurrent.ThreadPoolExecutor;
23  import java.util.concurrent.TimeUnit;
24  
25  import org.eclipse.jetty.util.component.AbstractLifeCycle;
26  import org.eclipse.jetty.util.component.LifeCycle;
27  import org.eclipse.jetty.util.log.Log;
28  
29  /* ------------------------------------------------------------ */
30  /** 
31   * Jetty ThreadPool using java 5 ThreadPoolExecutor
32   * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and 
33   * {@link LifeCycle} interfaces so that it may be used by the Jetty <code>org.eclipse.jetty.server.Server</code>
34   */
35  public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle
36  {
37      private final ExecutorService _executor;
38  
39      /* ------------------------------------------------------------ */
40      public ExecutorThreadPool(ExecutorService executor)
41      {
42          _executor=executor;
43      }
44      
45      /* ------------------------------------------------------------ */
46      /** constructor.
47       * Wraps an {@link ThreadPoolExecutor}.
48       * Core size is 32, max pool size is 256, pool thread timeout after 60 seconds and
49       * an unbounded {@link LinkedBlockingQueue} is used for the job queue;
50       */
51      public ExecutorThreadPool()
52      {
53          this(new ThreadPoolExecutor(32,256,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()));
54      }
55      
56      /* ------------------------------------------------------------ */
57      /** constructor.
58       * Wraps an {@link ThreadPoolExecutor}.
59       * Core size is 32, max pool size is 256, pool thread timeout after 60 seconds
60       * @param queueSize if -1, an unbounded {@link LinkedBlockingQueue} is used, if 0 then a
61       * {@link SynchronousQueue} is used, other a {@link ArrayBlockingQueue} of the given size is used.
62       */
63      public ExecutorThreadPool(int queueSize)
64      {
65          this(new ThreadPoolExecutor(32,256,60,TimeUnit.SECONDS,
66                  queueSize<0?new LinkedBlockingQueue<Runnable>()
67                          : (queueSize==0?new SynchronousQueue<Runnable>()
68                                  :new ArrayBlockingQueue<Runnable>(queueSize))));
69      }
70  
71      /* ------------------------------------------------------------ */
72      /** constructor.
73       * Wraps an {@link ThreadPoolExecutor} using
74       * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue;
75       */
76      public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime)
77      {
78          this(new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
79      }
80  
81      /* ------------------------------------------------------------ */
82      /** constructor.
83       * Wraps an {@link ThreadPoolExecutor} using
84       * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue;
85       */
86      public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit)
87      {
88          this(new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,new LinkedBlockingQueue<Runnable>()));
89      }
90  
91      /* ------------------------------------------------------------ */
92      public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
93      {
94          this(new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue));
95      }
96  
97  
98      /* ------------------------------------------------------------ */
99      public boolean dispatch(Runnable job)
100     {
101         try
102         {       
103             _executor.execute(job);
104             return true;
105         }
106         catch(RejectedExecutionException e)
107         {
108             Log.warn(e);
109             return false;
110         }
111     }
112 
113     /* ------------------------------------------------------------ */
114     public int getIdleThreads()
115     {
116         if (_executor instanceof ThreadPoolExecutor)
117         {
118             final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
119             return tpe.getPoolSize() -tpe.getActiveCount();
120         }
121         return -1;
122     }
123 
124     /* ------------------------------------------------------------ */
125     public int getThreads()
126     {
127         if (_executor instanceof ThreadPoolExecutor)
128         {
129             final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
130             return tpe.getPoolSize();
131         }
132         return -1;
133     }
134 
135     /* ------------------------------------------------------------ */
136     public boolean isLowOnThreads()
137     {
138         if (_executor instanceof ThreadPoolExecutor)
139         {
140             final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;            
141             return tpe.getTaskCount()>=(tpe.getMaximumPoolSize());
142         }
143         return false;
144     }
145 
146     /* ------------------------------------------------------------ */
147     public void join() throws InterruptedException
148     {
149         _executor.awaitTermination(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
150     }
151 
152     /* ------------------------------------------------------------ */
153     @Override
154     protected void doStart() throws Exception
155     {
156         super.doStart();
157     }
158 
159     /* ------------------------------------------------------------ */
160     @Override
161     protected void doStop() throws Exception
162     {
163         super.doStop();
164         _executor.shutdownNow();
165     }
166 
167 }