View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Deque;
25  import java.util.List;
26  import java.util.Queue;
27  import java.util.concurrent.LinkedBlockingDeque;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.eclipse.jetty.client.api.Connection;
32  import org.eclipse.jetty.client.api.Destination;
33  import org.eclipse.jetty.util.BlockingArrayQueue;
34  import org.eclipse.jetty.util.Callback;
35  import org.eclipse.jetty.util.Promise;
36  import org.eclipse.jetty.util.annotation.ManagedAttribute;
37  import org.eclipse.jetty.util.annotation.ManagedObject;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Sweeper;
43  
44  @ManagedObject("The connection pool")
45  public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
46  {
47      private static final Logger LOG = Log.getLogger(DuplexConnectionPool.class);
48  
49      private final AtomicInteger connectionCount = new AtomicInteger();
50      private final ReentrantLock lock = new ReentrantLock();
51      private final Destination destination;
52      private final int maxConnections;
53      private final Callback requester;
54      private final Deque<Connection> idleConnections;
55      private final Queue<Connection> activeConnections;
56  
57      public DuplexConnectionPool(Destination destination, int maxConnections, Callback requester)
58      {
59          this.destination = destination;
60          this.maxConnections = maxConnections;
61          this.requester = requester;
62          this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
63          this.activeConnections = new BlockingArrayQueue<>(maxConnections);
64      }
65  
66      @ManagedAttribute(value = "The number of connections", readonly = true)
67      public int getConnectionCount()
68      {
69          return connectionCount.get();
70      }
71  
72      @ManagedAttribute(value = "The number of idle connections", readonly = true)
73      public int getIdleConnectionCount()
74      {
75          lock();
76          try
77          {
78              return idleConnections.size();
79          }
80          finally
81          {
82              unlock();
83          }
84      }
85  
86      @ManagedAttribute(value = "The number of active connections", readonly = true)
87      public int getActiveConnectionCount()
88      {
89          lock();
90          try
91          {
92              return activeConnections.size();
93          }
94          finally
95          {
96              unlock();
97          }
98      }
99  
100     public Queue<Connection> getIdleConnections()
101     {
102         return idleConnections;
103     }
104 
105     public Queue<Connection> getActiveConnections()
106     {
107         return activeConnections;
108     }
109 
110     public Connection acquire()
111     {
112         Connection connection = activateIdle();
113         if (connection == null)
114             connection = tryCreate();
115         return connection;
116     }
117 
118     private Connection tryCreate()
119     {
120         while (true)
121         {
122             int current = getConnectionCount();
123             final int next = current + 1;
124 
125             if (next > maxConnections)
126             {
127                 if (LOG.isDebugEnabled())
128                     LOG.debug("Max connections {}/{} reached", current, maxConnections);
129                 // Try again the idle connections
130                 return activateIdle();
131             }
132 
133             if (connectionCount.compareAndSet(current, next))
134             {
135                 if (LOG.isDebugEnabled())
136                     LOG.debug("Connection {}/{} creation", next, maxConnections);
137 
138                 destination.newConnection(new Promise<Connection>()
139                 {
140                     @Override
141                     public void succeeded(Connection connection)
142                     {
143                         if (LOG.isDebugEnabled())
144                             LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
145 
146                         idleCreated(connection);
147 
148                         proceed();
149                     }
150 
151                     @Override
152                     public void failed(Throwable x)
153                     {
154                         if (LOG.isDebugEnabled())
155                             LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
156 
157                         connectionCount.decrementAndGet();
158 
159                         requester.failed(x);
160                     }
161                 });
162 
163                 // Try again the idle connections
164                 return activateIdle();
165             }
166         }
167     }
168 
169     protected void proceed()
170     {
171         requester.succeeded();
172     }
173 
174     protected void idleCreated(Connection connection)
175     {
176         boolean idle;
177         lock();
178         try
179         {
180             // Use "cold" new connections as last.
181             idle = idleConnections.offerLast(connection);
182         }
183         finally
184         {
185             unlock();
186         }
187 
188         idle(connection, idle);
189     }
190 
191     private Connection activateIdle()
192     {
193         boolean acquired;
194         Connection connection;
195         lock();
196         try
197         {
198             connection = idleConnections.pollFirst();
199             if (connection == null)
200                 return null;
201             acquired = activeConnections.offer(connection);
202         }
203         finally
204         {
205             unlock();
206         }
207 
208         if (acquired)
209         {
210             if (LOG.isDebugEnabled())
211                 LOG.debug("Connection active {}", connection);
212             acquired(connection);
213             return connection;
214         }
215         else
216         {
217             if (LOG.isDebugEnabled())
218                 LOG.debug("Connection active overflow {}", connection);
219             connection.close();
220             return null;
221         }
222     }
223 
224     protected void acquired(Connection connection)
225     {
226     }
227 
228     public boolean release(Connection connection)
229     {
230         boolean idle;
231         lock();
232         try
233         {
234             if (!activeConnections.remove(connection))
235                 return false;
236             // Make sure we use "hot" connections first.
237             idle = offerIdle(connection);
238         }
239         finally
240         {
241             unlock();
242         }
243 
244         released(connection);
245         return idle(connection, idle);
246     }
247 
248     protected boolean offerIdle(Connection connection)
249     {
250         return idleConnections.offerFirst(connection);
251     }
252 
253     protected boolean idle(Connection connection, boolean idle)
254     {
255         if (idle)
256         {
257             if (LOG.isDebugEnabled())
258                 LOG.debug("Connection idle {}", connection);
259             return true;
260         }
261         else
262         {
263             if (LOG.isDebugEnabled())
264                 LOG.debug("Connection idle overflow {}", connection);
265             connection.close();
266             return false;
267         }
268     }
269 
270     protected void released(Connection connection)
271     {
272     }
273 
274     public boolean remove(Connection connection)
275     {
276         return remove(connection, false);
277     }
278 
279     protected boolean remove(Connection connection, boolean force)
280     {
281         boolean activeRemoved;
282         boolean idleRemoved;
283         lock();
284         try
285         {
286             activeRemoved = activeConnections.remove(connection);
287             idleRemoved = idleConnections.remove(connection);
288         }
289         finally
290         {
291             unlock();
292         }
293 
294         if (activeRemoved || force)
295             released(connection);
296         boolean removed = activeRemoved || idleRemoved || force;
297         if (removed)
298         {
299             int pooled = connectionCount.decrementAndGet();
300             if (LOG.isDebugEnabled())
301                 LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
302         }
303         return removed;
304     }
305 
306     public boolean isActive(Connection connection)
307     {
308         lock();
309         try
310         {
311             return activeConnections.contains(connection);
312         }
313         finally
314         {
315             unlock();
316         }
317     }
318 
319     public boolean isIdle(Connection connection)
320     {
321         lock();
322         try
323         {
324             return idleConnections.contains(connection);
325         }
326         finally
327         {
328             unlock();
329         }
330     }
331 
332     public boolean isEmpty()
333     {
334         return connectionCount.get() == 0;
335     }
336 
337     public void close()
338     {
339         List<Connection> idles = new ArrayList<>();
340         List<Connection> actives = new ArrayList<>();
341         lock();
342         try
343         {
344             idles.addAll(idleConnections);
345             idleConnections.clear();
346             actives.addAll(activeConnections);
347             activeConnections.clear();
348         }
349         finally
350         {
351             unlock();
352         }
353 
354         connectionCount.set(0);
355 
356         for (Connection connection : idles)
357             connection.close();
358 
359         // A bit drastic, but we cannot wait for all requests to complete
360         for (Connection connection : actives)
361             connection.close();
362     }
363 
364     @Override
365     public String dump()
366     {
367         return ContainerLifeCycle.dump(this);
368     }
369 
370     @Override
371     public void dump(Appendable out, String indent) throws IOException
372     {
373         List<Connection> actives = new ArrayList<>();
374         List<Connection> idles = new ArrayList<>();
375         lock();
376         try
377         {
378             actives.addAll(activeConnections);
379             idles.addAll(idleConnections);
380         }
381         finally
382         {
383             unlock();
384         }
385 
386         ContainerLifeCycle.dumpObject(out, this);
387         ContainerLifeCycle.dump(out, indent, actives, idles);
388     }
389 
390     @Override
391     public boolean sweep()
392     {
393         List<Connection> toSweep = new ArrayList<>();
394         lock();
395         try
396         {
397             for (Connection connection : activeConnections)
398             {
399                 if (connection instanceof Sweeper.Sweepable)
400                     toSweep.add(connection);
401             }
402         }
403         finally
404         {
405             unlock();
406         }
407 
408         for (Connection connection : toSweep)
409         {
410             if (((Sweeper.Sweepable)connection).sweep())
411             {
412                 boolean removed = remove(connection, true);
413                 LOG.warn("Connection swept: {}{}{} from active connections{}{}",
414                         connection,
415                         System.lineSeparator(),
416                         removed ? "Removed" : "Not removed",
417                         System.lineSeparator(),
418                         dump());
419             }
420         }
421 
422         return false;
423     }
424 
425     protected void lock()
426     {
427         lock.lock();
428     }
429 
430     protected void unlock()
431     {
432         lock.unlock();
433     }
434 
435     @Override
436     public String toString()
437     {
438         int activeSize;
439         int idleSize;
440         lock();
441         try
442         {
443             activeSize = activeConnections.size();
444             idleSize = idleConnections.size();
445         }
446         finally
447         {
448             unlock();
449         }
450 
451         return String.format("%s[c=%d/%d,a=%d,i=%d]",
452                 getClass().getSimpleName(),
453                 connectionCount.get(),
454                 maxConnections,
455                 activeSize,
456                 idleSize);
457     }
458 }