View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.List;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  import java.util.concurrent.atomic.AtomicInteger;
24  import java.util.concurrent.atomic.AtomicReference;
25  
26  import org.eclipse.jetty.client.api.Request;
27  import org.eclipse.jetty.client.api.Response;
28  import org.eclipse.jetty.client.api.Result;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  public class HttpExchange
33  {
34      private static final Logger LOG = Log.getLogger(HttpExchange.class);
35  
36      private final AtomicBoolean requestComplete = new AtomicBoolean();
37      private final AtomicBoolean responseComplete = new AtomicBoolean();
38      private final AtomicInteger complete = new AtomicInteger();
39      private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
40      private final HttpDestination destination;
41      private final HttpRequest request;
42      private final List<Response.ResponseListener> listeners;
43      private final HttpResponse response;
44      private volatile Throwable requestFailure;
45      private volatile Throwable responseFailure;
46  
47      public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
48      {
49          this.destination = destination;
50          this.request = request;
51          this.listeners = listeners;
52          this.response = new HttpResponse(request, listeners);
53          HttpConversation conversation = request.getConversation();
54          conversation.getExchanges().offer(this);
55          conversation.updateResponseListeners(null);
56      }
57  
58      public HttpConversation getConversation()
59      {
60          return request.getConversation();
61      }
62  
63      public Request getRequest()
64      {
65          return request;
66      }
67  
68      public Throwable getRequestFailure()
69      {
70          return requestFailure;
71      }
72  
73      public List<Response.ResponseListener> getResponseListeners()
74      {
75          return listeners;
76      }
77  
78      public HttpResponse getResponse()
79      {
80          return response;
81      }
82  
83      public Throwable getResponseFailure()
84      {
85          return responseFailure;
86      }
87  
88      public void associate(HttpChannel channel)
89      {
90          if (!this.channel.compareAndSet(null, channel))
91              throw new IllegalStateException();
92      }
93  
94      public void disassociate(HttpChannel channel)
95      {
96          if (!this.channel.compareAndSet(channel, null))
97              throw new IllegalStateException();
98      }
99  
100     public boolean requestComplete()
101     {
102         return requestComplete.compareAndSet(false, true);
103     }
104 
105     public boolean responseComplete()
106     {
107         return responseComplete.compareAndSet(false, true);
108     }
109 
110     public Result terminateRequest(Throwable failure)
111     {
112         int requestSuccess = 0b0011;
113         int requestFailure = 0b0001;
114         return terminate(failure == null ? requestSuccess : requestFailure, failure);
115     }
116 
117     public Result terminateResponse(Throwable failure)
118     {
119         if (failure == null)
120         {
121             int responseSuccess = 0b1100;
122             return terminate(responseSuccess, null);
123         }
124         else
125         {
126             proceed(failure);
127             int responseFailure = 0b0100;
128             return terminate(responseFailure, failure);
129         }
130     }
131 
132     /**
133      * This method needs to atomically compute whether this exchange is completed,
134      * that is both request and responses are completed (either with a success or
135      * a failure).
136      *
137      * Furthermore, this method needs to atomically compute whether the exchange
138      * has completed successfully (both request and response are successful) or not.
139      *
140      * To do this, we use 2 bits for the request (one to indicate completion, one
141      * to indicate success), and similarly for the response.
142      * By using {@link AtomicInteger} to atomically sum these codes we can know
143      * whether the exchange is completed and whether is successful.
144      *
145      * @return the {@link Result} - if any - associated with the status
146      */
147     private Result terminate(int code, Throwable failure)
148     {
149         int current = update(code, failure);
150         int terminated = 0b0101;
151         if ((current & terminated) == terminated)
152         {
153             // Request and response terminated
154             if (LOG.isDebugEnabled())
155                 LOG.debug("{} terminated", this);
156             return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
157         }
158         return null;
159     }
160 
161     private int update(int code, Throwable failure)
162     {
163         int current;
164         while (true)
165         {
166             current = complete.get();
167             boolean updateable = (current & code) == 0;
168             if (updateable)
169             {
170                 int candidate = current | code;
171                 if (!complete.compareAndSet(current, candidate))
172                     continue;
173                 current = candidate;
174                 if ((code & 0b01) == 0b01)
175                     requestFailure = failure;
176                 if ((code & 0b0100) == 0b0100)
177                     responseFailure = failure;
178                 if (LOG.isDebugEnabled())
179                     LOG.debug("{} updated", this);
180             }
181             break;
182         }
183         return current;
184     }
185 
186     public boolean abort(Throwable cause)
187     {
188         if (destination.remove(this))
189         {
190             if (LOG.isDebugEnabled())
191                 LOG.debug("Aborting while queued {}: {}", this, cause);
192             return fail(cause);
193         }
194         else
195         {
196             HttpChannel channel = this.channel.get();
197             if (channel == null)
198                 return fail(cause);
199 
200             boolean aborted = channel.abort(cause);
201             if (LOG.isDebugEnabled())
202                 LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
203             return aborted;
204         }
205     }
206 
207     private boolean fail(Throwable cause)
208     {
209         if (update(0b0101, cause) == 0b0101)
210         {
211             if (LOG.isDebugEnabled())
212                 LOG.debug("Failing {}: {}", this, cause);
213             destination.getRequestNotifier().notifyFailure(request, cause);
214             List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
215             ResponseNotifier responseNotifier = destination.getResponseNotifier();
216             responseNotifier.notifyFailure(listeners, response, cause);
217             responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause));
218             return true;
219         }
220         else
221         {
222             return false;
223         }
224     }
225 
226     public void resetResponse(boolean success)
227     {
228         responseComplete.set(false);
229         int responseSuccess = 0b1100;
230         int responseFailure = 0b0100;
231         int code = success ? responseSuccess : responseFailure;
232         complete.addAndGet(-code);
233     }
234 
235     public void proceed(Throwable failure)
236     {
237         HttpChannel channel = this.channel.get();
238         if (channel != null)
239             channel.proceed(this, failure);
240     }
241 
242     private String toString(int code)
243     {
244         String padding = "0000";
245         String status = Integer.toBinaryString(code);
246         return String.format("%s@%x status=%s%s",
247                 HttpExchange.class.getSimpleName(),
248                 hashCode(),
249                 padding.substring(status.length()),
250                 status);
251     }
252 
253     @Override
254     public String toString()
255     {
256         return toString(complete.get());
257     }
258 }