View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.List;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.atomic.AtomicInteger;
24  import java.util.concurrent.atomic.AtomicMarkableReference;
25  
26  import org.eclipse.jetty.client.api.Request;
27  import org.eclipse.jetty.client.api.Response;
28  import org.eclipse.jetty.client.api.Result;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  public class HttpExchange
33  {
34      private static final Logger LOG = Log.getLogger(HttpExchange.class);
35  
36      private final AtomicInteger complete = new AtomicInteger();
37      private final CountDownLatch terminate = new CountDownLatch(2);
38      private final HttpConversation conversation;
39      private final HttpDestination destination;
40      private final Request request;
41      private final List<Response.ResponseListener> listeners;
42      private final HttpResponse response;
43      private volatile HttpConnection connection;
44      private volatile Throwable requestFailure;
45      private volatile Throwable responseFailure;
46  
47      public HttpExchange(HttpConversation conversation, HttpDestination destination, Request request, List<Response.ResponseListener> listeners)
48      {
49          this.conversation = conversation;
50          this.destination = destination;
51          this.request = request;
52          this.listeners = listeners;
53          this.response = new HttpResponse(request, listeners);
54          conversation.getExchanges().offer(this);
55          conversation.updateResponseListeners(null);
56      }
57  
58      public HttpConversation getConversation()
59      {
60          return conversation;
61      }
62  
63      public Request getRequest()
64      {
65          return request;
66      }
67  
68      public Throwable getRequestFailure()
69      {
70          return requestFailure;
71      }
72  
73      public List<Response.ResponseListener> getResponseListeners()
74      {
75          return listeners;
76      }
77  
78      public HttpResponse getResponse()
79      {
80          return response;
81      }
82  
83      public Throwable getResponseFailure()
84      {
85          return responseFailure;
86      }
87  
88      public void setConnection(HttpConnection connection)
89      {
90          this.connection = connection;
91      }
92  
93      public AtomicMarkableReference<Result> requestComplete(Throwable failure)
94      {
95          int requestSuccess = 0b0011;
96          int requestFailure = 0b0001;
97          return complete(failure == null ? requestSuccess : requestFailure, failure);
98      }
99  
100     public AtomicMarkableReference<Result> responseComplete(Throwable failure)
101     {
102         if (failure == null)
103         {
104             int responseSuccess = 0b1100;
105             return complete(responseSuccess, failure);
106         }
107         else
108         {
109             proceed(false);
110             int responseFailure = 0b0100;
111             return complete(responseFailure, failure);
112         }
113     }
114 
115     /**
116      * This method needs to atomically compute whether this exchange is completed,
117      * that is both request and responses are completed (either with a success or
118      * a failure).
119      *
120      * Furthermore, this method needs to atomically compute whether the exchange
121      * has completed successfully (both request and response are successful) or not.
122      *
123      * To do this, we use 2 bits for the request (one to indicate completion, one
124      * to indicate success), and similarly for the response.
125      * By using {@link AtomicInteger} to atomically sum these codes we can know
126      * whether the exchange is completed and whether is successful.
127      *
128      * @param code the bits representing the status code for either the request or the response
129      * @param failure the failure - if any - associated with the status code for either the request or the response
130      * @return an AtomicMarkableReference holding whether the operation modified the
131      * completion status and the {@link Result} - if any - associated with the status
132      */
133     private AtomicMarkableReference<Result> complete(int code, Throwable failure)
134     {
135         Result result = null;
136         boolean modified = false;
137 
138         int current;
139         while (true)
140         {
141             current = complete.get();
142             boolean updateable = (current & code) == 0;
143             if (updateable)
144             {
145                 int candidate = current | code;
146                 if (!complete.compareAndSet(current, candidate))
147                     continue;
148                 current = candidate;
149                 modified = true;
150                 if ((code & 0b01) == 0b01)
151                     requestFailure = failure;
152                 else
153                     responseFailure = failure;
154                 LOG.debug("{} updated", this);
155             }
156             break;
157         }
158 
159         int completed = 0b0101;
160         if ((current & completed) == completed)
161         {
162             if (modified)
163             {
164                 // Request and response completed
165                 LOG.debug("{} complete", this);
166                 conversation.complete();
167             }
168             result = new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
169         }
170 
171         return new AtomicMarkableReference<>(result, modified);
172     }
173 
174     public boolean abort(Throwable cause)
175     {
176         if (destination.remove(this))
177         {
178             destination.abort(this, cause);
179             LOG.debug("Aborted while queued {}: {}", this, cause);
180             return true;
181         }
182         else
183         {
184             HttpConnection connection = this.connection;
185             // If there is no connection, this exchange is already completed
186             if (connection == null)
187                 return false;
188 
189             boolean aborted = connection.abort(cause);
190             LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
191             return aborted;
192         }
193     }
194 
195     public void resetResponse(boolean success)
196     {
197         int responseSuccess = 0b1100;
198         int responseFailure = 0b0100;
199         int code = success ? responseSuccess : responseFailure;
200         complete.addAndGet(-code);
201     }
202 
203     public void proceed(boolean proceed)
204     {
205         HttpConnection connection = this.connection;
206         if (connection != null)
207             connection.proceed(proceed);
208     }
209 
210     public void terminateRequest()
211     {
212         terminate.countDown();
213     }
214 
215     public void terminateResponse()
216     {
217         terminate.countDown();
218     }
219 
220     public void awaitTermination()
221     {
222         try
223         {
224             terminate.await();
225         }
226         catch (InterruptedException x)
227         {
228             LOG.ignore(x);
229         }
230     }
231 
232     @Override
233     public String toString()
234     {
235         String padding = "0000";
236         String status = Integer.toBinaryString(complete.get());
237         return String.format("%s@%x status=%s%s",
238                 HttpExchange.class.getSimpleName(),
239                 hashCode(),
240                 padding.substring(status.length()),
241                 status);
242     }
243 }