1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicMarkableReference;
25
26 import org.eclipse.jetty.client.api.Request;
27 import org.eclipse.jetty.client.api.Response;
28 import org.eclipse.jetty.client.api.Result;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32 public class HttpExchange
33 {
34 private static final Logger LOG = Log.getLogger(HttpExchange.class);
35
36 private final AtomicInteger complete = new AtomicInteger();
37 private final CountDownLatch terminate = new CountDownLatch(2);
38 private final HttpConversation conversation;
39 private final HttpDestination destination;
40 private final Request request;
41 private final List<Response.ResponseListener> listeners;
42 private final HttpResponse response;
43 private volatile HttpConnection connection;
44 private volatile Throwable requestFailure;
45 private volatile Throwable responseFailure;
46
47
48 public HttpExchange(HttpConversation conversation, HttpDestination destination, Request request, List<Response.ResponseListener> listeners)
49 {
50 this.conversation = conversation;
51 this.destination = destination;
52 this.request = request;
53 this.listeners = listeners;
54 this.response = new HttpResponse(request, listeners);
55 conversation.getExchanges().offer(this);
56 conversation.updateResponseListeners(null);
57 }
58
59 public HttpConversation getConversation()
60 {
61 return conversation;
62 }
63
64 public Request getRequest()
65 {
66 return request;
67 }
68
69 public Throwable getRequestFailure()
70 {
71 return requestFailure;
72 }
73
74 public List<Response.ResponseListener> getResponseListeners()
75 {
76 return listeners;
77 }
78
79 public HttpResponse getResponse()
80 {
81 return response;
82 }
83
84 public Throwable getResponseFailure()
85 {
86 return responseFailure;
87 }
88
89 public void setConnection(HttpConnection connection)
90 {
91 this.connection = connection;
92 }
93
94 public AtomicMarkableReference<Result> requestComplete(Throwable failure)
95 {
96 int requestSuccess = 0b0011;
97 int requestFailure = 0b0001;
98 return complete(failure == null ? requestSuccess : requestFailure, failure);
99 }
100
101 public AtomicMarkableReference<Result> responseComplete(Throwable failure)
102 {
103 if (failure == null)
104 {
105 int responseSuccess = 0b1100;
106 return complete(responseSuccess, failure);
107 }
108 else
109 {
110 proceed(false);
111 int responseFailure = 0b0100;
112 return complete(responseFailure, failure);
113 }
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 private AtomicMarkableReference<Result> complete(int code, Throwable failure)
135 {
136 Result result = null;
137 boolean modified = false;
138
139 int current;
140 while (true)
141 {
142 current = complete.get();
143 boolean updateable = (current & code) == 0;
144 if (updateable)
145 {
146 int candidate = current | code;
147 if (!complete.compareAndSet(current, candidate))
148 continue;
149 current = candidate;
150 modified = true;
151 if ((code & 0b01) == 0b01)
152 requestFailure = failure;
153 else
154 responseFailure = failure;
155 LOG.debug("{} updated", this);
156 }
157 break;
158 }
159
160 int completed = 0b0101;
161 if ((current & completed) == completed)
162 {
163 if (modified)
164 {
165
166 LOG.debug("{} complete", this);
167 conversation.complete();
168 }
169 result = new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
170 }
171
172 return new AtomicMarkableReference<>(result, modified);
173 }
174
175 public boolean abort(Throwable cause)
176 {
177 if (destination.remove(this))
178 {
179 destination.abort(this, cause);
180 LOG.debug("Aborted while queued {}: {}", this, cause);
181 return true;
182 }
183 else
184 {
185 HttpConnection connection = this.connection;
186
187 if (connection == null)
188 return false;
189
190 boolean aborted = connection.abort(cause);
191 LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
192 return aborted;
193 }
194 }
195
196 public void resetResponse(boolean success)
197 {
198 int responseSuccess = 0b1100;
199 int responseFailure = 0b0100;
200 int code = success ? responseSuccess : responseFailure;
201 complete.addAndGet(-code);
202 }
203
204 public void proceed(boolean proceed)
205 {
206 HttpConnection connection = this.connection;
207 if (connection != null)
208 connection.proceed(proceed);
209 }
210
211 public void terminateRequest()
212 {
213 terminate.countDown();
214 }
215
216 public void terminateResponse()
217 {
218 terminate.countDown();
219 }
220
221 public void awaitTermination()
222 {
223 try
224 {
225 terminate.await();
226 }
227 catch (InterruptedException x)
228 {
229 LOG.ignore(x);
230 }
231 }
232
233 @Override
234 public String toString()
235 {
236 String padding = "0000";
237 String status = Integer.toBinaryString(complete.get());
238 return String.format("%s@%x status=%s%s",
239 HttpExchange.class.getSimpleName(),
240 hashCode(),
241 padding.substring(status.length()),
242 status);
243 }
244 }