1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.atomic.AtomicReference;
25
26 import org.eclipse.jetty.client.api.Request;
27 import org.eclipse.jetty.client.api.Response;
28 import org.eclipse.jetty.client.api.Result;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32 public class HttpExchange
33 {
34 private static final Logger LOG = Log.getLogger(HttpExchange.class);
35
36 private final AtomicBoolean requestComplete = new AtomicBoolean();
37 private final AtomicBoolean responseComplete = new AtomicBoolean();
38 private final AtomicInteger complete = new AtomicInteger();
39 private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
40 private final HttpConversation conversation;
41 private final HttpDestination destination;
42 private final Request request;
43 private final List<Response.ResponseListener> listeners;
44 private final HttpResponse response;
45 private volatile Throwable requestFailure;
46 private volatile Throwable responseFailure;
47
48
49 public HttpExchange(HttpConversation conversation, HttpDestination destination, Request request, List<Response.ResponseListener> listeners)
50 {
51 this.conversation = conversation;
52 this.destination = destination;
53 this.request = request;
54 this.listeners = listeners;
55 this.response = new HttpResponse(request, listeners);
56 conversation.getExchanges().offer(this);
57 conversation.updateResponseListeners(null);
58 }
59
60 public HttpConversation getConversation()
61 {
62 return conversation;
63 }
64
65 public Request getRequest()
66 {
67 return request;
68 }
69
70 public Throwable getRequestFailure()
71 {
72 return requestFailure;
73 }
74
75 public List<Response.ResponseListener> getResponseListeners()
76 {
77 return listeners;
78 }
79
80 public HttpResponse getResponse()
81 {
82 return response;
83 }
84
85 public Throwable getResponseFailure()
86 {
87 return responseFailure;
88 }
89
90 public void associate(HttpChannel channel)
91 {
92 if (!this.channel.compareAndSet(null, channel))
93 throw new IllegalStateException();
94 }
95
96 public void disassociate(HttpChannel channel)
97 {
98 if (!this.channel.compareAndSet(channel, null))
99 throw new IllegalStateException();
100 }
101
102 public boolean requestComplete()
103 {
104 return requestComplete.compareAndSet(false, true);
105 }
106
107 public boolean responseComplete()
108 {
109 return responseComplete.compareAndSet(false, true);
110 }
111
112 public Result terminateRequest(Throwable failure)
113 {
114 int requestSuccess = 0b0011;
115 int requestFailure = 0b0001;
116 return terminate(failure == null ? requestSuccess : requestFailure, failure);
117 }
118
119 public Result terminateResponse(Throwable failure)
120 {
121 if (failure == null)
122 {
123 int responseSuccess = 0b1100;
124 return terminate(responseSuccess, failure);
125 }
126 else
127 {
128 proceed(false);
129 int responseFailure = 0b0100;
130 return terminate(responseFailure, failure);
131 }
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 private Result terminate(int code, Throwable failure)
150 {
151 int current;
152 while (true)
153 {
154 current = complete.get();
155 boolean updateable = (current & code) == 0;
156 if (updateable)
157 {
158 int candidate = current | code;
159 if (!complete.compareAndSet(current, candidate))
160 continue;
161 current = candidate;
162 if ((code & 0b01) == 0b01)
163 requestFailure = failure;
164 else
165 responseFailure = failure;
166 LOG.debug("{} updated", this);
167 }
168 break;
169 }
170
171 int terminated = 0b0101;
172 if ((current & terminated) == terminated)
173 {
174
175 LOG.debug("{} terminated", this);
176 conversation.complete();
177 return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
178 }
179
180 return null;
181 }
182
183 public boolean abort(Throwable cause)
184 {
185 if (destination.remove(this))
186 {
187 destination.abort(this, cause);
188 LOG.debug("Aborted while queued {}: {}", this, cause);
189 return true;
190 }
191 else
192 {
193 HttpChannel channel = this.channel.get();
194
195 if (channel == null)
196 return false;
197
198 boolean aborted = channel.abort(cause);
199 LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
200 return aborted;
201 }
202 }
203
204 public void resetResponse(boolean success)
205 {
206 responseComplete.set(false);
207 int responseSuccess = 0b1100;
208 int responseFailure = 0b0100;
209 int code = success ? responseSuccess : responseFailure;
210 complete.addAndGet(-code);
211 }
212
213 public void proceed(boolean proceed)
214 {
215 HttpChannel channel = this.channel.get();
216 if (channel != null)
217 channel.proceed(this, proceed);
218 }
219
220 private String toString(int code)
221 {
222 String padding = "0000";
223 String status = Integer.toBinaryString(code);
224 return String.format("%s@%x status=%s%s",
225 HttpExchange.class.getSimpleName(),
226 hashCode(),
227 padding.substring(status.length()),
228 status);
229 }
230
231 @Override
232 public String toString()
233 {
234 return toString(complete.get());
235 }
236 }