View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.http;
20  
21  import java.nio.channels.AsynchronousCloseException;
22  import java.util.concurrent.TimeoutException;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.eclipse.jetty.client.HttpConnection;
27  import org.eclipse.jetty.client.HttpDestination;
28  import org.eclipse.jetty.client.HttpExchange;
29  import org.eclipse.jetty.client.api.Connection;
30  import org.eclipse.jetty.client.api.Request;
31  import org.eclipse.jetty.client.api.Response;
32  import org.eclipse.jetty.io.AbstractConnection;
33  import org.eclipse.jetty.io.EndPoint;
34  import org.eclipse.jetty.util.Promise;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.util.thread.Sweeper;
38  
39  public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable
40  {
41      private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
42  
43      private final AtomicBoolean closed = new AtomicBoolean();
44      private final Promise<Connection> promise;
45      private final AtomicInteger sweeps = new AtomicInteger();
46      private final Delegate delegate;
47      private final HttpChannelOverHTTP channel;
48      private long idleTimeout;
49  
50      public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
51      {
52          super(endPoint, destination.getHttpClient().getExecutor());
53          this.promise = promise;
54          this.delegate = new Delegate(destination);
55          this.channel = newHttpChannel();
56      }
57  
58      protected HttpChannelOverHTTP newHttpChannel()
59      {
60          return new HttpChannelOverHTTP(this);
61      }
62  
63      public HttpChannelOverHTTP getHttpChannel()
64      {
65          return channel;
66      }
67  
68      public HttpDestinationOverHTTP getHttpDestination()
69      {
70          return (HttpDestinationOverHTTP)delegate.getHttpDestination();
71      }
72  
73      @Override
74      public void send(Request request, Response.CompleteListener listener)
75      {
76          delegate.send(request, listener);
77      }
78  
79      protected void send(HttpExchange exchange)
80      {
81          delegate.send(exchange);
82      }
83  
84      @Override
85      public void onOpen()
86      {
87          super.onOpen();
88          fillInterested();
89          promise.succeeded(this);
90      }
91  
92      public boolean isClosed()
93      {
94          return closed.get();
95      }
96  
97      @Override
98      protected boolean onReadTimeout()
99      {
100         if (LOG.isDebugEnabled())
101             LOG.debug("Idle timeout {}", this);
102         close(new TimeoutException());
103         return false;
104     }
105 
106     @Override
107     public void onFillable()
108     {
109         HttpExchange exchange = channel.getHttpExchange();
110         if (exchange != null)
111         {
112             channel.receive();
113         }
114         else
115         {
116             // If there is no exchange, then could be either a remote close,
117             // or garbage bytes; in both cases we close the connection
118             close();
119         }
120     }
121 
122     public void release()
123     {
124         // Restore idle timeout
125         getEndPoint().setIdleTimeout(idleTimeout);
126         getHttpDestination().release(this);
127     }
128 
129     @Override
130     public void close()
131     {
132         close(new AsynchronousCloseException());
133     }
134 
135     protected void close(Throwable failure)
136     {
137         if (softClose())
138         {
139             // First close then abort, to be sure that the connection cannot be reused
140             // from an onFailure() handler or by blocking code waiting for completion.
141             getHttpDestination().close(this);
142             getEndPoint().shutdownOutput();
143             if (LOG.isDebugEnabled())
144                 LOG.debug("Shutdown {}", this);
145             getEndPoint().close();
146             if (LOG.isDebugEnabled())
147                 LOG.debug("Closed {}", this);
148 
149             abort(failure);
150         }
151     }
152 
153     public boolean softClose()
154     {
155         return closed.compareAndSet(false, true);
156     }
157 
158     protected boolean abort(Throwable failure)
159     {
160         HttpExchange exchange = channel.getHttpExchange();
161         return exchange != null && exchange.getRequest().abort(failure);
162     }
163 
164     @Override
165     public boolean sweep()
166     {
167         if (!closed.get())
168             return false;
169         if (sweeps.incrementAndGet() < 4)
170             return false;
171         return true;
172     }
173 
174     @Override
175     public String toString()
176     {
177         return String.format("%s@%h(l:%s <-> r:%s,closed=%b)[%s]",
178                 getClass().getSimpleName(),
179                 this,
180                 getEndPoint().getLocalAddress(),
181                 getEndPoint().getRemoteAddress(),
182                 closed.get(),
183                 channel);
184     }
185 
186     private class Delegate extends HttpConnection
187     {
188         private Delegate(HttpDestination destination)
189         {
190             super(destination);
191         }
192 
193         @Override
194         protected void send(HttpExchange exchange)
195         {
196             Request request = exchange.getRequest();
197             normalizeRequest(request);
198 
199             // Save the old idle timeout to restore it
200             EndPoint endPoint = getEndPoint();
201             idleTimeout = endPoint.getIdleTimeout();
202             endPoint.setIdleTimeout(request.getIdleTimeout());
203 
204             // One channel per connection, just delegate the send
205             if (channel.associate(exchange))
206                 channel.send();
207             else
208                 channel.release();
209         }
210 
211         @Override
212         public void close()
213         {
214             HttpConnectionOverHTTP.this.close();
215         }
216 
217         @Override
218         public String toString()
219         {
220             return HttpConnectionOverHTTP.this.toString();
221         }
222     }
223 }