1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.util.Collections;
23
24 import org.eclipse.jetty.client.api.Connection;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.util.Callback;
27 import org.eclipse.jetty.util.annotation.ManagedAttribute;
28 import org.eclipse.jetty.util.annotation.ManagedObject;
29 import org.eclipse.jetty.util.component.ContainerLifeCycle;
30 import org.eclipse.jetty.util.thread.Sweeper;
31
32 @ManagedObject
33 public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback
34 {
35 private DuplexConnectionPool connectionPool;
36
37 public PoolingHttpDestination(HttpClient client, Origin origin)
38 {
39 super(client, origin);
40 this.connectionPool = newConnectionPool(client);
41 addBean(connectionPool);
42 Sweeper sweeper = client.getBean(Sweeper.class);
43 if (sweeper != null)
44 sweeper.offer(connectionPool);
45 }
46
47 @Override
48 protected void doStart() throws Exception
49 {
50 HttpClient client = getHttpClient();
51 this.connectionPool = newConnectionPool(client);
52 addBean(connectionPool);
53 super.doStart();
54 Sweeper sweeper = client.getBean(Sweeper.class);
55 if (sweeper != null)
56 sweeper.offer(connectionPool);
57 }
58
59 @Override
60 protected void doStop() throws Exception
61 {
62 HttpClient client = getHttpClient();
63 Sweeper sweeper = client.getBean(Sweeper.class);
64 if (sweeper != null)
65 sweeper.remove(connectionPool);
66 super.doStop();
67 removeBean(connectionPool);
68 }
69
70 protected DuplexConnectionPool newConnectionPool(HttpClient client)
71 {
72 return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
73 }
74
75 @ManagedAttribute(value = "The connection pool", readonly = true)
76 public DuplexConnectionPool getConnectionPool()
77 {
78 return connectionPool;
79 }
80
81 @Override
82 public void succeeded()
83 {
84 send();
85 }
86
87 @Override
88 public void failed(final Throwable x)
89 {
90 abort(x);
91 }
92
93 public void send()
94 {
95 if (getHttpExchanges().isEmpty())
96 return;
97 process();
98 }
99
100 @SuppressWarnings("unchecked")
101 public C acquire()
102 {
103 return (C)connectionPool.acquire();
104 }
105
106 private void process()
107 {
108 while (true)
109 {
110 C connection = acquire();
111 if (connection == null)
112 break;
113 boolean proceed = process(connection);
114 if (!proceed)
115 break;
116 }
117 }
118
119
120
121
122
123
124
125
126
127
128
129 public boolean process(final C connection)
130 {
131 HttpClient client = getHttpClient();
132 final HttpExchange exchange = getHttpExchanges().poll();
133 if (LOG.isDebugEnabled())
134 LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
135 if (exchange == null)
136 {
137 if (!connectionPool.release(connection))
138 connection.close();
139 if (!client.isRunning())
140 {
141 if (LOG.isDebugEnabled())
142 LOG.debug("{} is stopping", client);
143 connection.close();
144 }
145 return false;
146 }
147 else
148 {
149 final Request request = exchange.getRequest();
150 Throwable cause = request.getAbortCause();
151 if (cause != null)
152 {
153 if (LOG.isDebugEnabled())
154 LOG.debug("Aborted before processing {}: {}", exchange, cause);
155
156
157
158 exchange.abort(cause);
159 }
160 else
161 {
162 SendFailure result = send(connection, exchange);
163 if (result != null)
164 {
165 if (LOG.isDebugEnabled())
166 LOG.debug("Send failed {} for {}", result, exchange);
167 if (result.retry)
168 {
169 if (enqueue(getHttpExchanges(), exchange))
170 return true;
171 }
172
173 request.abort(result.failure);
174 }
175 }
176 return getHttpExchanges().peek() != null;
177 }
178 }
179
180 protected abstract SendFailure send(C connection, HttpExchange exchange);
181
182 @Override
183 public void release(Connection c)
184 {
185 @SuppressWarnings("unchecked")
186 C connection = (C)c;
187 if (LOG.isDebugEnabled())
188 LOG.debug("Released {}", connection);
189 HttpClient client = getHttpClient();
190 if (client.isRunning())
191 {
192 if (connectionPool.isActive(connection))
193 {
194 if (connectionPool.release(connection))
195 send();
196 else
197 connection.close();
198 }
199 else
200 {
201 if (LOG.isDebugEnabled())
202 LOG.debug("Released explicit {}", connection);
203 }
204 }
205 else
206 {
207 if (LOG.isDebugEnabled())
208 LOG.debug("{} is stopped", client);
209 connection.close();
210 }
211 }
212
213 @Override
214 public void close(Connection connection)
215 {
216 super.close(connection);
217
218 boolean removed = connectionPool.remove(connection);
219
220 if (getHttpExchanges().isEmpty())
221 {
222 if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
223 {
224
225
226
227
228
229
230 getHttpClient().removeDestination(this);
231 }
232 }
233 else
234 {
235
236
237
238 if (removed)
239 process();
240 }
241 }
242
243 public void close()
244 {
245 super.close();
246 connectionPool.close();
247 }
248
249 @Override
250 public void dump(Appendable out, String indent) throws IOException
251 {
252 super.dump(out, indent);
253 ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool));
254 }
255
256 @Override
257 public String toString()
258 {
259 return String.format("%s,pool=%s", super.toString(), connectionPool);
260 }
261 }