1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.util.concurrent.BlockingDeque;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingDeque;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.eclipse.jetty.client.api.Connection;
28 import org.eclipse.jetty.client.api.Destination;
29 import org.eclipse.jetty.util.BlockingArrayQueue;
30 import org.eclipse.jetty.util.Promise;
31 import org.eclipse.jetty.util.component.ContainerLifeCycle;
32 import org.eclipse.jetty.util.component.Dumpable;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35
36 public class ConnectionPool implements Dumpable
37 {
38 private static final Logger LOG = Log.getLogger(ConnectionPool.class);
39
40 private final AtomicInteger connectionCount = new AtomicInteger();
41 private final Destination destination;
42 private final int maxConnections;
43 private final Promise<Connection> connectionPromise;
44 private final BlockingDeque<Connection> idleConnections;
45 private final BlockingQueue<Connection> activeConnections;
46
47 public ConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
48 {
49 this.destination = destination;
50 this.maxConnections = maxConnections;
51 this.connectionPromise = connectionPromise;
52 this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
53 this.activeConnections = new BlockingArrayQueue<>(maxConnections);
54 }
55
56 public BlockingQueue<Connection> getIdleConnections()
57 {
58 return idleConnections;
59 }
60
61 public BlockingQueue<Connection> getActiveConnections()
62 {
63 return activeConnections;
64 }
65
66 public Connection acquire()
67 {
68 Connection result = acquireIdleConnection();
69 if (result != null)
70 return result;
71
72 while (true)
73 {
74 int current = connectionCount.get();
75 final int next = current + 1;
76
77 if (next > maxConnections)
78 {
79 LOG.debug("Max connections {}/{} reached", current, maxConnections);
80
81 return acquireIdleConnection();
82 }
83
84 if (connectionCount.compareAndSet(current, next))
85 {
86 LOG.debug("Connection {}/{} creation", next, maxConnections);
87
88 destination.newConnection(new Promise<Connection>()
89 {
90 @Override
91 public void succeeded(Connection connection)
92 {
93 LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
94 activate(connection);
95 connectionPromise.succeeded(connection);
96 }
97
98 @Override
99 public void failed(Throwable x)
100 {
101 LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
102 connectionCount.decrementAndGet();
103 connectionPromise.failed(x);
104 }
105 });
106
107
108 return acquireIdleConnection();
109 }
110 }
111 }
112
113 private Connection acquireIdleConnection()
114 {
115 Connection connection = idleConnections.pollFirst();
116 if (connection != null)
117 activate(connection);
118 return connection;
119 }
120
121 private boolean activate(Connection connection)
122 {
123 if (activeConnections.offer(connection))
124 {
125 LOG.debug("Connection active {}", connection);
126 return true;
127 }
128 else
129 {
130 LOG.debug("Connection active overflow {}", connection);
131 return false;
132 }
133 }
134
135 public boolean release(Connection connection)
136 {
137 if (activeConnections.remove(connection))
138 {
139
140 if (idleConnections.offerFirst(connection))
141 {
142 LOG.debug("Connection idle {}", connection);
143 return true;
144 }
145 else
146 {
147 LOG.debug("Connection idle overflow {}", connection);
148 }
149 }
150 return false;
151 }
152
153 public boolean remove(Connection connection)
154 {
155 boolean removed = activeConnections.remove(connection);
156 removed |= idleConnections.remove(connection);
157 if (removed)
158 {
159 int pooled = connectionCount.decrementAndGet();
160 LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
161 }
162 return removed;
163 }
164
165 public boolean isActive(Connection connection)
166 {
167 return activeConnections.contains(connection);
168 }
169
170 public boolean isIdle(Connection connection)
171 {
172 return idleConnections.contains(connection);
173 }
174
175 public void close()
176 {
177 for (Connection connection : idleConnections)
178 connection.close();
179 idleConnections.clear();
180
181
182 for (Connection connection : activeConnections)
183 connection.close();
184 activeConnections.clear();
185
186 connectionCount.set(0);
187 }
188
189 @Override
190 public String dump()
191 {
192 return ContainerLifeCycle.dump(this);
193 }
194
195 @Override
196 public void dump(Appendable out, String indent) throws IOException
197 {
198 ContainerLifeCycle.dumpObject(out, this);
199 ContainerLifeCycle.dump(out, indent, activeConnections, idleConnections);
200 }
201
202 @Override
203 public String toString()
204 {
205 return String.format("%s %d/%d", getClass().getSimpleName(), connectionCount.get(), maxConnections);
206 }
207 }