View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.List;
22  
23  import org.eclipse.jetty.client.api.Response;
24  import org.eclipse.jetty.client.api.Result;
25  import org.eclipse.jetty.util.log.Log;
26  import org.eclipse.jetty.util.log.Logger;
27  
28  public class HttpExchange
29  {
30      private static final Logger LOG = Log.getLogger(HttpExchange.class);
31  
32      private final HttpDestination destination;
33      private final HttpRequest request;
34      private final List<Response.ResponseListener> listeners;
35      private final HttpResponse response;
36      private State requestState = State.PENDING;
37      private State responseState = State.PENDING;
38      private HttpChannel _channel;
39      private Throwable requestFailure;
40      private Throwable responseFailure;
41  
42      public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
43      {
44          this.destination = destination;
45          this.request = request;
46          this.listeners = listeners;
47          this.response = new HttpResponse(request, listeners);
48          HttpConversation conversation = request.getConversation();
49          conversation.getExchanges().offer(this);
50          conversation.updateResponseListeners(null);
51      }
52  
53      public HttpConversation getConversation()
54      {
55          return request.getConversation();
56      }
57  
58      public HttpRequest getRequest()
59      {
60          return request;
61      }
62  
63      public Throwable getRequestFailure()
64      {
65          synchronized (this)
66          {
67              return requestFailure;
68          }
69      }
70  
71      public List<Response.ResponseListener> getResponseListeners()
72      {
73          return listeners;
74      }
75  
76      public HttpResponse getResponse()
77      {
78          return response;
79      }
80  
81      public Throwable getResponseFailure()
82      {
83          synchronized (this)
84          {
85              return responseFailure;
86          }
87      }
88  
89      /**
90       * <p>Associates the given {@code channel} to this exchange.</p>
91       * <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>
92       *
93       * @param channel the channel to associate to this exchange
94       * @return true if the channel could be associated, false otherwise
95       */
96      boolean associate(HttpChannel channel)
97      {
98          boolean result = false;
99          boolean abort = false;
100         synchronized (this)
101         {
102             // Only associate if the exchange state is initial,
103             // as the exchange could be already failed.
104             if (requestState == State.PENDING && responseState == State.PENDING)
105             {
106                 abort = _channel != null;
107                 if (!abort)
108                 {
109                     _channel = channel;
110                     result = true;
111                 }
112             }
113         }
114 
115         if (abort)
116             request.abort(new IllegalStateException(toString()));
117 
118         return result;
119     }
120 
121     void disassociate(HttpChannel channel)
122     {
123         boolean abort = false;
124         synchronized (this)
125         {
126             if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
127                 abort = true;
128             _channel = null;
129         }
130 
131         if (abort)
132             request.abort(new IllegalStateException(toString()));
133     }
134 
135     private HttpChannel getHttpChannel()
136     {
137         synchronized (this)
138         {
139             return _channel;
140         }
141     }
142 
143     public boolean requestComplete(Throwable failure)
144     {
145         synchronized (this)
146         {
147             return completeRequest(failure);
148         }
149     }
150 
151     private boolean completeRequest(Throwable failure)
152     {
153         if (requestState == State.PENDING)
154         {
155             requestState = State.COMPLETED;
156             requestFailure = failure;
157             return true;
158         }
159         return false;
160     }
161 
162     public boolean responseComplete(Throwable failure)
163     {
164         synchronized (this)
165         {
166             return completeResponse(failure);
167         }
168     }
169 
170     private boolean completeResponse(Throwable failure)
171     {
172         if (responseState == State.PENDING)
173         {
174             responseState = State.COMPLETED;
175             responseFailure = failure;
176             return true;
177         }
178         return false;
179     }
180 
181     public Result terminateRequest()
182     {
183         Result result = null;
184         synchronized (this)
185         {
186             if (requestState == State.COMPLETED)
187                 requestState = State.TERMINATED;
188             if (requestState == State.TERMINATED && responseState == State.TERMINATED)
189                 result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
190         }
191 
192         if (LOG.isDebugEnabled())
193             LOG.debug("Terminated request for {}, result: {}", this, result);
194 
195         return result;
196     }
197 
198     public Result terminateResponse()
199     {
200         Result result = null;
201         synchronized (this)
202         {
203             if (responseState == State.COMPLETED)
204                 responseState = State.TERMINATED;
205             if (requestState == State.TERMINATED && responseState == State.TERMINATED)
206                 result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
207         }
208 
209         if (LOG.isDebugEnabled())
210             LOG.debug("Terminated response for {}, result: {}", this, result);
211 
212         return result;
213     }
214 
215     public boolean abort(Throwable failure)
216     {
217         // Atomically change the state of this exchange to be completed.
218         // This will avoid that this exchange can be associated to a channel.
219         boolean abortRequest;
220         boolean abortResponse;
221         synchronized (this)
222         {
223             abortRequest = completeRequest(failure);
224             abortResponse = completeResponse(failure);
225         }
226 
227         if (LOG.isDebugEnabled())
228             LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure);
229 
230         if (!abortRequest && !abortResponse)
231             return false;
232 
233         // We failed this exchange, deal with it.
234 
235         // Case #1: exchange was in the destination queue.
236         if (destination.remove(this))
237         {
238             if (LOG.isDebugEnabled())
239                 LOG.debug("Aborting while queued {}: {}", this, failure);
240             notifyFailureComplete(failure);
241             return true;
242         }
243 
244         HttpChannel channel = getHttpChannel();
245         if (channel == null)
246         {
247             // Case #2: exchange was not yet associated.
248             // Because this exchange is failed, when associate() is called
249             // it will return false, and the caller will dispose the channel.
250             if (LOG.isDebugEnabled())
251                 LOG.debug("Aborted before association {}: {}", this, failure);
252             notifyFailureComplete(failure);
253             return true;
254         }
255 
256         // Case #3: exchange was already associated.
257         boolean aborted = channel.abort(this, abortRequest ? failure : null, abortResponse ? failure : null);
258         if (LOG.isDebugEnabled())
259             LOG.debug("Aborted ({}) while active {}: {}", aborted, this, failure);
260         return aborted;
261     }
262 
263     private void notifyFailureComplete(Throwable failure)
264     {
265         destination.getRequestNotifier().notifyFailure(request, failure);
266         List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
267         ResponseNotifier responseNotifier = destination.getResponseNotifier();
268         responseNotifier.notifyFailure(listeners, response, failure);
269         responseNotifier.notifyComplete(listeners, new Result(request, failure, response, failure));
270     }
271 
272     public void resetResponse()
273     {
274         synchronized (this)
275         {
276             responseState = State.PENDING;
277             responseFailure = null;
278         }
279     }
280 
281     public void proceed(Throwable failure)
282     {
283         HttpChannel channel = getHttpChannel();
284         if (channel != null)
285             channel.proceed(this, failure);
286     }
287 
288     @Override
289     public String toString()
290     {
291         synchronized (this)
292         {
293             return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
294                     HttpExchange.class.getSimpleName(),
295                     hashCode(),
296                     requestState, requestFailure, requestFailure,
297                     responseState, responseFailure, responseFailure);
298         }
299     }
300 
301     private enum State
302     {
303         PENDING, COMPLETED, TERMINATED
304     }
305 }