1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Deque;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.LinkedBlockingDeque;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.eclipse.jetty.client.api.Connection;
32 import org.eclipse.jetty.client.api.Destination;
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.Callback;
35 import org.eclipse.jetty.util.Promise;
36 import org.eclipse.jetty.util.annotation.ManagedAttribute;
37 import org.eclipse.jetty.util.annotation.ManagedObject;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Sweeper;
43
44 @ManagedObject("The connection pool")
45 public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
46 {
47 private static final Logger LOG = Log.getLogger(DuplexConnectionPool.class);
48
49 private final AtomicInteger connectionCount = new AtomicInteger();
50 private final ReentrantLock lock = new ReentrantLock();
51 private final Destination destination;
52 private final int maxConnections;
53 private final Callback requester;
54 private final Deque<Connection> idleConnections;
55 private final Queue<Connection> activeConnections;
56
57 public DuplexConnectionPool(Destination destination, int maxConnections, Callback requester)
58 {
59 this.destination = destination;
60 this.maxConnections = maxConnections;
61 this.requester = requester;
62 this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
63 this.activeConnections = new BlockingArrayQueue<>(maxConnections);
64 }
65
66 @ManagedAttribute(value = "The number of connections", readonly = true)
67 public int getConnectionCount()
68 {
69 return connectionCount.get();
70 }
71
72 @ManagedAttribute(value = "The number of idle connections", readonly = true)
73 public int getIdleConnectionCount()
74 {
75 lock();
76 try
77 {
78 return idleConnections.size();
79 }
80 finally
81 {
82 unlock();
83 }
84 }
85
86 @ManagedAttribute(value = "The number of active connections", readonly = true)
87 public int getActiveConnectionCount()
88 {
89 lock();
90 try
91 {
92 return activeConnections.size();
93 }
94 finally
95 {
96 unlock();
97 }
98 }
99
100 public Queue<Connection> getIdleConnections()
101 {
102 return idleConnections;
103 }
104
105 public Queue<Connection> getActiveConnections()
106 {
107 return activeConnections;
108 }
109
110 public Connection acquire()
111 {
112 Connection connection = activateIdle();
113 if (connection == null)
114 connection = tryCreate();
115 return connection;
116 }
117
118 private Connection tryCreate()
119 {
120 while (true)
121 {
122 int current = getConnectionCount();
123 final int next = current + 1;
124
125 if (next > maxConnections)
126 {
127 if (LOG.isDebugEnabled())
128 LOG.debug("Max connections {}/{} reached", current, maxConnections);
129
130 return activateIdle();
131 }
132
133 if (connectionCount.compareAndSet(current, next))
134 {
135 if (LOG.isDebugEnabled())
136 LOG.debug("Connection {}/{} creation", next, maxConnections);
137
138 destination.newConnection(new Promise<Connection>()
139 {
140 @Override
141 public void succeeded(Connection connection)
142 {
143 if (LOG.isDebugEnabled())
144 LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
145
146 idleCreated(connection);
147
148 proceed();
149 }
150
151 @Override
152 public void failed(Throwable x)
153 {
154 if (LOG.isDebugEnabled())
155 LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
156
157 connectionCount.decrementAndGet();
158
159 requester.failed(x);
160 }
161 });
162
163
164 return activateIdle();
165 }
166 }
167 }
168
169 protected void proceed()
170 {
171 requester.succeeded();
172 }
173
174 protected void idleCreated(Connection connection)
175 {
176 boolean idle;
177 lock();
178 try
179 {
180
181 idle = idleConnections.offerLast(connection);
182 }
183 finally
184 {
185 unlock();
186 }
187
188 idle(connection, idle);
189 }
190
191 private Connection activateIdle()
192 {
193 boolean acquired;
194 Connection connection;
195 lock();
196 try
197 {
198 connection = idleConnections.pollFirst();
199 if (connection == null)
200 return null;
201 acquired = activeConnections.offer(connection);
202 }
203 finally
204 {
205 unlock();
206 }
207
208 if (acquired)
209 {
210 if (LOG.isDebugEnabled())
211 LOG.debug("Connection active {}", connection);
212 acquired(connection);
213 return connection;
214 }
215 else
216 {
217 if (LOG.isDebugEnabled())
218 LOG.debug("Connection active overflow {}", connection);
219 connection.close();
220 return null;
221 }
222 }
223
224 protected void acquired(Connection connection)
225 {
226 }
227
228 public boolean release(Connection connection)
229 {
230 boolean idle;
231 lock();
232 try
233 {
234 if (!activeConnections.remove(connection))
235 return false;
236
237 idle = offerIdle(connection);
238 }
239 finally
240 {
241 unlock();
242 }
243
244 released(connection);
245 return idle(connection, idle);
246 }
247
248 protected boolean offerIdle(Connection connection)
249 {
250 return idleConnections.offerFirst(connection);
251 }
252
253 protected boolean idle(Connection connection, boolean idle)
254 {
255 if (idle)
256 {
257 if (LOG.isDebugEnabled())
258 LOG.debug("Connection idle {}", connection);
259 return true;
260 }
261 else
262 {
263 if (LOG.isDebugEnabled())
264 LOG.debug("Connection idle overflow {}", connection);
265 connection.close();
266 return false;
267 }
268 }
269
270 protected void released(Connection connection)
271 {
272 }
273
274 public boolean remove(Connection connection)
275 {
276 return remove(connection, false);
277 }
278
279 protected boolean remove(Connection connection, boolean force)
280 {
281 boolean activeRemoved;
282 boolean idleRemoved;
283 lock();
284 try
285 {
286 activeRemoved = activeConnections.remove(connection);
287 idleRemoved = idleConnections.remove(connection);
288 }
289 finally
290 {
291 unlock();
292 }
293
294 if (activeRemoved || force)
295 released(connection);
296 boolean removed = activeRemoved || idleRemoved || force;
297 if (removed)
298 {
299 int pooled = connectionCount.decrementAndGet();
300 if (LOG.isDebugEnabled())
301 LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
302 }
303 return removed;
304 }
305
306 public boolean isActive(Connection connection)
307 {
308 lock();
309 try
310 {
311 return activeConnections.contains(connection);
312 }
313 finally
314 {
315 unlock();
316 }
317 }
318
319 public boolean isIdle(Connection connection)
320 {
321 lock();
322 try
323 {
324 return idleConnections.contains(connection);
325 }
326 finally
327 {
328 unlock();
329 }
330 }
331
332 public boolean isEmpty()
333 {
334 return connectionCount.get() == 0;
335 }
336
337 public void close()
338 {
339 List<Connection> idles = new ArrayList<>();
340 List<Connection> actives = new ArrayList<>();
341 lock();
342 try
343 {
344 idles.addAll(idleConnections);
345 idleConnections.clear();
346 actives.addAll(activeConnections);
347 activeConnections.clear();
348 }
349 finally
350 {
351 unlock();
352 }
353
354 connectionCount.set(0);
355
356 for (Connection connection : idles)
357 connection.close();
358
359
360 for (Connection connection : actives)
361 connection.close();
362 }
363
364 @Override
365 public String dump()
366 {
367 return ContainerLifeCycle.dump(this);
368 }
369
370 @Override
371 public void dump(Appendable out, String indent) throws IOException
372 {
373 List<Connection> actives = new ArrayList<>();
374 List<Connection> idles = new ArrayList<>();
375 lock();
376 try
377 {
378 actives.addAll(activeConnections);
379 idles.addAll(idleConnections);
380 }
381 finally
382 {
383 unlock();
384 }
385
386 ContainerLifeCycle.dumpObject(out, this);
387 ContainerLifeCycle.dump(out, indent, actives, idles);
388 }
389
390 @Override
391 public boolean sweep()
392 {
393 List<Connection> toSweep = new ArrayList<>();
394 lock();
395 try
396 {
397 for (Connection connection : activeConnections)
398 {
399 if (connection instanceof Sweeper.Sweepable)
400 toSweep.add(connection);
401 }
402 }
403 finally
404 {
405 unlock();
406 }
407
408 for (Connection connection : toSweep)
409 {
410 if (((Sweeper.Sweepable)connection).sweep())
411 {
412 boolean removed = remove(connection, true);
413 LOG.warn("Connection swept: {}{}{} from active connections{}{}",
414 connection,
415 System.lineSeparator(),
416 removed ? "Removed" : "Not removed",
417 System.lineSeparator(),
418 dump());
419 }
420 }
421
422 return false;
423 }
424
425 protected void lock()
426 {
427 lock.lock();
428 }
429
430 protected void unlock()
431 {
432 lock.unlock();
433 }
434
435 @Override
436 public String toString()
437 {
438 int activeSize;
439 int idleSize;
440 lock();
441 try
442 {
443 activeSize = activeConnections.size();
444 idleSize = idleConnections.size();
445 }
446 finally
447 {
448 unlock();
449 }
450
451 return String.format("%s[c=%d/%d,a=%d,i=%d]",
452 getClass().getSimpleName(),
453 connectionCount.get(),
454 maxConnections,
455 activeSize,
456 idleSize);
457 }
458 }