1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 import org.eclipse.jetty.client.api.Connection;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.util.Promise;
27
28 public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
29 {
30 private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
31 private final AtomicInteger requestsPerConnection = new AtomicInteger();
32 private int maxRequestsPerConnection = 1024;
33 private C connection;
34
35 protected MultiplexHttpDestination(HttpClient client, Origin origin)
36 {
37 super(client, origin);
38 }
39
40 public int getMaxRequestsPerConnection()
41 {
42 return maxRequestsPerConnection;
43 }
44
45 public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
46 {
47 this.maxRequestsPerConnection = maxRequestsPerConnection;
48 }
49
50 @Override
51 public void send()
52 {
53 while (true)
54 {
55 ConnectState current = connect.get();
56 switch (current)
57 {
58 case DISCONNECTED:
59 {
60 if (!connect.compareAndSet(current, ConnectState.CONNECTING))
61 break;
62 newConnection(this);
63 return;
64 }
65 case CONNECTING:
66 {
67
68 return;
69 }
70 case CONNECTED:
71 {
72 if (process(connection))
73 break;
74 return;
75 }
76 default:
77 {
78 abort(new IllegalStateException("Invalid connection state " + current));
79 return;
80 }
81 }
82 }
83 }
84
85 @Override
86 @SuppressWarnings("unchecked")
87 public void succeeded(Connection result)
88 {
89 C connection = this.connection = (C)result;
90 if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
91 {
92 send();
93 }
94 else
95 {
96 connection.close();
97 failed(new IllegalStateException("Invalid connection state " + connect));
98 }
99 }
100
101 @Override
102 public void failed(Throwable x)
103 {
104 connect.set(ConnectState.DISCONNECTED);
105 abort(x);
106 }
107
108 protected boolean process(final C connection)
109 {
110 while (true)
111 {
112 int max = getMaxRequestsPerConnection();
113 int count = requestsPerConnection.get();
114 int next = count + 1;
115 if (next > max)
116 return false;
117
118 if (requestsPerConnection.compareAndSet(count, next))
119 {
120 HttpExchange exchange = getHttpExchanges().poll();
121 if (LOG.isDebugEnabled())
122 LOG.debug("Processing {}/{} {} on {}", next, max, exchange, connection);
123 if (exchange == null)
124 {
125 requestsPerConnection.decrementAndGet();
126 return false;
127 }
128
129 final Request request = exchange.getRequest();
130 Throwable cause = request.getAbortCause();
131 if (cause != null)
132 {
133 if (LOG.isDebugEnabled())
134 LOG.debug("Aborted before processing {}: {}", exchange, cause);
135
136
137
138 exchange.abort(cause);
139 requestsPerConnection.decrementAndGet();
140 }
141 else
142 {
143 SendFailure result = send(connection, exchange);
144 if (result != null)
145 {
146 if (LOG.isDebugEnabled())
147 LOG.debug("Send failed {} for {}", result, exchange);
148 if (result.retry)
149 {
150 if (enqueue(getHttpExchanges(), exchange))
151 return true;
152 }
153
154 request.abort(result.failure);
155 }
156 }
157 return getHttpExchanges().peek() != null;
158 }
159 }
160 }
161
162 @Override
163 public void release(Connection connection)
164 {
165 requestsPerConnection.decrementAndGet();
166 send();
167 }
168
169 @Override
170 public void close()
171 {
172 super.close();
173 C connection = this.connection;
174 if (connection != null)
175 connection.close();
176 }
177
178 @Override
179 public void close(Connection connection)
180 {
181 super.close(connection);
182 while (true)
183 {
184 ConnectState current = connect.get();
185 if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
186 {
187 if (getHttpClient().isRemoveIdleDestinations())
188 getHttpClient().removeDestination(this);
189 break;
190 }
191 }
192 }
193
194 protected abstract SendFailure send(C connection, HttpExchange exchange);
195
196 private enum ConnectState
197 {
198 DISCONNECTED, CONNECTING, CONNECTED
199 }
200 }