View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.List;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  import java.util.concurrent.atomic.AtomicInteger;
24  import java.util.concurrent.atomic.AtomicReference;
25  
26  import org.eclipse.jetty.client.api.Request;
27  import org.eclipse.jetty.client.api.Response;
28  import org.eclipse.jetty.client.api.Result;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  public class HttpExchange
33  {
34      private static final Logger LOG = Log.getLogger(HttpExchange.class);
35  
36      private final AtomicBoolean requestComplete = new AtomicBoolean();
37      private final AtomicBoolean responseComplete = new AtomicBoolean();
38      private final AtomicInteger complete = new AtomicInteger();
39      private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
40      private final HttpConversation conversation;
41      private final HttpDestination destination;
42      private final Request request;
43      private final List<Response.ResponseListener> listeners;
44      private final HttpResponse response;
45      private volatile Throwable requestFailure;
46      private volatile Throwable responseFailure;
47    
48  
49      public HttpExchange(HttpConversation conversation, HttpDestination destination, Request request, List<Response.ResponseListener> listeners)
50      {
51          this.conversation = conversation;
52          this.destination = destination;
53          this.request = request;
54          this.listeners = listeners;
55          this.response = new HttpResponse(request, listeners);
56          conversation.getExchanges().offer(this);
57          conversation.updateResponseListeners(null);
58      }
59  
60      public HttpConversation getConversation()
61      {
62          return conversation;
63      }
64  
65      public Request getRequest()
66      {
67          return request;
68      }
69  
70      public Throwable getRequestFailure()
71      {
72          return requestFailure;
73      }
74  
75      public List<Response.ResponseListener> getResponseListeners()
76      {
77          return listeners;
78      }
79  
80      public HttpResponse getResponse()
81      {
82          return response;
83      }
84  
85      public Throwable getResponseFailure()
86      {
87          return responseFailure;
88      }
89  
90      public void associate(HttpChannel channel)
91      {
92          if (!this.channel.compareAndSet(null, channel))
93              throw new IllegalStateException();
94      }
95  
96      public void disassociate(HttpChannel channel)
97      {
98          if (!this.channel.compareAndSet(channel, null))
99              throw new IllegalStateException();
100     }
101 
102     public boolean requestComplete()
103     {
104         return requestComplete.compareAndSet(false, true);
105     }
106 
107     public boolean responseComplete()
108     {
109         return responseComplete.compareAndSet(false, true);
110     }
111 
112     public Result terminateRequest(Throwable failure)
113     {
114         int requestSuccess = 0b0011;
115         int requestFailure = 0b0001;
116         return terminate(failure == null ? requestSuccess : requestFailure, failure);
117     }
118 
119     public Result terminateResponse(Throwable failure)
120     {
121         if (failure == null)
122         {
123             int responseSuccess = 0b1100;
124             return terminate(responseSuccess, failure);
125         }
126         else
127         {
128             proceed(false);
129             int responseFailure = 0b0100;
130             return terminate(responseFailure, failure);
131         }
132     }
133 
134     /**
135      * This method needs to atomically compute whether this exchange is completed,
136      * that is both request and responses are completed (either with a success or
137      * a failure).
138      *
139      * Furthermore, this method needs to atomically compute whether the exchange
140      * has completed successfully (both request and response are successful) or not.
141      *
142      * To do this, we use 2 bits for the request (one to indicate completion, one
143      * to indicate success), and similarly for the response.
144      * By using {@link AtomicInteger} to atomically sum these codes we can know
145      * whether the exchange is completed and whether is successful.
146      *
147      * @return the {@link Result} - if any - associated with the status
148      */
149     private Result terminate(int code, Throwable failure)
150     {
151         int current;
152         while (true)
153         {
154             current = complete.get();
155             boolean updateable = (current & code) == 0;
156             if (updateable)
157             {
158                 int candidate = current | code;
159                 if (!complete.compareAndSet(current, candidate))
160                     continue;
161                 current = candidate;
162                 if ((code & 0b01) == 0b01)
163                     requestFailure = failure;
164                 else
165                     responseFailure = failure;
166                 LOG.debug("{} updated", this);
167             }
168             break;
169         }
170 
171         int terminated = 0b0101;
172         if ((current & terminated) == terminated)
173         {
174             // Request and response terminated
175             LOG.debug("{} terminated", this);
176             conversation.complete();
177             return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
178         }
179 
180         return null;
181     }
182 
183     public boolean abort(Throwable cause)
184     {
185         if (destination.remove(this))
186         {
187             destination.abort(this, cause);
188             LOG.debug("Aborted while queued {}: {}", this, cause);
189             return true;
190         }
191         else
192         {
193             HttpChannel channel = this.channel.get();
194             // If there is no channel, this exchange is already completed
195             if (channel == null)
196                 return false;
197 
198             boolean aborted = channel.abort(cause);
199             LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
200             return aborted;
201         }
202     }
203 
204     public void resetResponse(boolean success)
205     {
206         responseComplete.set(false);
207         int responseSuccess = 0b1100;
208         int responseFailure = 0b0100;
209         int code = success ? responseSuccess : responseFailure;
210         complete.addAndGet(-code);
211     }
212 
213     public void proceed(boolean proceed)
214     {
215         HttpChannel channel = this.channel.get();
216         if (channel != null)
217             channel.proceed(this, proceed);
218     }
219 
220     private String toString(int code)
221     {
222         String padding = "0000";
223         String status = Integer.toBinaryString(code);
224         return String.format("%s@%x status=%s%s",
225                 HttpExchange.class.getSimpleName(),
226                 hashCode(),
227                 padding.substring(status.length()),
228                 status);
229     }
230 
231     @Override
232     public String toString()
233     {
234         return toString(complete.get());
235     }
236 }