View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.util.Collections;
23  
24  import org.eclipse.jetty.client.api.Connection;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.util.Callback;
27  import org.eclipse.jetty.util.annotation.ManagedAttribute;
28  import org.eclipse.jetty.util.annotation.ManagedObject;
29  import org.eclipse.jetty.util.component.ContainerLifeCycle;
30  import org.eclipse.jetty.util.thread.Sweeper;
31  
32  @ManagedObject
33  public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback
34  {
35      private final ConnectionPool connectionPool;
36  
37      public PoolingHttpDestination(HttpClient client, Origin origin)
38      {
39          super(client, origin);
40          this.connectionPool = newConnectionPool(client);
41          addBean(connectionPool);
42          Sweeper sweeper = client.getBean(Sweeper.class);
43          if (sweeper != null)
44              sweeper.offer(connectionPool);
45      }
46  
47      protected ConnectionPool newConnectionPool(HttpClient client)
48      {
49          return new ConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
50      }
51  
52      @ManagedAttribute(value = "The connection pool", readonly = true)
53      public ConnectionPool getConnectionPool()
54      {
55          return connectionPool;
56      }
57  
58      @Override
59      public void succeeded()
60      {
61          send();
62      }
63  
64      @Override
65      public void failed(final Throwable x)
66      {
67          abort(x);
68      }
69  
70      public void send()
71      {
72          if (getHttpExchanges().isEmpty())
73              return;
74          process();
75      }
76  
77      @SuppressWarnings("unchecked")
78      public C acquire()
79      {
80          return (C)connectionPool.acquire();
81      }
82  
83      private void process()
84      {
85          C connection = acquire();
86          if (connection != null)
87              process(connection);
88      }
89  
90      /**
91       * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
92       * <p>A new connection is created when a request needs to be executed; it is possible that the request that
93       * triggered the request creation is executed by another connection that was just released, so the new connection
94       * may become idle.</p>
95       * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
96       *
97       * @param connection the new connection
98       */
99      public void process(final C connection)
100     {
101         HttpClient client = getHttpClient();
102         final HttpExchange exchange = getHttpExchanges().poll();
103         if (LOG.isDebugEnabled())
104             LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
105         if (exchange == null)
106         {
107             if (!connectionPool.release(connection))
108                 connection.close();
109 
110             if (!client.isRunning())
111             {
112                 if (LOG.isDebugEnabled())
113                     LOG.debug("{} is stopping", client);
114                 connection.close();
115             }
116         }
117         else
118         {
119             final Request request = exchange.getRequest();
120             Throwable cause = request.getAbortCause();
121             if (cause != null)
122             {
123                 if (LOG.isDebugEnabled())
124                     LOG.debug("Aborted before processing {}: {}", exchange, cause);
125                 // It may happen that the request is aborted before the exchange
126                 // is created. Aborting the exchange a second time will result in
127                 // a no-operation, so we just abort here to cover that edge case.
128                 exchange.abort(cause);
129             }
130             else
131             {
132                 send(connection, exchange);
133             }
134         }
135     }
136 
137     protected abstract void send(C connection, HttpExchange exchange);
138 
139     @Override
140     public void release(Connection c)
141     {
142         @SuppressWarnings("unchecked")
143         C connection = (C)c;
144         if (LOG.isDebugEnabled())
145             LOG.debug("Released {}", connection);
146         HttpClient client = getHttpClient();
147         if (client.isRunning())
148         {
149             if (connectionPool.isActive(connection))
150             {
151                 if (connectionPool.release(connection))
152                     send();
153                 else
154                     connection.close();
155             }
156             else
157             {
158                 if (LOG.isDebugEnabled())
159                     LOG.debug("Released explicit {}", connection);
160             }
161         }
162         else
163         {
164             if (LOG.isDebugEnabled())
165                 LOG.debug("{} is stopped", client);
166             connection.close();
167         }
168     }
169 
170     @Override
171     public void close(Connection connection)
172     {
173         super.close(connection);
174 
175         boolean removed = connectionPool.remove(connection);
176 
177         if (getHttpExchanges().isEmpty())
178         {
179             if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
180             {
181                 // There is a race condition between this thread removing the destination
182                 // and another thread queueing a request to this same destination.
183                 // If this destination is removed, but the request queued, a new connection
184                 // will be opened, the exchange will be executed and eventually the connection
185                 // will idle timeout and be closed. Meanwhile a new destination will be created
186                 // in HttpClient and will be used for other requests.
187                 getHttpClient().removeDestination(this);
188             }
189         }
190         else
191         {
192             // We need to execute queued requests even if this connection failed.
193             // We may create a connection that is not needed, but it will eventually
194             // idle timeout, so no worries.
195             if (removed)
196                 process();
197         }
198     }
199 
200     public void close()
201     {
202         super.close();
203         connectionPool.close();
204     }
205 
206     @Override
207     public void dump(Appendable out, String indent) throws IOException
208     {
209         super.dump(out, indent);
210         ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool));
211     }
212 
213     @Override
214     public String toString()
215     {
216         return String.format("%s,pool=%s", super.toString(), connectionPool);
217     }
218 }