1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicReference;
25
26 import org.eclipse.jetty.client.api.Request;
27 import org.eclipse.jetty.client.api.Response;
28 import org.eclipse.jetty.client.api.Result;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32 public class HttpExchange
33 {
34 private static final Logger LOG = Log.getLogger(HttpExchange.class);
35
36 private final AtomicBoolean requestComplete = new AtomicBoolean();
37 private final AtomicBoolean responseComplete = new AtomicBoolean();
38 private final AtomicInteger complete = new AtomicInteger();
39 private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
40 private final HttpDestination destination;
41 private final HttpRequest request;
42 private final List<Response.ResponseListener> listeners;
43 private final HttpResponse response;
44 private volatile Throwable requestFailure;
45 private volatile Throwable responseFailure;
46
47 public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
48 {
49 this.destination = destination;
50 this.request = request;
51 this.listeners = listeners;
52 this.response = new HttpResponse(request, listeners);
53 HttpConversation conversation = request.getConversation();
54 conversation.getExchanges().offer(this);
55 conversation.updateResponseListeners(null);
56 }
57
58 public HttpConversation getConversation()
59 {
60 return request.getConversation();
61 }
62
63 public Request getRequest()
64 {
65 return request;
66 }
67
68 public Throwable getRequestFailure()
69 {
70 return requestFailure;
71 }
72
73 public List<Response.ResponseListener> getResponseListeners()
74 {
75 return listeners;
76 }
77
78 public HttpResponse getResponse()
79 {
80 return response;
81 }
82
83 public Throwable getResponseFailure()
84 {
85 return responseFailure;
86 }
87
88 public void associate(HttpChannel channel)
89 {
90 if (!this.channel.compareAndSet(null, channel))
91 throw new IllegalStateException();
92 }
93
94 public void disassociate(HttpChannel channel)
95 {
96 if (!this.channel.compareAndSet(channel, null))
97 throw new IllegalStateException();
98 }
99
100 public boolean requestComplete()
101 {
102 return requestComplete.compareAndSet(false, true);
103 }
104
105 public boolean responseComplete()
106 {
107 return responseComplete.compareAndSet(false, true);
108 }
109
110 public Result terminateRequest(Throwable failure)
111 {
112 int requestSuccess = 0b0011;
113 int requestFailure = 0b0001;
114 return terminate(failure == null ? requestSuccess : requestFailure, failure);
115 }
116
117 public Result terminateResponse(Throwable failure)
118 {
119 if (failure == null)
120 {
121 int responseSuccess = 0b1100;
122 return terminate(responseSuccess, null);
123 }
124 else
125 {
126 proceed(failure);
127 int responseFailure = 0b0100;
128 return terminate(responseFailure, failure);
129 }
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 private Result terminate(int code, Throwable failure)
148 {
149 int current = update(code, failure);
150 int terminated = 0b0101;
151 if ((current & terminated) == terminated)
152 {
153
154 LOG.debug("{} terminated", this);
155 return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
156 }
157 return null;
158 }
159
160 private int update(int code, Throwable failure)
161 {
162 int current;
163 while (true)
164 {
165 current = complete.get();
166 boolean updateable = (current & code) == 0;
167 if (updateable)
168 {
169 int candidate = current | code;
170 if (!complete.compareAndSet(current, candidate))
171 continue;
172 current = candidate;
173 if ((code & 0b01) == 0b01)
174 requestFailure = failure;
175 if ((code & 0b0100) == 0b0100)
176 responseFailure = failure;
177 LOG.debug("{} updated", this);
178 }
179 break;
180 }
181 return current;
182 }
183
184 public boolean abort(Throwable cause)
185 {
186 if (destination.remove(this))
187 {
188 LOG.debug("Aborting while queued {}: {}", this, cause);
189 return fail(cause);
190 }
191 else
192 {
193 HttpChannel channel = this.channel.get();
194 if (channel == null)
195 return fail(cause);
196
197 boolean aborted = channel.abort(cause);
198 LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
199 return aborted;
200 }
201 }
202
203 private boolean fail(Throwable cause)
204 {
205 if (update(0b0101, cause) == 0b0101)
206 {
207 LOG.debug("Failing {}: {}", this, cause);
208 destination.getRequestNotifier().notifyFailure(request, cause);
209 List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
210 ResponseNotifier responseNotifier = destination.getResponseNotifier();
211 responseNotifier.notifyFailure(listeners, response, cause);
212 responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause));
213 return true;
214 }
215 else
216 {
217 return false;
218 }
219 }
220
221 public void resetResponse(boolean success)
222 {
223 responseComplete.set(false);
224 int responseSuccess = 0b1100;
225 int responseFailure = 0b0100;
226 int code = success ? responseSuccess : responseFailure;
227 complete.addAndGet(-code);
228 }
229
230 public void proceed(Throwable failure)
231 {
232 HttpChannel channel = this.channel.get();
233 if (channel != null)
234 channel.proceed(this, failure);
235 }
236
237 private String toString(int code)
238 {
239 String padding = "0000";
240 String status = Integer.toBinaryString(code);
241 return String.format("%s@%x status=%s%s",
242 HttpExchange.class.getSimpleName(),
243 hashCode(),
244 padding.substring(status.length()),
245 status);
246 }
247
248 @Override
249 public String toString()
250 {
251 return toString(complete.get());
252 }
253 }