View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.concurrent.BlockingDeque;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.LinkedBlockingDeque;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.eclipse.jetty.client.api.Connection;
29  import org.eclipse.jetty.client.api.Destination;
30  import org.eclipse.jetty.util.BlockingArrayQueue;
31  import org.eclipse.jetty.util.Promise;
32  import org.eclipse.jetty.util.component.ContainerLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  
37  public class ConnectionPool implements Closeable, Dumpable
38  {
39      protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
40  
41      private final AtomicInteger connectionCount = new AtomicInteger();
42      private final Destination destination;
43      private final int maxConnections;
44      private final Promise<Connection> connectionPromise;
45      private final BlockingDeque<Connection> idleConnections;
46      private final BlockingQueue<Connection> activeConnections;
47  
48      public ConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
49      {
50          this.destination = destination;
51          this.maxConnections = maxConnections;
52          this.connectionPromise = connectionPromise;
53          this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
54          this.activeConnections = new BlockingArrayQueue<>(maxConnections);
55      }
56  
57      public BlockingQueue<Connection> getIdleConnections()
58      {
59          return idleConnections;
60      }
61  
62      public BlockingQueue<Connection> getActiveConnections()
63      {
64          return activeConnections;
65      }
66  
67      public Connection acquire()
68      {
69          Connection connection = acquireIdleConnection();
70          if (connection == null)
71              connection = tryCreate();
72          return connection;
73      }
74  
75      private Connection tryCreate()
76      {
77          while (true)
78          {
79              int current = connectionCount.get();
80              final int next = current + 1;
81  
82              if (next > maxConnections)
83              {
84                  if (LOG.isDebugEnabled())
85                      LOG.debug("Max connections {}/{} reached", current, maxConnections);
86                  // Try again the idle connections
87                  return acquireIdleConnection();
88              }
89  
90              if (connectionCount.compareAndSet(current, next))
91              {
92                  if (LOG.isDebugEnabled())
93                      LOG.debug("Connection {}/{} creation", next, maxConnections);
94  
95                  destination.newConnection(new Promise<Connection>()
96                  {
97                      @Override
98                      public void succeeded(Connection connection)
99                      {
100                         if (LOG.isDebugEnabled())
101                             LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
102                         if (activate(connection))
103                             connectionPromise.succeeded(connection);
104                     }
105 
106                     @Override
107                     public void failed(Throwable x)
108                     {
109                         if (LOG.isDebugEnabled())
110                             LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
111                         connectionCount.decrementAndGet();
112                         connectionPromise.failed(x);
113                     }
114                 });
115 
116                 // Try again the idle connections
117                 return acquireIdleConnection();
118             }
119         }
120     }
121 
122     private Connection acquireIdleConnection()
123     {
124         Connection connection = idleConnections.pollFirst();
125         if (connection == null)
126             return null;
127         return activate(connection) ? connection : null;
128     }
129 
130     private boolean activate(Connection connection)
131     {
132         if (activeConnections.offer(connection))
133         {
134             if (LOG.isDebugEnabled())
135                 LOG.debug("Connection active {}", connection);
136             acquired(connection);
137             return true;
138         }
139         else
140         {
141             if (LOG.isDebugEnabled())
142                 LOG.debug("Connection active overflow {}", connection);
143             connection.close();
144             return false;
145         }
146     }
147 
148     protected void acquired(Connection connection)
149     {
150     }
151 
152     public boolean release(Connection connection)
153     {
154         released(connection);
155         if (activeConnections.remove(connection))
156         {
157             // Make sure we use "hot" connections first
158             if (idleConnections.offerFirst(connection))
159             {
160                 if (LOG.isDebugEnabled())
161                     LOG.debug("Connection idle {}", connection);
162                 return true;
163             }
164             else
165             {
166                 if (LOG.isDebugEnabled())
167                     LOG.debug("Connection idle overflow {}", connection);
168                 connection.close();
169             }
170         }
171         return false;
172     }
173 
174     protected void released(Connection connection)
175     {
176     }
177 
178     public boolean remove(Connection connection)
179     {
180         boolean activeRemoved = activeConnections.remove(connection);
181         boolean idleRemoved = idleConnections.remove(connection);
182         if (!idleRemoved)
183             released(connection);
184         boolean removed = activeRemoved || idleRemoved;
185         if (removed)
186         {
187             int pooled = connectionCount.decrementAndGet();
188             if (LOG.isDebugEnabled())
189                 LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
190         }
191         return removed;
192     }
193 
194     public boolean isActive(Connection connection)
195     {
196         return activeConnections.contains(connection);
197     }
198 
199     public boolean isIdle(Connection connection)
200     {
201         return idleConnections.contains(connection);
202     }
203 
204     public boolean isEmpty()
205     {
206         return connectionCount.get() == 0;
207     }
208 
209     public void close()
210     {
211         for (Connection connection : idleConnections)
212             connection.close();
213         idleConnections.clear();
214 
215         // A bit drastic, but we cannot wait for all requests to complete
216         for (Connection connection : activeConnections)
217             connection.close();
218         activeConnections.clear();
219 
220         connectionCount.set(0);
221     }
222 
223     @Override
224     public String dump()
225     {
226         return ContainerLifeCycle.dump(this);
227     }
228 
229     @Override
230     public void dump(Appendable out, String indent) throws IOException
231     {
232         ContainerLifeCycle.dumpObject(out, this);
233         ContainerLifeCycle.dump(out, indent, activeConnections, idleConnections);
234     }
235 
236     @Override
237     public String toString()
238     {
239         return String.format("%s[c=%d/%d,a=%d,i=%d]",
240                 getClass().getSimpleName(),
241                 connectionCount.get(),
242                 maxConnections,
243                 activeConnections.size(),
244                 idleConnections.size());
245     }
246 }