1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Deque;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.LinkedBlockingDeque;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.eclipse.jetty.client.api.Connection;
32 import org.eclipse.jetty.client.api.Destination;
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.Callback;
35 import org.eclipse.jetty.util.Promise;
36 import org.eclipse.jetty.util.annotation.ManagedAttribute;
37 import org.eclipse.jetty.util.annotation.ManagedObject;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Sweeper;
43
44 @ManagedObject("The connection pool")
45 public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
46 {
47 private static final Logger LOG = Log.getLogger(ConnectionPool.class);
48
49 private final AtomicInteger connectionCount = new AtomicInteger();
50 private final ReentrantLock lock = new ReentrantLock();
51 private final Destination destination;
52 private final int maxConnections;
53 private final Callback requester;
54 private final Deque<Connection> idleConnections;
55 private final Queue<Connection> activeConnections;
56
57 public ConnectionPool(Destination destination, int maxConnections, Callback requester)
58 {
59 this.destination = destination;
60 this.maxConnections = maxConnections;
61 this.requester = requester;
62 this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
63 this.activeConnections = new BlockingArrayQueue<>(maxConnections);
64 }
65
66 @ManagedAttribute(value = "The number of connections", readonly = true)
67 public int getConnectionCount()
68 {
69 return connectionCount.get();
70 }
71
72 @ManagedAttribute(value = "The number of idle connections", readonly = true)
73 public int getIdleConnectionCount()
74 {
75 return idleConnections.size();
76 }
77
78 @ManagedAttribute(value = "The number of active connections", readonly = true)
79 public int getActiveConnectionCount()
80 {
81 return activeConnections.size();
82 }
83
84 public Queue<Connection> getIdleConnections()
85 {
86 return idleConnections;
87 }
88
89 public Queue<Connection> getActiveConnections()
90 {
91 return activeConnections;
92 }
93
94 public Connection acquire()
95 {
96 Connection connection = activateIdle();
97 if (connection == null)
98 connection = tryCreate();
99 return connection;
100 }
101
102 private Connection tryCreate()
103 {
104 while (true)
105 {
106 int current = getConnectionCount();
107 final int next = current + 1;
108
109 if (next > maxConnections)
110 {
111 if (LOG.isDebugEnabled())
112 LOG.debug("Max connections {}/{} reached", current, maxConnections);
113
114 return activateIdle();
115 }
116
117 if (connectionCount.compareAndSet(current, next))
118 {
119 if (LOG.isDebugEnabled())
120 LOG.debug("Connection {}/{} creation", next, maxConnections);
121
122 destination.newConnection(new Promise<Connection>()
123 {
124 @Override
125 public void succeeded(Connection connection)
126 {
127 if (LOG.isDebugEnabled())
128 LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
129
130 idleCreated(connection);
131
132 proceed();
133 }
134
135 @Override
136 public void failed(Throwable x)
137 {
138 if (LOG.isDebugEnabled())
139 LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
140
141 connectionCount.decrementAndGet();
142
143 requester.failed(x);
144 }
145 });
146
147
148 return activateIdle();
149 }
150 }
151 }
152
153 protected void proceed()
154 {
155 requester.succeeded();
156 }
157
158 protected void idleCreated(Connection connection)
159 {
160 boolean idle;
161 lock();
162 try
163 {
164
165 idle = idleConnections.offerLast(connection);
166 }
167 finally
168 {
169 unlock();
170 }
171
172 idle(connection, idle);
173 }
174
175 private Connection activateIdle()
176 {
177 boolean acquired;
178 Connection connection;
179 lock();
180 try
181 {
182 connection = idleConnections.pollFirst();
183 if (connection == null)
184 return null;
185 acquired = activeConnections.offer(connection);
186 }
187 finally
188 {
189 unlock();
190 }
191
192 if (acquired)
193 {
194 if (LOG.isDebugEnabled())
195 LOG.debug("Connection active {}", connection);
196 acquired(connection);
197 return connection;
198 }
199 else
200 {
201 if (LOG.isDebugEnabled())
202 LOG.debug("Connection active overflow {}", connection);
203 connection.close();
204 return null;
205 }
206 }
207
208 protected void acquired(Connection connection)
209 {
210 }
211
212 public boolean release(Connection connection)
213 {
214 boolean idle;
215 lock();
216 try
217 {
218 if (!activeConnections.remove(connection))
219 return false;
220
221 idle = offerIdle(connection);
222 }
223 finally
224 {
225 unlock();
226 }
227
228 released(connection);
229 return idle(connection, idle);
230 }
231
232 protected boolean offerIdle(Connection connection)
233 {
234 return idleConnections.offerFirst(connection);
235 }
236
237 protected boolean idle(Connection connection, boolean idle)
238 {
239 if (idle)
240 {
241 if (LOG.isDebugEnabled())
242 LOG.debug("Connection idle {}", connection);
243 return true;
244 }
245 else
246 {
247 if (LOG.isDebugEnabled())
248 LOG.debug("Connection idle overflow {}", connection);
249 connection.close();
250 return false;
251 }
252 }
253
254 protected void released(Connection connection)
255 {
256 }
257
258 public boolean remove(Connection connection)
259 {
260 return remove(connection, false);
261 }
262
263 protected boolean remove(Connection connection, boolean force)
264 {
265 boolean activeRemoved;
266 boolean idleRemoved;
267 lock();
268 try
269 {
270 activeRemoved = activeConnections.remove(connection);
271 idleRemoved = idleConnections.remove(connection);
272 }
273 finally
274 {
275 unlock();
276 }
277
278 if (activeRemoved)
279 released(connection);
280 boolean removed = activeRemoved || idleRemoved || force;
281 if (removed)
282 {
283 int pooled = connectionCount.decrementAndGet();
284 if (LOG.isDebugEnabled())
285 LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
286 }
287 return removed;
288 }
289
290 public boolean isActive(Connection connection)
291 {
292 lock();
293 try
294 {
295 return activeConnections.contains(connection);
296 }
297 finally
298 {
299 unlock();
300 }
301 }
302
303 public boolean isIdle(Connection connection)
304 {
305 lock();
306 try
307 {
308 return idleConnections.contains(connection);
309 }
310 finally
311 {
312 unlock();
313 }
314 }
315
316 public boolean isEmpty()
317 {
318 return connectionCount.get() == 0;
319 }
320
321 public void close()
322 {
323 List<Connection> idles = new ArrayList<>();
324 List<Connection> actives = new ArrayList<>();
325 lock();
326 try
327 {
328 idles.addAll(idleConnections);
329 idleConnections.clear();
330 actives.addAll(activeConnections);
331 activeConnections.clear();
332 }
333 finally
334 {
335 unlock();
336 }
337
338 connectionCount.set(0);
339
340 for (Connection connection : idles)
341 connection.close();
342
343
344 for (Connection connection : actives)
345 connection.close();
346 }
347
348 @Override
349 public String dump()
350 {
351 return ContainerLifeCycle.dump(this);
352 }
353
354 @Override
355 public void dump(Appendable out, String indent) throws IOException
356 {
357 List<Connection> actives = new ArrayList<>();
358 List<Connection> idles = new ArrayList<>();
359 lock();
360 try
361 {
362 actives.addAll(activeConnections);
363 idles.addAll(idleConnections);
364 }
365 finally
366 {
367 unlock();
368 }
369
370 ContainerLifeCycle.dumpObject(out, this);
371 ContainerLifeCycle.dump(out, indent, actives, idles);
372 }
373
374 @Override
375 public boolean sweep()
376 {
377 List<Sweeper.Sweepable> toSweep = new ArrayList<>();
378 lock();
379 try
380 {
381 for (Connection connection : getActiveConnections())
382 {
383 if (connection instanceof Sweeper.Sweepable)
384 toSweep.add(((Sweeper.Sweepable)connection));
385 }
386 }
387 finally
388 {
389 unlock();
390 }
391
392 for (Sweeper.Sweepable candidate : toSweep)
393 {
394 if (candidate.sweep())
395 {
396 boolean removed = getActiveConnections().remove(candidate);
397 LOG.warn("Connection swept: {}{}{} from active connections{}{}",
398 candidate,
399 System.lineSeparator(),
400 removed ? "Removed" : "Not removed",
401 System.lineSeparator(),
402 dump());
403 }
404 }
405
406 return false;
407 }
408
409 protected void lock()
410 {
411 lock.lock();
412 }
413
414 protected void unlock()
415 {
416 lock.unlock();
417 }
418
419 @Override
420 public String toString()
421 {
422 int activeSize;
423 int idleSize;
424 lock();
425 try
426 {
427 activeSize = activeConnections.size();
428 idleSize = idleConnections.size();
429 }
430 finally
431 {
432 unlock();
433 }
434
435 return String.format("%s[c=%d/%d,a=%d,i=%d]",
436 getClass().getSimpleName(),
437 connectionCount.get(),
438 maxConnections,
439 activeSize,
440 idleSize);
441 }
442 }