1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.util.Collections;
23
24 import org.eclipse.jetty.client.api.Connection;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.util.Callback;
27 import org.eclipse.jetty.util.annotation.ManagedAttribute;
28 import org.eclipse.jetty.util.annotation.ManagedObject;
29 import org.eclipse.jetty.util.component.ContainerLifeCycle;
30 import org.eclipse.jetty.util.thread.Sweeper;
31
32 @ManagedObject
33 public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback
34 {
35 private DuplexConnectionPool connectionPool;
36
37 public PoolingHttpDestination(HttpClient client, Origin origin)
38 {
39 super(client, origin);
40 this.connectionPool = newConnectionPool(client);
41 addBean(connectionPool);
42 Sweeper sweeper = client.getBean(Sweeper.class);
43 if (sweeper != null)
44 sweeper.offer(connectionPool);
45 }
46
47 @Override
48 protected void doStart() throws Exception
49 {
50 HttpClient client = getHttpClient();
51 this.connectionPool = newConnectionPool(client);
52 addBean(connectionPool);
53 super.doStart();
54 Sweeper sweeper = client.getBean(Sweeper.class);
55 if (sweeper != null)
56 sweeper.offer(connectionPool);
57 }
58
59 @Override
60 protected void doStop() throws Exception
61 {
62 HttpClient client = getHttpClient();
63 Sweeper sweeper = client.getBean(Sweeper.class);
64 if (sweeper != null)
65 sweeper.remove(connectionPool);
66 super.doStop();
67 removeBean(connectionPool);
68 }
69
70 protected DuplexConnectionPool newConnectionPool(HttpClient client)
71 {
72 return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
73 }
74
75 @ManagedAttribute(value = "The connection pool", readonly = true)
76 public DuplexConnectionPool getConnectionPool()
77 {
78 return connectionPool;
79 }
80
81 @Override
82 public void succeeded()
83 {
84 send();
85 }
86
87 @Override
88 public void failed(final Throwable x)
89 {
90 abort(x);
91 }
92
93 public void send()
94 {
95 if (getHttpExchanges().isEmpty())
96 return;
97 process();
98 }
99
100 @SuppressWarnings("unchecked")
101 public C acquire()
102 {
103 return (C)connectionPool.acquire();
104 }
105
106 private void process()
107 {
108 while (true)
109 {
110 C connection = acquire();
111 if (connection == null)
112 break;
113 boolean proceed = process(connection);
114 if (!proceed)
115 break;
116 }
117 }
118
119
120
121
122
123
124
125
126
127
128
129 public boolean process(final C connection)
130 {
131 HttpClient client = getHttpClient();
132 final HttpExchange exchange = getHttpExchanges().poll();
133 if (LOG.isDebugEnabled())
134 LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
135 if (exchange == null)
136 {
137 if (!connectionPool.release(connection))
138 connection.close();
139 if (!client.isRunning())
140 {
141 if (LOG.isDebugEnabled())
142 LOG.debug("{} is stopping", client);
143 connection.close();
144 }
145 return false;
146 }
147 else
148 {
149 final Request request = exchange.getRequest();
150 Throwable cause = request.getAbortCause();
151 if (cause != null)
152 {
153 if (LOG.isDebugEnabled())
154 LOG.debug("Aborted before processing {}: {}", exchange, cause);
155
156 if (!connectionPool.release(connection))
157 connection.close();
158
159
160
161 exchange.abort(cause);
162 }
163 else
164 {
165 SendFailure result = send(connection, exchange);
166 if (result != null)
167 {
168 if (LOG.isDebugEnabled())
169 LOG.debug("Send failed {} for {}", result, exchange);
170 if (result.retry)
171 {
172 if (enqueue(getHttpExchanges(), exchange))
173 return true;
174 }
175
176 request.abort(result.failure);
177 }
178 }
179 return getHttpExchanges().peek() != null;
180 }
181 }
182
183 protected abstract SendFailure send(C connection, HttpExchange exchange);
184
185 @Override
186 public void release(Connection c)
187 {
188 @SuppressWarnings("unchecked")
189 C connection = (C)c;
190 if (LOG.isDebugEnabled())
191 LOG.debug("Released {}", connection);
192 HttpClient client = getHttpClient();
193 if (client.isRunning())
194 {
195 if (connectionPool.isActive(connection))
196 {
197 if (connectionPool.release(connection))
198 send();
199 else
200 connection.close();
201 }
202 else
203 {
204 if (LOG.isDebugEnabled())
205 LOG.debug("Released explicit {}", connection);
206 }
207 }
208 else
209 {
210 if (LOG.isDebugEnabled())
211 LOG.debug("{} is stopped", client);
212 connection.close();
213 }
214 }
215
216 @Override
217 public void close(Connection connection)
218 {
219 super.close(connection);
220
221 boolean removed = connectionPool.remove(connection);
222
223 if (getHttpExchanges().isEmpty())
224 {
225 if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
226 {
227
228
229
230
231
232
233 getHttpClient().removeDestination(this);
234 }
235 }
236 else
237 {
238
239
240
241 if (removed)
242 process();
243 }
244 }
245
246 public void close()
247 {
248 super.close();
249 connectionPool.close();
250 }
251
252 @Override
253 public void dump(Appendable out, String indent) throws IOException
254 {
255 super.dump(out, indent);
256 ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool));
257 }
258
259 @Override
260 public String toString()
261 {
262 return String.format("%s,pool=%s", super.toString(), connectionPool);
263 }
264 }