View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.util.concurrent.BlockingDeque;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.LinkedBlockingDeque;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.eclipse.jetty.client.api.Connection;
28  import org.eclipse.jetty.client.api.Destination;
29  import org.eclipse.jetty.util.BlockingArrayQueue;
30  import org.eclipse.jetty.util.Promise;
31  import org.eclipse.jetty.util.component.ContainerLifeCycle;
32  import org.eclipse.jetty.util.component.Dumpable;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  
36  public class ConnectionPool implements Dumpable
37  {
38      private static final Logger LOG = Log.getLogger(ConnectionPool.class);
39  
40      private final AtomicInteger connectionCount = new AtomicInteger();
41      private final Destination destination;
42      private final int maxConnections;
43      private final Promise<Connection> connectionPromise;
44      private final BlockingDeque<Connection> idleConnections;
45      private final BlockingQueue<Connection> activeConnections;
46  
47      public ConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
48      {
49          this.destination = destination;
50          this.maxConnections = maxConnections;
51          this.connectionPromise = connectionPromise;
52          this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
53          this.activeConnections = new BlockingArrayQueue<>(maxConnections);
54      }
55  
56      public BlockingQueue<Connection> getIdleConnections()
57      {
58          return idleConnections;
59      }
60  
61      public BlockingQueue<Connection> getActiveConnections()
62      {
63          return activeConnections;
64      }
65  
66      public Connection acquire()
67      {
68          Connection result = acquireIdleConnection();
69          if (result != null)
70              return result;
71  
72          while (true)
73          {
74              int current = connectionCount.get();
75              final int next = current + 1;
76  
77              if (next > maxConnections)
78              {
79                  LOG.debug("Max connections {}/{} reached", current, maxConnections);
80                  // Try again the idle connections
81                  return acquireIdleConnection();
82              }
83  
84              if (connectionCount.compareAndSet(current, next))
85              {
86                  LOG.debug("Connection {}/{} creation", next, maxConnections);
87  
88                  destination.newConnection(new Promise<Connection>()
89                  {
90                      @Override
91                      public void succeeded(Connection connection)
92                      {
93                          LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
94                          activate(connection);
95                          connectionPromise.succeeded(connection);
96                      }
97  
98                      @Override
99                      public void failed(Throwable x)
100                     {
101                         LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
102                         connectionCount.decrementAndGet();
103                         connectionPromise.failed(x);
104                     }
105                 });
106 
107                 // Try again the idle connections
108                 return acquireIdleConnection();
109             }
110         }
111     }
112 
113     private Connection acquireIdleConnection()
114     {
115         Connection connection = idleConnections.pollFirst();
116         if (connection != null)
117             activate(connection);
118         return connection;
119     }
120 
121     private boolean activate(Connection connection)
122     {
123         if (activeConnections.offer(connection))
124         {
125             LOG.debug("Connection active {}", connection);
126             return true;
127         }
128         else
129         {
130             LOG.debug("Connection active overflow {}", connection);
131             return false;
132         }
133     }
134 
135     public boolean release(Connection connection)
136     {
137         if (activeConnections.remove(connection))
138         {
139             // Make sure we use "hot" connections first
140             if (idleConnections.offerFirst(connection))
141             {
142                 LOG.debug("Connection idle {}", connection);
143                 return true;
144             }
145             else
146             {
147                 LOG.debug("Connection idle overflow {}", connection);
148             }
149         }
150         return false;
151     }
152 
153     public boolean remove(Connection connection)
154     {
155         boolean removed = activeConnections.remove(connection);
156         removed |= idleConnections.remove(connection);
157         if (removed)
158         {
159             int pooled = connectionCount.decrementAndGet();
160             LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
161         }
162         return removed;
163     }
164 
165     public boolean isActive(Connection connection)
166     {
167         return activeConnections.contains(connection);
168     }
169 
170     public boolean isIdle(Connection connection)
171     {
172         return idleConnections.contains(connection);
173     }
174 
175     public void close()
176     {
177         for (Connection connection : idleConnections)
178             connection.close();
179         idleConnections.clear();
180 
181         // A bit drastic, but we cannot wait for all requests to complete
182         for (Connection connection : activeConnections)
183             connection.close();
184         activeConnections.clear();
185 
186         connectionCount.set(0);
187     }
188 
189     @Override
190     public String dump()
191     {
192         return ContainerLifeCycle.dump(this);
193     }
194 
195     @Override
196     public void dump(Appendable out, String indent) throws IOException
197     {
198         ContainerLifeCycle.dumpObject(out, this);
199         ContainerLifeCycle.dump(out, indent, activeConnections, idleConnections);
200     }
201 
202     @Override
203     public String toString()
204     {
205         return String.format("%s %d/%d", getClass().getSimpleName(), connectionCount.get(), maxConnections);
206     }
207 }