View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.util.Arrays;
23  
24  import org.eclipse.jetty.client.api.Connection;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.util.Promise;
27  import org.eclipse.jetty.util.component.ContainerLifeCycle;
28  
29  public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
30  {
31      private final ConnectionPool connectionPool;
32  
33      public PoolingHttpDestination(HttpClient client, Origin origin)
34      {
35          super(client, origin);
36          this.connectionPool = newConnectionPool(client);
37      }
38  
39      protected ConnectionPool newConnectionPool(HttpClient client)
40      {
41          return new ConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
42      }
43  
44      public ConnectionPool getConnectionPool()
45      {
46          return connectionPool;
47      }
48  
49      @Override
50      @SuppressWarnings("unchecked")
51      public void succeeded(Connection connection)
52      {
53          process((C)connection, true);
54      }
55  
56      @Override
57      public void failed(final Throwable x)
58      {
59          getHttpClient().getExecutor().execute(new Runnable()
60          {
61              @Override
62              public void run()
63              {
64                  abort(x);
65              }
66          });
67      }
68  
69      protected void send()
70      {
71          C connection = acquire();
72          if (connection != null)
73              process(connection, false);
74      }
75  
76      @SuppressWarnings("unchecked")
77      public C acquire()
78      {
79          return (C)connectionPool.acquire();
80      }
81  
82      /**
83       * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
84       * <p>A new connection is created when a request needs to be executed; it is possible that the request that
85       * triggered the request creation is executed by another connection that was just released, so the new connection
86       * may become idle.</p>
87       * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
88       *
89       * @param connection the new connection
90       * @param dispatch whether to dispatch the processing to another thread
91       */
92      public void process(final C connection, boolean dispatch)
93      {
94          HttpClient client = getHttpClient();
95          final HttpExchange exchange = getHttpExchanges().poll();
96          if (LOG.isDebugEnabled())
97              LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
98          if (exchange == null)
99          {
100             if (!connectionPool.release(connection))
101                 connection.close();
102 
103             if (!client.isRunning())
104             {
105                 if (LOG.isDebugEnabled())
106                     LOG.debug("{} is stopping", client);
107                 connection.close();
108             }
109         }
110         else
111         {
112             final Request request = exchange.getRequest();
113             Throwable cause = request.getAbortCause();
114             if (cause != null)
115             {
116                 if (LOG.isDebugEnabled())
117                     LOG.debug("Aborted before processing {}: {}", exchange, cause);
118                 // It may happen that the request is aborted before the exchange
119                 // is created. Aborting the exchange a second time will result in
120                 // a no-operation, so we just abort here to cover that edge case.
121                 exchange.abort(cause);
122             }
123             else
124             {
125                 if (dispatch)
126                 {
127                     client.getExecutor().execute(new Runnable()
128                     {
129                         @Override
130                         public void run()
131                         {
132                             send(connection, exchange);
133                         }
134                     });
135                 }
136                 else
137                 {
138                     send(connection, exchange);
139                 }
140             }
141         }
142     }
143 
144     protected abstract void send(C connection, HttpExchange exchange);
145 
146     @Override
147     public void release(Connection c)
148     {
149         @SuppressWarnings("unchecked")
150         C connection = (C)c;
151         if (LOG.isDebugEnabled())
152             LOG.debug("{} released", connection);
153         HttpClient client = getHttpClient();
154         if (client.isRunning())
155         {
156             if (connectionPool.isActive(connection))
157             {
158                 process(connection, false);
159             }
160             else
161             {
162                 if (LOG.isDebugEnabled())
163                     LOG.debug("{} explicit", connection);
164             }
165         }
166         else
167         {
168             if (LOG.isDebugEnabled())
169                 LOG.debug("{} is stopped", client);
170             close(connection);
171             connection.close();
172         }
173     }
174 
175     @Override
176     public void close(Connection oldConnection)
177     {
178         super.close(oldConnection);
179         connectionPool.remove(oldConnection);
180 
181         if (getHttpExchanges().isEmpty())
182         {
183             if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
184             {
185                 // There is a race condition between this thread removing the destination
186                 // and another thread queueing a request to this same destination.
187                 // If this destination is removed, but the request queued, a new connection
188                 // will be opened, the exchange will be executed and eventually the connection
189                 // will idle timeout and be closed. Meanwhile a new destination will be created
190                 // in HttpClient and will be used for other requests.
191                 getHttpClient().removeDestination(this);
192             }
193         }
194         else
195         {
196             // We need to execute queued requests even if this connection failed.
197             // We may create a connection that is not needed, but it will eventually
198             // idle timeout, so no worries.
199             C newConnection = acquire();
200             if (newConnection != null)
201                 process(newConnection, false);
202         }
203     }
204 
205     public void close()
206     {
207         super.close();
208         connectionPool.close();
209     }
210 
211     @Override
212     public void dump(Appendable out, String indent) throws IOException
213     {
214         ContainerLifeCycle.dump(out, indent, Arrays.asList(connectionPool));
215     }
216 
217     @Override
218     public String toString()
219     {
220         return String.format("%s,pool=%s", super.toString(), connectionPool);
221     }
222 }