View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.util.Collections;
23  
24  import org.eclipse.jetty.client.api.Connection;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.util.Callback;
27  import org.eclipse.jetty.util.annotation.ManagedAttribute;
28  import org.eclipse.jetty.util.annotation.ManagedObject;
29  import org.eclipse.jetty.util.component.ContainerLifeCycle;
30  import org.eclipse.jetty.util.thread.Sweeper;
31  
32  @ManagedObject
33  public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback
34  {
35      private DuplexConnectionPool connectionPool;
36  
37      public PoolingHttpDestination(HttpClient client, Origin origin)
38      {
39          super(client, origin);
40          this.connectionPool = newConnectionPool(client);
41          addBean(connectionPool);
42          Sweeper sweeper = client.getBean(Sweeper.class);
43          if (sweeper != null)
44              sweeper.offer(connectionPool);
45      }
46  
47      @Override
48      protected void doStart() throws Exception
49      {
50          HttpClient client = getHttpClient();
51          this.connectionPool = newConnectionPool(client);
52          addBean(connectionPool);
53          super.doStart();
54          Sweeper sweeper = client.getBean(Sweeper.class);
55          if (sweeper != null)
56              sweeper.offer(connectionPool);
57      }
58  
59      @Override
60      protected void doStop() throws Exception
61      {
62          HttpClient client = getHttpClient();
63          Sweeper sweeper = client.getBean(Sweeper.class);
64          if (sweeper != null)
65              sweeper.remove(connectionPool);
66          super.doStop();
67          removeBean(connectionPool);
68      }
69  
70      protected DuplexConnectionPool newConnectionPool(HttpClient client)
71      {
72          return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
73      }
74  
75      @ManagedAttribute(value = "The connection pool", readonly = true)
76      public DuplexConnectionPool getConnectionPool()
77      {
78          return connectionPool;
79      }
80  
81      @Override
82      public void succeeded()
83      {
84          send();
85      }
86  
87      @Override
88      public void failed(final Throwable x)
89      {
90          abort(x);
91      }
92  
93      public void send()
94      {
95          if (getHttpExchanges().isEmpty())
96              return;
97          process();
98      }
99  
100     @SuppressWarnings("unchecked")
101     public C acquire()
102     {
103         return (C)connectionPool.acquire();
104     }
105 
106     private void process()
107     {
108         while (true)
109         {
110             C connection = acquire();
111             if (connection == null)
112                 break;
113             boolean proceed = process(connection);
114             if (!proceed)
115                 break;
116         }
117     }
118 
119     /**
120      * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
121      * <p>A new connection is created when a request needs to be executed; it is possible that the request that
122      * triggered the request creation is executed by another connection that was just released, so the new connection
123      * may become idle.</p>
124      * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
125      *
126      * @param connection the new connection
127      * @return whether to perform more processing
128      */
129     public boolean process(final C connection)
130     {
131         HttpClient client = getHttpClient();
132         final HttpExchange exchange = getHttpExchanges().poll();
133         if (LOG.isDebugEnabled())
134             LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
135         if (exchange == null)
136         {
137             if (!connectionPool.release(connection))
138                 connection.close();
139             if (!client.isRunning())
140             {
141                 if (LOG.isDebugEnabled())
142                     LOG.debug("{} is stopping", client);
143                 connection.close();
144             }
145             return false;
146         }
147         else
148         {
149             final Request request = exchange.getRequest();
150             Throwable cause = request.getAbortCause();
151             if (cause != null)
152             {
153                 if (LOG.isDebugEnabled())
154                     LOG.debug("Aborted before processing {}: {}", exchange, cause);
155                 // It may happen that the request is aborted before the exchange
156                 // is created. Aborting the exchange a second time will result in
157                 // a no-operation, so we just abort here to cover that edge case.
158                 exchange.abort(cause);
159             }
160             else
161             {
162                 SendFailure result = send(connection, exchange);
163                 if (result != null)
164                 {
165                     if (LOG.isDebugEnabled())
166                         LOG.debug("Send failed {} for {}", result, exchange);
167                     if (result.retry)
168                     {
169                         if (enqueue(getHttpExchanges(), exchange))
170                             return true;
171                     }
172 
173                     request.abort(result.failure);
174                 }
175             }
176             return getHttpExchanges().peek() != null;
177         }
178     }
179 
180     protected abstract SendFailure send(C connection, HttpExchange exchange);
181 
182     @Override
183     public void release(Connection c)
184     {
185         @SuppressWarnings("unchecked")
186         C connection = (C)c;
187         if (LOG.isDebugEnabled())
188             LOG.debug("Released {}", connection);
189         HttpClient client = getHttpClient();
190         if (client.isRunning())
191         {
192             if (connectionPool.isActive(connection))
193             {
194                 if (connectionPool.release(connection))
195                     send();
196                 else
197                     connection.close();
198             }
199             else
200             {
201                 if (LOG.isDebugEnabled())
202                     LOG.debug("Released explicit {}", connection);
203             }
204         }
205         else
206         {
207             if (LOG.isDebugEnabled())
208                 LOG.debug("{} is stopped", client);
209             connection.close();
210         }
211     }
212 
213     @Override
214     public void close(Connection connection)
215     {
216         super.close(connection);
217 
218         boolean removed = connectionPool.remove(connection);
219 
220         if (getHttpExchanges().isEmpty())
221         {
222             if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
223             {
224                 // There is a race condition between this thread removing the destination
225                 // and another thread queueing a request to this same destination.
226                 // If this destination is removed, but the request queued, a new connection
227                 // will be opened, the exchange will be executed and eventually the connection
228                 // will idle timeout and be closed. Meanwhile a new destination will be created
229                 // in HttpClient and will be used for other requests.
230                 getHttpClient().removeDestination(this);
231             }
232         }
233         else
234         {
235             // We need to execute queued requests even if this connection failed.
236             // We may create a connection that is not needed, but it will eventually
237             // idle timeout, so no worries.
238             if (removed)
239                 process();
240         }
241     }
242 
243     public void close()
244     {
245         super.close();
246         connectionPool.close();
247     }
248 
249     @Override
250     public void dump(Appendable out, String indent) throws IOException
251     {
252         super.dump(out, indent);
253         ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool));
254     }
255 
256     @Override
257     public String toString()
258     {
259         return String.format("%s,pool=%s", super.toString(), connectionPool);
260     }
261 }