View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  import org.eclipse.jetty.client.api.Connection;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.util.Promise;
27  
28  public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
29  {
30      private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
31      private final AtomicInteger requestsPerConnection = new AtomicInteger();
32      private int maxRequestsPerConnection = 1024;
33      private C connection;
34  
35      protected MultiplexHttpDestination(HttpClient client, Origin origin)
36      {
37          super(client, origin);
38      }
39  
40      public int getMaxRequestsPerConnection()
41      {
42          return maxRequestsPerConnection;
43      }
44  
45      public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
46      {
47          this.maxRequestsPerConnection = maxRequestsPerConnection;
48      }
49  
50      @Override
51      public void send()
52      {
53          while (true)
54          {
55              ConnectState current = connect.get();
56              switch (current)
57              {
58                  case DISCONNECTED:
59                  {
60                      if (!connect.compareAndSet(current, ConnectState.CONNECTING))
61                          break;
62                      newConnection(this);
63                      return;
64                  }
65                  case CONNECTING:
66                  {
67                      // Waiting to connect, just return
68                      return;
69                  }
70                  case CONNECTED:
71                  {
72                      if (process(connection))
73                          break;
74                      return;
75                  }
76                  default:
77                  {
78                      abort(new IllegalStateException("Invalid connection state " + current));
79                      return;
80                  }
81              }
82          }
83      }
84  
85      @Override
86      @SuppressWarnings("unchecked")
87      public void succeeded(Connection result)
88      {
89          C connection = this.connection = (C)result;
90          if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
91          {
92              send();
93          }
94          else
95          {
96              connection.close();
97              failed(new IllegalStateException("Invalid connection state " + connect));
98          }
99      }
100 
101     @Override
102     public void failed(Throwable x)
103     {
104         connect.set(ConnectState.DISCONNECTED);
105         abort(x);
106     }
107 
108     protected boolean process(final C connection)
109     {
110         while (true)
111         {
112             int max = getMaxRequestsPerConnection();
113             int count = requestsPerConnection.get();
114             int next = count + 1;
115             if (next > max)
116                 return false;
117 
118             if (requestsPerConnection.compareAndSet(count, next))
119             {
120                 HttpExchange exchange = getHttpExchanges().poll();
121                 if (LOG.isDebugEnabled())
122                     LOG.debug("Processing {}/{} {} on {}", next, max, exchange, connection);
123                 if (exchange == null)
124                 {
125                     requestsPerConnection.decrementAndGet();
126                     return false;
127                 }
128 
129                 final Request request = exchange.getRequest();
130                 Throwable cause = request.getAbortCause();
131                 if (cause != null)
132                 {
133                     if (LOG.isDebugEnabled())
134                         LOG.debug("Aborted before processing {}: {}", exchange, cause);
135                     // It may happen that the request is aborted before the exchange
136                     // is created. Aborting the exchange a second time will result in
137                     // a no-operation, so we just abort here to cover that edge case.
138                     exchange.abort(cause);
139                     requestsPerConnection.decrementAndGet();
140                 }
141                 else
142                 {
143                     SendFailure result = send(connection, exchange);
144                     if (result != null)
145                     {
146                         if (LOG.isDebugEnabled())
147                             LOG.debug("Send failed {} for {}", result, exchange);
148                         if (result.retry)
149                         {
150                             if (enqueue(getHttpExchanges(), exchange))
151                                 return true;
152                         }
153 
154                         request.abort(result.failure);
155                     }
156                 }
157                 return getHttpExchanges().peek() != null;
158             }
159         }
160     }
161 
162     @Override
163     public void release(Connection connection)
164     {
165         requestsPerConnection.decrementAndGet();
166         send();
167     }
168 
169     @Override
170     public void close()
171     {
172         super.close();
173         C connection = this.connection;
174         if (connection != null)
175             connection.close();
176     }
177 
178     @Override
179     public void close(Connection connection)
180     {
181         super.close(connection);
182         while (true)
183         {
184             ConnectState current = connect.get();
185             if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
186             {
187                 if (getHttpClient().isRemoveIdleDestinations())
188                     getHttpClient().removeDestination(this);
189                 break;
190             }
191         }
192     }
193 
194     protected abstract SendFailure send(C connection, HttpExchange exchange);
195 
196     private enum ConnectState
197     {
198         DISCONNECTED, CONNECTING, CONNECTED
199     }
200 }