1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicReference;
25
26 import org.eclipse.jetty.client.api.Request;
27 import org.eclipse.jetty.client.api.Response;
28 import org.eclipse.jetty.client.api.Result;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32 public class HttpExchange
33 {
34 private static final Logger LOG = Log.getLogger(HttpExchange.class);
35
36 private final AtomicBoolean requestComplete = new AtomicBoolean();
37 private final AtomicBoolean responseComplete = new AtomicBoolean();
38 private final AtomicInteger complete = new AtomicInteger();
39 private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
40 private final HttpDestination destination;
41 private final HttpRequest request;
42 private final List<Response.ResponseListener> listeners;
43 private final HttpResponse response;
44 private volatile Throwable requestFailure;
45 private volatile Throwable responseFailure;
46
47 public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
48 {
49 this.destination = destination;
50 this.request = request;
51 this.listeners = listeners;
52 this.response = new HttpResponse(request, listeners);
53 HttpConversation conversation = request.getConversation();
54 conversation.getExchanges().offer(this);
55 conversation.updateResponseListeners(null);
56 }
57
58 public HttpConversation getConversation()
59 {
60 return request.getConversation();
61 }
62
63 public Request getRequest()
64 {
65 return request;
66 }
67
68 public Throwable getRequestFailure()
69 {
70 return requestFailure;
71 }
72
73 public List<Response.ResponseListener> getResponseListeners()
74 {
75 return listeners;
76 }
77
78 public HttpResponse getResponse()
79 {
80 return response;
81 }
82
83 public Throwable getResponseFailure()
84 {
85 return responseFailure;
86 }
87
88 public void associate(HttpChannel channel)
89 {
90 if (!this.channel.compareAndSet(null, channel))
91 throw new IllegalStateException();
92 }
93
94 public void disassociate(HttpChannel channel)
95 {
96 if (!this.channel.compareAndSet(channel, null))
97 throw new IllegalStateException();
98 }
99
100 public boolean requestComplete()
101 {
102 return requestComplete.compareAndSet(false, true);
103 }
104
105 public boolean responseComplete()
106 {
107 return responseComplete.compareAndSet(false, true);
108 }
109
110 public Result terminateRequest(Throwable failure)
111 {
112 int requestSuccess = 0b0011;
113 int requestFailure = 0b0001;
114 return terminate(failure == null ? requestSuccess : requestFailure, failure);
115 }
116
117 public Result terminateResponse(Throwable failure)
118 {
119 if (failure == null)
120 {
121 int responseSuccess = 0b1100;
122 return terminate(responseSuccess, null);
123 }
124 else
125 {
126 proceed(failure);
127 int responseFailure = 0b0100;
128 return terminate(responseFailure, failure);
129 }
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 private Result terminate(int code, Throwable failure)
148 {
149 int current = update(code, failure);
150 int terminated = 0b0101;
151 if ((current & terminated) == terminated)
152 {
153
154 if (LOG.isDebugEnabled())
155 LOG.debug("{} terminated", this);
156 return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
157 }
158 return null;
159 }
160
161 private int update(int code, Throwable failure)
162 {
163 int current;
164 while (true)
165 {
166 current = complete.get();
167 boolean updateable = (current & code) == 0;
168 if (updateable)
169 {
170 int candidate = current | code;
171 if (!complete.compareAndSet(current, candidate))
172 continue;
173 current = candidate;
174 if ((code & 0b01) == 0b01)
175 requestFailure = failure;
176 if ((code & 0b0100) == 0b0100)
177 responseFailure = failure;
178 if (LOG.isDebugEnabled())
179 LOG.debug("{} updated", this);
180 }
181 break;
182 }
183 return current;
184 }
185
186 public boolean abort(Throwable cause)
187 {
188 if (destination.remove(this))
189 {
190 if (LOG.isDebugEnabled())
191 LOG.debug("Aborting while queued {}: {}", this, cause);
192 return fail(cause);
193 }
194 else
195 {
196 HttpChannel channel = this.channel.get();
197 if (channel == null)
198 return fail(cause);
199
200 boolean aborted = channel.abort(cause);
201 if (LOG.isDebugEnabled())
202 LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
203 return aborted;
204 }
205 }
206
207 private boolean fail(Throwable cause)
208 {
209 if (update(0b0101, cause) == 0b0101)
210 {
211 if (LOG.isDebugEnabled())
212 LOG.debug("Failing {}: {}", this, cause);
213 destination.getRequestNotifier().notifyFailure(request, cause);
214 List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
215 ResponseNotifier responseNotifier = destination.getResponseNotifier();
216 responseNotifier.notifyFailure(listeners, response, cause);
217 responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause));
218 return true;
219 }
220 else
221 {
222 return false;
223 }
224 }
225
226 public void resetResponse(boolean success)
227 {
228 responseComplete.set(false);
229 int responseSuccess = 0b1100;
230 int responseFailure = 0b0100;
231 int code = success ? responseSuccess : responseFailure;
232 complete.addAndGet(-code);
233 }
234
235 public void proceed(Throwable failure)
236 {
237 HttpChannel channel = this.channel.get();
238 if (channel != null)
239 channel.proceed(this, failure);
240 }
241
242 private String toString(int code)
243 {
244 String padding = "0000";
245 String status = Integer.toBinaryString(code);
246 return String.format("%s@%x status=%s%s",
247 HttpExchange.class.getSimpleName(),
248 hashCode(),
249 padding.substring(status.length()),
250 status);
251 }
252
253 @Override
254 public String toString()
255 {
256 return toString(complete.get());
257 }
258 }