1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicMarkableReference;
25
26 import org.eclipse.jetty.client.api.Request;
27 import org.eclipse.jetty.client.api.Response;
28 import org.eclipse.jetty.client.api.Result;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32 public class HttpExchange
33 {
34 private static final Logger LOG = Log.getLogger(HttpExchange.class);
35
36 private final AtomicInteger complete = new AtomicInteger();
37 private final CountDownLatch terminate = new CountDownLatch(2);
38 private final HttpConversation conversation;
39 private final HttpDestination destination;
40 private final Request request;
41 private final List<Response.ResponseListener> listeners;
42 private final HttpResponse response;
43 private volatile HttpConnection connection;
44 private volatile Throwable requestFailure;
45 private volatile Throwable responseFailure;
46
47 public HttpExchange(HttpConversation conversation, HttpDestination destination, Request request, List<Response.ResponseListener> listeners)
48 {
49 this.conversation = conversation;
50 this.destination = destination;
51 this.request = request;
52 this.listeners = listeners;
53 this.response = new HttpResponse(request, listeners);
54 conversation.getExchanges().offer(this);
55 conversation.updateResponseListeners(null);
56 }
57
58 public HttpConversation getConversation()
59 {
60 return conversation;
61 }
62
63 public Request getRequest()
64 {
65 return request;
66 }
67
68 public Throwable getRequestFailure()
69 {
70 return requestFailure;
71 }
72
73 public List<Response.ResponseListener> getResponseListeners()
74 {
75 return listeners;
76 }
77
78 public HttpResponse getResponse()
79 {
80 return response;
81 }
82
83 public Throwable getResponseFailure()
84 {
85 return responseFailure;
86 }
87
88 public void setConnection(HttpConnection connection)
89 {
90 this.connection = connection;
91 }
92
93 public AtomicMarkableReference<Result> requestComplete(Throwable failure)
94 {
95 int requestSuccess = 0b0011;
96 int requestFailure = 0b0001;
97 return complete(failure == null ? requestSuccess : requestFailure, failure);
98 }
99
100 public AtomicMarkableReference<Result> responseComplete(Throwable failure)
101 {
102 if (failure == null)
103 {
104 int responseSuccess = 0b1100;
105 return complete(responseSuccess, failure);
106 }
107 else
108 {
109 proceed(false);
110 int responseFailure = 0b0100;
111 return complete(responseFailure, failure);
112 }
113 }
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 private AtomicMarkableReference<Result> complete(int code, Throwable failure)
134 {
135 Result result = null;
136 boolean modified = false;
137
138 int current;
139 while (true)
140 {
141 current = complete.get();
142 boolean updateable = (current & code) == 0;
143 if (updateable)
144 {
145 int candidate = current | code;
146 if (!complete.compareAndSet(current, candidate))
147 continue;
148 current = candidate;
149 modified = true;
150 if ((code & 0b01) == 0b01)
151 requestFailure = failure;
152 else
153 responseFailure = failure;
154 LOG.debug("{} updated", this);
155 }
156 break;
157 }
158
159 int completed = 0b0101;
160 if ((current & completed) == completed)
161 {
162 if (modified)
163 {
164
165 LOG.debug("{} complete", this);
166 conversation.complete();
167 }
168 result = new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
169 }
170
171 return new AtomicMarkableReference<>(result, modified);
172 }
173
174 public boolean abort(Throwable cause)
175 {
176 if (destination.remove(this))
177 {
178 destination.abort(this, cause);
179 LOG.debug("Aborted while queued {}: {}", this, cause);
180 return true;
181 }
182 else
183 {
184 HttpConnection connection = this.connection;
185
186 if (connection == null)
187 return false;
188
189 boolean aborted = connection.abort(cause);
190 LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
191 return aborted;
192 }
193 }
194
195 public void resetResponse(boolean success)
196 {
197 int responseSuccess = 0b1100;
198 int responseFailure = 0b0100;
199 int code = success ? responseSuccess : responseFailure;
200 complete.addAndGet(-code);
201 }
202
203 public void proceed(boolean proceed)
204 {
205 HttpConnection connection = this.connection;
206 if (connection != null)
207 connection.proceed(proceed);
208 }
209
210 public void terminateRequest()
211 {
212 terminate.countDown();
213 }
214
215 public void terminateResponse()
216 {
217 terminate.countDown();
218 }
219
220 public void awaitTermination()
221 {
222 try
223 {
224 terminate.await();
225 }
226 catch (InterruptedException x)
227 {
228 LOG.ignore(x);
229 }
230 }
231
232 @Override
233 public String toString()
234 {
235 String padding = "0000";
236 String status = Integer.toBinaryString(complete.get());
237 return String.format("%s@%x status=%s%s",
238 HttpExchange.class.getSimpleName(),
239 hashCode(),
240 padding.substring(status.length()),
241 status);
242 }
243 }