View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.http;
20  
21  import java.nio.channels.AsynchronousCloseException;
22  import java.util.concurrent.TimeoutException;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.eclipse.jetty.client.HttpConnection;
27  import org.eclipse.jetty.client.HttpDestination;
28  import org.eclipse.jetty.client.HttpExchange;
29  import org.eclipse.jetty.client.SendFailure;
30  import org.eclipse.jetty.client.api.Connection;
31  import org.eclipse.jetty.client.api.Request;
32  import org.eclipse.jetty.client.api.Response;
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.util.Promise;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  import org.eclipse.jetty.util.thread.Sweeper;
39  
40  public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable
41  {
42      private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
43  
44      private final AtomicBoolean closed = new AtomicBoolean();
45      private final AtomicInteger sweeps = new AtomicInteger();
46      private final Promise<Connection> promise;
47      private final Delegate delegate;
48      private final HttpChannelOverHTTP channel;
49      private long idleTimeout;
50  
51      public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
52      {
53          super(endPoint, destination.getHttpClient().getExecutor());
54          this.promise = promise;
55          this.delegate = new Delegate(destination);
56          this.channel = newHttpChannel();
57      }
58  
59      protected HttpChannelOverHTTP newHttpChannel()
60      {
61          return new HttpChannelOverHTTP(this);
62      }
63  
64      public HttpChannelOverHTTP getHttpChannel()
65      {
66          return channel;
67      }
68  
69      public HttpDestinationOverHTTP getHttpDestination()
70      {
71          return (HttpDestinationOverHTTP)delegate.getHttpDestination();
72      }
73  
74      @Override
75      public void send(Request request, Response.CompleteListener listener)
76      {
77          delegate.send(request, listener);
78      }
79  
80      protected SendFailure send(HttpExchange exchange)
81      {
82          return delegate.send(exchange);
83      }
84  
85      @Override
86      public void onOpen()
87      {
88          super.onOpen();
89          fillInterested();
90          promise.succeeded(this);
91      }
92  
93      public boolean isClosed()
94      {
95          return closed.get();
96      }
97  
98      @Override
99      public boolean onIdleExpired()
100     {
101         boolean close = delegate.onIdleTimeout();
102         if (close)
103             close(new TimeoutException("Idle timeout " + getEndPoint().getIdleTimeout() + "ms"));
104         return false;
105     }
106 
107     @Override
108     public void onFillable()
109     {
110         HttpExchange exchange = channel.getHttpExchange();
111         if (exchange != null)
112         {
113             channel.receive();
114         }
115         else
116         {
117             // If there is no exchange, then could be either a remote close,
118             // or garbage bytes; in both cases we close the connection
119             close();
120         }
121     }
122 
123     public void release()
124     {
125         // Restore idle timeout
126         getEndPoint().setIdleTimeout(idleTimeout);
127         getHttpDestination().release(this);
128     }
129 
130     @Override
131     public void close()
132     {
133         close(new AsynchronousCloseException());
134     }
135 
136     protected void close(Throwable failure)
137     {
138         if (closed.compareAndSet(false, true))
139         {
140             // First close then abort, to be sure that the connection cannot be reused
141             // from an onFailure() handler or by blocking code waiting for completion.
142             getHttpDestination().close(this);
143             getEndPoint().shutdownOutput();
144             if (LOG.isDebugEnabled())
145                 LOG.debug("Shutdown {}", this);
146             getEndPoint().close();
147             if (LOG.isDebugEnabled())
148                 LOG.debug("Closed {}", this);
149 
150             abort(failure);
151         }
152     }
153 
154     protected boolean abort(Throwable failure)
155     {
156         HttpExchange exchange = channel.getHttpExchange();
157         return exchange != null && exchange.getRequest().abort(failure);
158     }
159 
160     @Override
161     public boolean sweep()
162     {
163         if (!closed.get())
164             return false;
165         if (sweeps.incrementAndGet() < 4)
166             return false;
167         return true;
168     }
169 
170     @Override
171     public String toString()
172     {
173         return String.format("%s@%h(l:%s <-> r:%s,closed=%b)[%s]",
174                 getClass().getSimpleName(),
175                 this,
176                 getEndPoint().getLocalAddress(),
177                 getEndPoint().getRemoteAddress(),
178                 closed.get(),
179                 channel);
180     }
181 
182     private class Delegate extends HttpConnection
183     {
184         private Delegate(HttpDestination destination)
185         {
186             super(destination);
187         }
188 
189         @Override
190         protected SendFailure send(HttpExchange exchange)
191         {
192             Request request = exchange.getRequest();
193             normalizeRequest(request);
194 
195             // Save the old idle timeout to restore it.
196             EndPoint endPoint = getEndPoint();
197             idleTimeout = endPoint.getIdleTimeout();
198             endPoint.setIdleTimeout(request.getIdleTimeout());
199 
200             // One channel per connection, just delegate the send.
201             return send(channel, exchange);
202         }
203 
204         @Override
205         public void close()
206         {
207             HttpConnectionOverHTTP.this.close();
208         }
209 
210         @Override
211         public String toString()
212         {
213             return HttpConnectionOverHTTP.this.toString();
214         }
215     }
216 }