1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.concurrent.BlockingDeque;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingDeque;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.eclipse.jetty.client.api.Connection;
29 import org.eclipse.jetty.client.api.Destination;
30 import org.eclipse.jetty.util.BlockingArrayQueue;
31 import org.eclipse.jetty.util.Promise;
32 import org.eclipse.jetty.util.component.ContainerLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36
37 public class ConnectionPool implements Closeable, Dumpable
38 {
39 protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
40
41 private final AtomicInteger connectionCount = new AtomicInteger();
42 private final Destination destination;
43 private final int maxConnections;
44 private final Promise<Connection> connectionPromise;
45 private final BlockingDeque<Connection> idleConnections;
46 private final BlockingQueue<Connection> activeConnections;
47
48 public ConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
49 {
50 this.destination = destination;
51 this.maxConnections = maxConnections;
52 this.connectionPromise = connectionPromise;
53 this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
54 this.activeConnections = new BlockingArrayQueue<>(maxConnections);
55 }
56
57 public BlockingQueue<Connection> getIdleConnections()
58 {
59 return idleConnections;
60 }
61
62 public BlockingQueue<Connection> getActiveConnections()
63 {
64 return activeConnections;
65 }
66
67 public Connection acquire()
68 {
69 Connection connection = acquireIdleConnection();
70 if (connection == null)
71 connection = tryCreate();
72 return connection;
73 }
74
75 private Connection tryCreate()
76 {
77 while (true)
78 {
79 int current = connectionCount.get();
80 final int next = current + 1;
81
82 if (next > maxConnections)
83 {
84 if (LOG.isDebugEnabled())
85 LOG.debug("Max connections {}/{} reached", current, maxConnections);
86
87 return acquireIdleConnection();
88 }
89
90 if (connectionCount.compareAndSet(current, next))
91 {
92 if (LOG.isDebugEnabled())
93 LOG.debug("Connection {}/{} creation", next, maxConnections);
94
95 destination.newConnection(new Promise<Connection>()
96 {
97 @Override
98 public void succeeded(Connection connection)
99 {
100 if (LOG.isDebugEnabled())
101 LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
102 if (activate(connection))
103 connectionPromise.succeeded(connection);
104 }
105
106 @Override
107 public void failed(Throwable x)
108 {
109 if (LOG.isDebugEnabled())
110 LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
111 connectionCount.decrementAndGet();
112 connectionPromise.failed(x);
113 }
114 });
115
116
117 return acquireIdleConnection();
118 }
119 }
120 }
121
122 private Connection acquireIdleConnection()
123 {
124 Connection connection = idleConnections.pollFirst();
125 if (connection == null)
126 return null;
127 return activate(connection) ? connection : null;
128 }
129
130 private boolean activate(Connection connection)
131 {
132 if (activeConnections.offer(connection))
133 {
134 if (LOG.isDebugEnabled())
135 LOG.debug("Connection active {}", connection);
136 acquired(connection);
137 return true;
138 }
139 else
140 {
141 if (LOG.isDebugEnabled())
142 LOG.debug("Connection active overflow {}", connection);
143 connection.close();
144 return false;
145 }
146 }
147
148 protected void acquired(Connection connection)
149 {
150 }
151
152 public boolean release(Connection connection)
153 {
154 released(connection);
155 if (activeConnections.remove(connection))
156 {
157
158 if (idleConnections.offerFirst(connection))
159 {
160 if (LOG.isDebugEnabled())
161 LOG.debug("Connection idle {}", connection);
162 return true;
163 }
164 else
165 {
166 if (LOG.isDebugEnabled())
167 LOG.debug("Connection idle overflow {}", connection);
168 connection.close();
169 }
170 }
171 return false;
172 }
173
174 protected void released(Connection connection)
175 {
176 }
177
178 public boolean remove(Connection connection)
179 {
180 boolean activeRemoved = activeConnections.remove(connection);
181 boolean idleRemoved = idleConnections.remove(connection);
182 if (!idleRemoved)
183 released(connection);
184 boolean removed = activeRemoved || idleRemoved;
185 if (removed)
186 {
187 int pooled = connectionCount.decrementAndGet();
188 if (LOG.isDebugEnabled())
189 LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
190 }
191 return removed;
192 }
193
194 public boolean isActive(Connection connection)
195 {
196 return activeConnections.contains(connection);
197 }
198
199 public boolean isIdle(Connection connection)
200 {
201 return idleConnections.contains(connection);
202 }
203
204 public boolean isEmpty()
205 {
206 return connectionCount.get() == 0;
207 }
208
209 public void close()
210 {
211 for (Connection connection : idleConnections)
212 connection.close();
213 idleConnections.clear();
214
215
216 for (Connection connection : activeConnections)
217 connection.close();
218 activeConnections.clear();
219
220 connectionCount.set(0);
221 }
222
223 @Override
224 public String dump()
225 {
226 return ContainerLifeCycle.dump(this);
227 }
228
229 @Override
230 public void dump(Appendable out, String indent) throws IOException
231 {
232 ContainerLifeCycle.dumpObject(out, this);
233 ContainerLifeCycle.dump(out, indent, activeConnections, idleConnections);
234 }
235
236 @Override
237 public String toString()
238 {
239 return String.format("%s[c=%d/%d,a=%d,i=%d]",
240 getClass().getSimpleName(),
241 connectionCount.get(),
242 maxConnections,
243 activeConnections.size(),
244 idleConnections.size());
245 }
246 }