1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.client.api.Connection;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.util.Promise;
27 import org.eclipse.jetty.util.component.ContainerLifeCycle;
28
29 public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
30 {
31 private final ConnectionPool connectionPool;
32
33 public PoolingHttpDestination(HttpClient client, Origin origin)
34 {
35 super(client, origin);
36 this.connectionPool = newConnectionPool(client);
37 }
38
39 protected ConnectionPool newConnectionPool(HttpClient client)
40 {
41 return new ConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
42 }
43
44 public ConnectionPool getConnectionPool()
45 {
46 return connectionPool;
47 }
48
49 @Override
50 @SuppressWarnings("unchecked")
51 public void succeeded(Connection connection)
52 {
53 process((C)connection, true);
54 }
55
56 @Override
57 public void failed(final Throwable x)
58 {
59 getHttpClient().getExecutor().execute(new Runnable()
60 {
61 @Override
62 public void run()
63 {
64 abort(x);
65 }
66 });
67 }
68
69 protected void send()
70 {
71 C connection = acquire();
72 if (connection != null)
73 process(connection, false);
74 }
75
76 @SuppressWarnings("unchecked")
77 public C acquire()
78 {
79 return (C)connectionPool.acquire();
80 }
81
82
83
84
85
86
87
88
89
90
91
92 public void process(final C connection, boolean dispatch)
93 {
94 HttpClient client = getHttpClient();
95 final HttpExchange exchange = getHttpExchanges().poll();
96 if (LOG.isDebugEnabled())
97 LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
98 if (exchange == null)
99 {
100 if (!connectionPool.release(connection))
101 connection.close();
102
103 if (!client.isRunning())
104 {
105 if (LOG.isDebugEnabled())
106 LOG.debug("{} is stopping", client);
107 connection.close();
108 }
109 }
110 else
111 {
112 final Request request = exchange.getRequest();
113 Throwable cause = request.getAbortCause();
114 if (cause != null)
115 {
116 if (LOG.isDebugEnabled())
117 LOG.debug("Aborted before processing {}: {}", exchange, cause);
118
119
120
121 exchange.abort(cause);
122 }
123 else
124 {
125 if (dispatch)
126 {
127 client.getExecutor().execute(new Runnable()
128 {
129 @Override
130 public void run()
131 {
132 send(connection, exchange);
133 }
134 });
135 }
136 else
137 {
138 send(connection, exchange);
139 }
140 }
141 }
142 }
143
144 protected abstract void send(C connection, HttpExchange exchange);
145
146 @Override
147 public void release(Connection c)
148 {
149 @SuppressWarnings("unchecked")
150 C connection = (C)c;
151 if (LOG.isDebugEnabled())
152 LOG.debug("{} released", connection);
153 HttpClient client = getHttpClient();
154 if (client.isRunning())
155 {
156 if (connectionPool.isActive(connection))
157 {
158 process(connection, false);
159 }
160 else
161 {
162 if (LOG.isDebugEnabled())
163 LOG.debug("{} explicit", connection);
164 }
165 }
166 else
167 {
168 if (LOG.isDebugEnabled())
169 LOG.debug("{} is stopped", client);
170 close(connection);
171 connection.close();
172 }
173 }
174
175 @Override
176 public void close(Connection oldConnection)
177 {
178 super.close(oldConnection);
179 connectionPool.remove(oldConnection);
180
181 if (getHttpExchanges().isEmpty())
182 {
183 if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
184 {
185
186
187
188
189
190
191 getHttpClient().removeDestination(this);
192 }
193 }
194 else
195 {
196
197
198
199 C newConnection = acquire();
200 if (newConnection != null)
201 process(newConnection, false);
202 }
203 }
204
205 public void close()
206 {
207 super.close();
208 connectionPool.close();
209 }
210
211 @Override
212 public void dump(Appendable out, String indent) throws IOException
213 {
214 ContainerLifeCycle.dump(out, indent, Arrays.asList(connectionPool));
215 }
216
217 @Override
218 public String toString()
219 {
220 return String.format("%s,pool=%s", super.toString(), connectionPool);
221 }
222 }