1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.client.api.Connection;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.util.Promise;
27 import org.eclipse.jetty.util.component.ContainerLifeCycle;
28
29 public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
30 {
31 private final ConnectionPool connectionPool;
32
33 public PoolingHttpDestination(HttpClient client, Origin origin)
34 {
35 super(client, origin);
36 this.connectionPool = newConnectionPool(client);
37 }
38
39 protected ConnectionPool newConnectionPool(HttpClient client)
40 {
41 return new ConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
42 }
43
44 public ConnectionPool getConnectionPool()
45 {
46 return connectionPool;
47 }
48
49 @Override
50 @SuppressWarnings("unchecked")
51 public void succeeded(Connection connection)
52 {
53 process((C)connection, true);
54 }
55
56 @Override
57 public void failed(final Throwable x)
58 {
59 getHttpClient().getExecutor().execute(new Runnable()
60 {
61 @Override
62 public void run()
63 {
64 abort(x);
65 }
66 });
67 }
68
69 protected void send()
70 {
71 C connection = acquire();
72 if (connection != null)
73 process(connection, false);
74 }
75
76 @SuppressWarnings("unchecked")
77 public C acquire()
78 {
79 return (C)connectionPool.acquire();
80 }
81
82
83
84
85
86
87
88
89
90
91
92 public void process(final C connection, boolean dispatch)
93 {
94 HttpClient client = getHttpClient();
95 final HttpExchange exchange = getHttpExchanges().poll();
96 LOG.debug("Processing exchange {} on connection {}", exchange, connection);
97 if (exchange == null)
98 {
99
100
101
102 if (!connectionPool.release(connection))
103 connection.close();
104
105 if (!client.isRunning())
106 {
107 LOG.debug("{} is stopping", client);
108 connection.close();
109 }
110 }
111 else
112 {
113 final Request request = exchange.getRequest();
114 Throwable cause = request.getAbortCause();
115 if (cause != null)
116 {
117 abort(exchange, cause);
118 LOG.debug("Aborted before processing {}: {}", exchange, cause);
119 }
120 else
121 {
122 if (dispatch)
123 {
124 client.getExecutor().execute(new Runnable()
125 {
126 @Override
127 public void run()
128 {
129 send(connection, exchange);
130 }
131 });
132 }
133 else
134 {
135 send(connection, exchange);
136 }
137 }
138 }
139 }
140
141 protected abstract void send(C connection, HttpExchange exchange);
142
143 public void release(C connection)
144 {
145 LOG.debug("{} released", connection);
146 HttpClient client = getHttpClient();
147 if (client.isRunning())
148 {
149 if (connectionPool.isActive(connection))
150 process(connection, false);
151 else
152 LOG.debug("{} explicit", connection);
153 }
154 else
155 {
156 LOG.debug("{} is stopped", client);
157 close(connection);
158 connection.close();
159 }
160 }
161
162 @Override
163 public void close(Connection oldConnection)
164 {
165 super.close(oldConnection);
166 connectionPool.remove(oldConnection);
167
168
169
170
171 if (!getHttpExchanges().isEmpty())
172 {
173 C newConnection = acquire();
174 if (newConnection != null)
175 process(newConnection, false);
176 }
177 }
178
179 public void close()
180 {
181 connectionPool.close();
182 }
183
184 @Override
185 public void dump(Appendable out, String indent) throws IOException
186 {
187 ContainerLifeCycle.dump(out, indent, Arrays.asList(connectionPool));
188 }
189 }