View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Deque;
25  import java.util.List;
26  import java.util.Queue;
27  import java.util.concurrent.LinkedBlockingDeque;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.eclipse.jetty.client.api.Connection;
32  import org.eclipse.jetty.client.api.Destination;
33  import org.eclipse.jetty.util.BlockingArrayQueue;
34  import org.eclipse.jetty.util.Callback;
35  import org.eclipse.jetty.util.Promise;
36  import org.eclipse.jetty.util.annotation.ManagedAttribute;
37  import org.eclipse.jetty.util.annotation.ManagedObject;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Sweeper;
43  
44  @ManagedObject("The connection pool")
45  public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
46  {
47      private static final Logger LOG = Log.getLogger(ConnectionPool.class);
48  
49      private final AtomicInteger connectionCount = new AtomicInteger();
50      private final ReentrantLock lock = new ReentrantLock();
51      private final Destination destination;
52      private final int maxConnections;
53      private final Callback requester;
54      private final Deque<Connection> idleConnections;
55      private final Queue<Connection> activeConnections;
56  
57      public ConnectionPool(Destination destination, int maxConnections, Callback requester)
58      {
59          this.destination = destination;
60          this.maxConnections = maxConnections;
61          this.requester = requester;
62          this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
63          this.activeConnections = new BlockingArrayQueue<>(maxConnections);
64      }
65  
66      @ManagedAttribute(value = "The number of connections", readonly = true)
67      public int getConnectionCount()
68      {
69          return connectionCount.get();
70      }
71  
72      @ManagedAttribute(value = "The number of idle connections", readonly = true)
73      public int getIdleConnectionCount()
74      {
75          return idleConnections.size();
76      }
77  
78      @ManagedAttribute(value = "The number of active connections", readonly = true)
79      public int getActiveConnectionCount()
80      {
81          return activeConnections.size();
82      }
83  
84      public Queue<Connection> getIdleConnections()
85      {
86          return idleConnections;
87      }
88  
89      public Queue<Connection> getActiveConnections()
90      {
91          return activeConnections;
92      }
93  
94      public Connection acquire()
95      {
96          Connection connection = activateIdle();
97          if (connection == null)
98              connection = tryCreate();
99          return connection;
100     }
101 
102     private Connection tryCreate()
103     {
104         while (true)
105         {
106             int current = getConnectionCount();
107             final int next = current + 1;
108 
109             if (next > maxConnections)
110             {
111                 if (LOG.isDebugEnabled())
112                     LOG.debug("Max connections {}/{} reached", current, maxConnections);
113                 // Try again the idle connections
114                 return activateIdle();
115             }
116 
117             if (connectionCount.compareAndSet(current, next))
118             {
119                 if (LOG.isDebugEnabled())
120                     LOG.debug("Connection {}/{} creation", next, maxConnections);
121 
122                 destination.newConnection(new Promise<Connection>()
123                 {
124                     @Override
125                     public void succeeded(Connection connection)
126                     {
127                         if (LOG.isDebugEnabled())
128                             LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
129 
130                         idleCreated(connection);
131 
132                         proceed();
133                     }
134 
135                     @Override
136                     public void failed(Throwable x)
137                     {
138                         if (LOG.isDebugEnabled())
139                             LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
140 
141                         connectionCount.decrementAndGet();
142 
143                         requester.failed(x);
144                     }
145                 });
146 
147                 // Try again the idle connections
148                 return activateIdle();
149             }
150         }
151     }
152 
153     protected void proceed()
154     {
155         requester.succeeded();
156     }
157 
158     protected void idleCreated(Connection connection)
159     {
160         boolean idle;
161         lock();
162         try
163         {
164             // Use "cold" new connections as last.
165             idle = idleConnections.offerLast(connection);
166         }
167         finally
168         {
169             unlock();
170         }
171 
172         idle(connection, idle);
173     }
174 
175     private Connection activateIdle()
176     {
177         boolean acquired;
178         Connection connection;
179         lock();
180         try
181         {
182             connection = idleConnections.pollFirst();
183             if (connection == null)
184                 return null;
185             acquired = activeConnections.offer(connection);
186         }
187         finally
188         {
189             unlock();
190         }
191 
192         if (acquired)
193         {
194             if (LOG.isDebugEnabled())
195                 LOG.debug("Connection active {}", connection);
196             acquired(connection);
197             return connection;
198         }
199         else
200         {
201             if (LOG.isDebugEnabled())
202                 LOG.debug("Connection active overflow {}", connection);
203             connection.close();
204             return null;
205         }
206     }
207 
208     protected void acquired(Connection connection)
209     {
210     }
211 
212     public boolean release(Connection connection)
213     {
214         boolean idle;
215         lock();
216         try
217         {
218             if (!activeConnections.remove(connection))
219                 return false;
220             // Make sure we use "hot" connections first.
221             idle = offerIdle(connection);
222         }
223         finally
224         {
225             unlock();
226         }
227 
228         released(connection);
229         return idle(connection, idle);
230     }
231 
232     protected boolean offerIdle(Connection connection)
233     {
234         return idleConnections.offerFirst(connection);
235     }
236 
237     protected boolean idle(Connection connection, boolean idle)
238     {
239         if (idle)
240         {
241             if (LOG.isDebugEnabled())
242                 LOG.debug("Connection idle {}", connection);
243             return true;
244         }
245         else
246         {
247             if (LOG.isDebugEnabled())
248                 LOG.debug("Connection idle overflow {}", connection);
249             connection.close();
250             return false;
251         }
252     }
253 
254     protected void released(Connection connection)
255     {
256     }
257 
258     public boolean remove(Connection connection)
259     {
260         return remove(connection, false);
261     }
262 
263     protected boolean remove(Connection connection, boolean force)
264     {
265         boolean activeRemoved;
266         boolean idleRemoved;
267         lock();
268         try
269         {
270             activeRemoved = activeConnections.remove(connection);
271             idleRemoved = idleConnections.remove(connection);
272         }
273         finally
274         {
275             unlock();
276         }
277 
278         if (activeRemoved)
279             released(connection);
280         boolean removed = activeRemoved || idleRemoved || force;
281         if (removed)
282         {
283             int pooled = connectionCount.decrementAndGet();
284             if (LOG.isDebugEnabled())
285                 LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
286         }
287         return removed;
288     }
289 
290     public boolean isActive(Connection connection)
291     {
292         lock();
293         try
294         {
295             return activeConnections.contains(connection);
296         }
297         finally
298         {
299             unlock();
300         }
301     }
302 
303     public boolean isIdle(Connection connection)
304     {
305         lock();
306         try
307         {
308             return idleConnections.contains(connection);
309         }
310         finally
311         {
312             unlock();
313         }
314     }
315 
316     public boolean isEmpty()
317     {
318         return connectionCount.get() == 0;
319     }
320 
321     public void close()
322     {
323         List<Connection> idles = new ArrayList<>();
324         List<Connection> actives = new ArrayList<>();
325         lock();
326         try
327         {
328             idles.addAll(idleConnections);
329             idleConnections.clear();
330             actives.addAll(activeConnections);
331             activeConnections.clear();
332         }
333         finally
334         {
335             unlock();
336         }
337 
338         connectionCount.set(0);
339 
340         for (Connection connection : idles)
341             connection.close();
342 
343         // A bit drastic, but we cannot wait for all requests to complete
344         for (Connection connection : actives)
345             connection.close();
346     }
347 
348     @Override
349     public String dump()
350     {
351         return ContainerLifeCycle.dump(this);
352     }
353 
354     @Override
355     public void dump(Appendable out, String indent) throws IOException
356     {
357         List<Connection> actives = new ArrayList<>();
358         List<Connection> idles = new ArrayList<>();
359         lock();
360         try
361         {
362             actives.addAll(activeConnections);
363             idles.addAll(idleConnections);
364         }
365         finally
366         {
367             unlock();
368         }
369 
370         ContainerLifeCycle.dumpObject(out, this);
371         ContainerLifeCycle.dump(out, indent, actives, idles);
372     }
373 
374     @Override
375     public boolean sweep()
376     {
377         List<Sweeper.Sweepable> toSweep = new ArrayList<>();
378         lock();
379         try
380         {
381             for (Connection connection : getActiveConnections())
382             {
383                 if (connection instanceof Sweeper.Sweepable)
384                     toSweep.add(((Sweeper.Sweepable)connection));
385             }
386         }
387         finally
388         {
389             unlock();
390         }
391 
392         for (Sweeper.Sweepable candidate : toSweep)
393         {
394             if (candidate.sweep())
395             {
396                 boolean removed = getActiveConnections().remove(candidate);
397                 LOG.warn("Connection swept: {}{}{} from active connections{}{}",
398                         candidate,
399                         System.lineSeparator(),
400                         removed ? "Removed" : "Not removed",
401                         System.lineSeparator(),
402                         dump());
403             }
404         }
405 
406         return false;
407     }
408 
409     protected void lock()
410     {
411         lock.lock();
412     }
413 
414     protected void unlock()
415     {
416         lock.unlock();
417     }
418 
419     @Override
420     public String toString()
421     {
422         int activeSize;
423         int idleSize;
424         lock();
425         try
426         {
427             activeSize = activeConnections.size();
428             idleSize = idleConnections.size();
429         }
430         finally
431         {
432             unlock();
433         }
434 
435         return String.format("%s[c=%d/%d,a=%d,i=%d]",
436                 getClass().getSimpleName(),
437                 connectionCount.get(),
438                 maxConnections,
439                 activeSize,
440                 idleSize);
441     }
442 }