View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.List;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.atomic.AtomicInteger;
24  import java.util.concurrent.atomic.AtomicMarkableReference;
25  
26  import org.eclipse.jetty.client.api.Request;
27  import org.eclipse.jetty.client.api.Response;
28  import org.eclipse.jetty.client.api.Result;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  public class HttpExchange
33  {
34      private static final Logger LOG = Log.getLogger(HttpExchange.class);
35  
36      private final AtomicInteger complete = new AtomicInteger();
37      private final CountDownLatch terminate = new CountDownLatch(2);
38      private final HttpConversation conversation;
39      private final HttpDestination destination;
40      private final Request request;
41      private final List<Response.ResponseListener> listeners;
42      private final HttpResponse response;
43      private volatile HttpConnection connection;
44      private volatile Throwable requestFailure;
45      private volatile Throwable responseFailure;
46    
47  
48      public HttpExchange(HttpConversation conversation, HttpDestination destination, Request request, List<Response.ResponseListener> listeners)
49      {
50          this.conversation = conversation;
51          this.destination = destination;
52          this.request = request;
53          this.listeners = listeners;
54          this.response = new HttpResponse(request, listeners);
55          conversation.getExchanges().offer(this);
56          conversation.updateResponseListeners(null);
57      }
58  
59      public HttpConversation getConversation()
60      {
61          return conversation;
62      }
63  
64      public Request getRequest()
65      {
66          return request;
67      }
68  
69      public Throwable getRequestFailure()
70      {
71          return requestFailure;
72      }
73  
74      public List<Response.ResponseListener> getResponseListeners()
75      {
76          return listeners;
77      }
78  
79      public HttpResponse getResponse()
80      {
81          return response;
82      }
83  
84      public Throwable getResponseFailure()
85      {
86          return responseFailure;
87      }
88  
89      public void setConnection(HttpConnection connection)
90      {
91          this.connection = connection;
92      }
93  
94      public AtomicMarkableReference<Result> requestComplete(Throwable failure)
95      {
96          int requestSuccess = 0b0011;
97          int requestFailure = 0b0001;
98          return complete(failure == null ? requestSuccess : requestFailure, failure);
99      }
100 
101     public AtomicMarkableReference<Result> responseComplete(Throwable failure)
102     {
103         if (failure == null)
104         {
105             int responseSuccess = 0b1100;
106             return complete(responseSuccess, failure);
107         }
108         else
109         {
110             proceed(false);
111             int responseFailure = 0b0100;
112             return complete(responseFailure, failure);
113         }
114     }
115 
116     /**
117      * This method needs to atomically compute whether this exchange is completed,
118      * that is both request and responses are completed (either with a success or
119      * a failure).
120      *
121      * Furthermore, this method needs to atomically compute whether the exchange
122      * has completed successfully (both request and response are successful) or not.
123      *
124      * To do this, we use 2 bits for the request (one to indicate completion, one
125      * to indicate success), and similarly for the response.
126      * By using {@link AtomicInteger} to atomically sum these codes we can know
127      * whether the exchange is completed and whether is successful.
128      *
129      * @param code the bits representing the status code for either the request or the response
130      * @param failure the failure - if any - associated with the status code for either the request or the response
131      * @return an AtomicMarkableReference holding whether the operation modified the
132      * completion status and the {@link Result} - if any - associated with the status
133      */
134     private AtomicMarkableReference<Result> complete(int code, Throwable failure)
135     {
136         Result result = null;
137         boolean modified = false;
138 
139         int current;
140         while (true)
141         {
142             current = complete.get();
143             boolean updateable = (current & code) == 0;
144             if (updateable)
145             {
146                 int candidate = current | code;
147                 if (!complete.compareAndSet(current, candidate))
148                     continue;
149                 current = candidate;
150                 modified = true;
151                 if ((code & 0b01) == 0b01)
152                     requestFailure = failure;
153                 else
154                     responseFailure = failure;
155                 LOG.debug("{} updated", this);
156             }
157             break;
158         }
159 
160         int completed = 0b0101;
161         if ((current & completed) == completed)
162         {
163             if (modified)
164             {
165                 // Request and response completed
166                 LOG.debug("{} complete", this);
167                 conversation.complete();
168             }
169             result = new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
170         }
171 
172         return new AtomicMarkableReference<>(result, modified);
173     }
174 
175     public boolean abort(Throwable cause)
176     {
177         if (destination.remove(this))
178         {
179             destination.abort(this, cause);
180             LOG.debug("Aborted while queued {}: {}", this, cause);
181             return true;
182         }
183         else
184         {
185             HttpConnection connection = this.connection;
186             // If there is no connection, this exchange is already completed
187             if (connection == null)
188                 return false;
189 
190             boolean aborted = connection.abort(cause);
191             LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
192             return aborted;
193         }
194     }
195 
196     public void resetResponse(boolean success)
197     {
198         int responseSuccess = 0b1100;
199         int responseFailure = 0b0100;
200         int code = success ? responseSuccess : responseFailure;
201         complete.addAndGet(-code);
202     }
203 
204     public void proceed(boolean proceed)
205     {
206         HttpConnection connection = this.connection;
207         if (connection != null)
208             connection.proceed(proceed);
209     }
210 
211     public void terminateRequest()
212     {
213         terminate.countDown();
214     }
215 
216     public void terminateResponse()
217     {
218         terminate.countDown();
219     }
220 
221     public void awaitTermination()
222     {
223         try
224         {
225             terminate.await();
226         }
227         catch (InterruptedException x)
228         {
229             LOG.ignore(x);
230         }
231     }
232 
233     @Override
234     public String toString()
235     {
236         String padding = "0000";
237         String status = Integer.toBinaryString(complete.get());
238         return String.format("%s@%x status=%s%s",
239                 HttpExchange.class.getSimpleName(),
240                 hashCode(),
241                 padding.substring(status.length()),
242                 status);
243     }
244 }