View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.util.Arrays;
23  
24  import org.eclipse.jetty.client.api.Connection;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.util.Promise;
27  import org.eclipse.jetty.util.component.ContainerLifeCycle;
28  
29  public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
30  {
31      private final ConnectionPool connectionPool;
32  
33      public PoolingHttpDestination(HttpClient client, Origin origin)
34      {
35          super(client, origin);
36          this.connectionPool = newConnectionPool(client);
37      }
38  
39      protected ConnectionPool newConnectionPool(HttpClient client)
40      {
41          return new ConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
42      }
43  
44      public ConnectionPool getConnectionPool()
45      {
46          return connectionPool;
47      }
48  
49      @Override
50      @SuppressWarnings("unchecked")
51      public void succeeded(Connection connection)
52      {
53          process((C)connection, true);
54      }
55  
56      @Override
57      public void failed(final Throwable x)
58      {
59          getHttpClient().getExecutor().execute(new Runnable()
60          {
61              @Override
62              public void run()
63              {
64                  abort(x);
65              }
66          });
67      }
68  
69      protected void send()
70      {
71          C connection = acquire();
72          if (connection != null)
73              process(connection, false);
74      }
75  
76      @SuppressWarnings("unchecked")
77      public C acquire()
78      {
79          return (C)connectionPool.acquire();
80      }
81  
82      /**
83       * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
84       * <p>A new connection is created when a request needs to be executed; it is possible that the request that
85       * triggered the request creation is executed by another connection that was just released, so the new connection
86       * may become idle.</p>
87       * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
88       *
89       * @param connection the new connection
90       * @param dispatch whether to dispatch the processing to another thread
91       */
92      public void process(final C connection, boolean dispatch)
93      {
94          HttpClient client = getHttpClient();
95          final HttpExchange exchange = getHttpExchanges().poll();
96          LOG.debug("Processing exchange {} on connection {}", exchange, connection);
97          if (exchange == null)
98          {
99              // TODO: review this part... may not be 100% correct
100             // TODO: e.g. is client is not running, there should be no need to close the connection
101 
102             if (!connectionPool.release(connection))
103                 connection.close();
104 
105             if (!client.isRunning())
106             {
107                 LOG.debug("{} is stopping", client);
108                 connection.close();
109             }
110         }
111         else
112         {
113             final Request request = exchange.getRequest();
114             Throwable cause = request.getAbortCause();
115             if (cause != null)
116             {
117                 abort(exchange, cause);
118                 LOG.debug("Aborted before processing {}: {}", exchange, cause);
119             }
120             else
121             {
122                 if (dispatch)
123                 {
124                     client.getExecutor().execute(new Runnable()
125                     {
126                         @Override
127                         public void run()
128                         {
129                             send(connection, exchange);
130                         }
131                     });
132                 }
133                 else
134                 {
135                     send(connection, exchange);
136                 }
137             }
138         }
139     }
140 
141     protected abstract void send(C connection, HttpExchange exchange);
142 
143     public void release(C connection)
144     {
145         LOG.debug("{} released", connection);
146         HttpClient client = getHttpClient();
147         if (client.isRunning())
148         {
149             if (connectionPool.isActive(connection))
150                 process(connection, false);
151             else
152                 LOG.debug("{} explicit", connection);
153         }
154         else
155         {
156             LOG.debug("{} is stopped", client);
157             close(connection);
158             connection.close();
159         }
160     }
161 
162     @Override
163     public void close(Connection oldConnection)
164     {
165         super.close(oldConnection);
166         connectionPool.remove(oldConnection);
167 
168         // We need to execute queued requests even if this connection failed.
169         // We may create a connection that is not needed, but it will eventually
170         // idle timeout, so no worries
171         if (!getHttpExchanges().isEmpty())
172         {
173             C newConnection = acquire();
174             if (newConnection != null)
175                 process(newConnection, false);
176         }
177     }
178 
179     public void close()
180     {
181         connectionPool.close();
182     }
183 
184     @Override
185     public void dump(Appendable out, String indent) throws IOException
186     {
187         ContainerLifeCycle.dump(out, indent, Arrays.asList(connectionPool));
188     }
189 }