View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.util.Collections;
23  
24  import org.eclipse.jetty.client.api.Connection;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.util.Callback;
27  import org.eclipse.jetty.util.annotation.ManagedAttribute;
28  import org.eclipse.jetty.util.annotation.ManagedObject;
29  import org.eclipse.jetty.util.component.ContainerLifeCycle;
30  import org.eclipse.jetty.util.thread.Sweeper;
31  
32  @ManagedObject
33  public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback
34  {
35      private DuplexConnectionPool connectionPool;
36  
37      public PoolingHttpDestination(HttpClient client, Origin origin)
38      {
39          super(client, origin);
40          this.connectionPool = newConnectionPool(client);
41          addBean(connectionPool);
42          Sweeper sweeper = client.getBean(Sweeper.class);
43          if (sweeper != null)
44              sweeper.offer(connectionPool);
45      }
46  
47      @Override
48      protected void doStart() throws Exception
49      {
50          HttpClient client = getHttpClient();
51          this.connectionPool = newConnectionPool(client);
52          addBean(connectionPool);
53          super.doStart();
54          Sweeper sweeper = client.getBean(Sweeper.class);
55          if (sweeper != null)
56              sweeper.offer(connectionPool);
57      }
58  
59      @Override
60      protected void doStop() throws Exception
61      {
62          HttpClient client = getHttpClient();
63          Sweeper sweeper = client.getBean(Sweeper.class);
64          if (sweeper != null)
65              sweeper.remove(connectionPool);
66          super.doStop();
67          removeBean(connectionPool);
68      }
69  
70      protected DuplexConnectionPool newConnectionPool(HttpClient client)
71      {
72          return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
73      }
74  
75      @ManagedAttribute(value = "The connection pool", readonly = true)
76      public DuplexConnectionPool getConnectionPool()
77      {
78          return connectionPool;
79      }
80  
81      @Override
82      public void succeeded()
83      {
84          send();
85      }
86  
87      @Override
88      public void failed(final Throwable x)
89      {
90          abort(x);
91      }
92  
93      public void send()
94      {
95          if (getHttpExchanges().isEmpty())
96              return;
97          process();
98      }
99  
100     @SuppressWarnings("unchecked")
101     public C acquire()
102     {
103         return (C)connectionPool.acquire();
104     }
105 
106     private void process()
107     {
108         while (true)
109         {
110             C connection = acquire();
111             if (connection == null)
112                 break;
113             boolean proceed = process(connection);
114             if (!proceed)
115                 break;
116         }
117     }
118 
119     /**
120      * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
121      * <p>A new connection is created when a request needs to be executed; it is possible that the request that
122      * triggered the request creation is executed by another connection that was just released, so the new connection
123      * may become idle.</p>
124      * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
125      *
126      * @param connection the new connection
127      * @return whether to perform more processing
128      */
129     public boolean process(final C connection)
130     {
131         HttpClient client = getHttpClient();
132         final HttpExchange exchange = getHttpExchanges().poll();
133         if (LOG.isDebugEnabled())
134             LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
135         if (exchange == null)
136         {
137             if (!connectionPool.release(connection))
138                 connection.close();
139             if (!client.isRunning())
140             {
141                 if (LOG.isDebugEnabled())
142                     LOG.debug("{} is stopping", client);
143                 connection.close();
144             }
145             return false;
146         }
147         else
148         {
149             final Request request = exchange.getRequest();
150             Throwable cause = request.getAbortCause();
151             if (cause != null)
152             {
153                 if (LOG.isDebugEnabled())
154                     LOG.debug("Aborted before processing {}: {}", exchange, cause);
155                 // Won't use this connection, release it back.
156                 if (!connectionPool.release(connection))
157                     connection.close();
158                 // It may happen that the request is aborted before the exchange
159                 // is created. Aborting the exchange a second time will result in
160                 // a no-operation, so we just abort here to cover that edge case.
161                 exchange.abort(cause);
162             }
163             else
164             {
165                 SendFailure result = send(connection, exchange);
166                 if (result != null)
167                 {
168                     if (LOG.isDebugEnabled())
169                         LOG.debug("Send failed {} for {}", result, exchange);
170                     if (result.retry)
171                     {
172                         if (enqueue(getHttpExchanges(), exchange))
173                             return true;
174                     }
175 
176                     request.abort(result.failure);
177                 }
178             }
179             return getHttpExchanges().peek() != null;
180         }
181     }
182 
183     protected abstract SendFailure send(C connection, HttpExchange exchange);
184 
185     @Override
186     public void release(Connection c)
187     {
188         @SuppressWarnings("unchecked")
189         C connection = (C)c;
190         if (LOG.isDebugEnabled())
191             LOG.debug("Released {}", connection);
192         HttpClient client = getHttpClient();
193         if (client.isRunning())
194         {
195             if (connectionPool.isActive(connection))
196             {
197                 if (connectionPool.release(connection))
198                     send();
199                 else
200                     connection.close();
201             }
202             else
203             {
204                 if (LOG.isDebugEnabled())
205                     LOG.debug("Released explicit {}", connection);
206             }
207         }
208         else
209         {
210             if (LOG.isDebugEnabled())
211                 LOG.debug("{} is stopped", client);
212             connection.close();
213         }
214     }
215 
216     @Override
217     public void close(Connection connection)
218     {
219         super.close(connection);
220 
221         boolean removed = connectionPool.remove(connection);
222 
223         if (getHttpExchanges().isEmpty())
224         {
225             if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
226             {
227                 // There is a race condition between this thread removing the destination
228                 // and another thread queueing a request to this same destination.
229                 // If this destination is removed, but the request queued, a new connection
230                 // will be opened, the exchange will be executed and eventually the connection
231                 // will idle timeout and be closed. Meanwhile a new destination will be created
232                 // in HttpClient and will be used for other requests.
233                 getHttpClient().removeDestination(this);
234             }
235         }
236         else
237         {
238             // We need to execute queued requests even if this connection failed.
239             // We may create a connection that is not needed, but it will eventually
240             // idle timeout, so no worries.
241             if (removed)
242                 process();
243         }
244     }
245 
246     public void close()
247     {
248         super.close();
249         connectionPool.close();
250     }
251 
252     @Override
253     public void dump(Appendable out, String indent) throws IOException
254     {
255         super.dump(out, indent);
256         ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool));
257     }
258 
259     @Override
260     public String toString()
261     {
262         return String.format("%s,pool=%s", super.toString(), connectionPool);
263     }
264 }