View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.concurrent.atomic.AtomicReference;
22  
23  import org.eclipse.jetty.client.api.Connection;
24  import org.eclipse.jetty.client.api.Request;
25  import org.eclipse.jetty.util.Promise;
26  
27  public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
28  {
29      private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
30      private C connection;
31  
32      protected MultiplexHttpDestination(HttpClient client, Origin origin)
33      {
34          super(client, origin);
35      }
36  
37      @Override
38      public void send()
39      {
40          while (true)
41          {
42              ConnectState current = connect.get();
43              switch (current)
44              {
45                  case DISCONNECTED:
46                  {
47                      if (!connect.compareAndSet(current, ConnectState.CONNECTING))
48                          break;
49                      newConnection(this);
50                      return;
51                  }
52                  case CONNECTING:
53                  {
54                      // Waiting to connect, just return
55                      return;
56                  }
57                  case CONNECTED:
58                  {
59                      if (process(connection))
60                          break;
61                      return;
62                  }
63                  default:
64                  {
65                      abort(new IllegalStateException("Invalid connection state " + current));
66                      return;
67                  }
68              }
69          }
70      }
71  
72      @Override
73      @SuppressWarnings("unchecked")
74      public void succeeded(Connection result)
75      {
76          C connection = this.connection = (C)result;
77          if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
78          {
79              process(connection);
80          }
81          else
82          {
83              connection.close();
84              failed(new IllegalStateException());
85          }
86      }
87  
88      @Override
89      public void failed(Throwable x)
90      {
91          connect.set(ConnectState.DISCONNECTED);
92          abort(x);
93      }
94  
95      protected boolean process(final C connection)
96      {
97          HttpClient client = getHttpClient();
98          final HttpExchange exchange = getHttpExchanges().poll();
99          if (LOG.isDebugEnabled())
100             LOG.debug("Processing {} on {}", exchange, connection);
101         if (exchange == null)
102             return false;
103 
104         final Request request = exchange.getRequest();
105         Throwable cause = request.getAbortCause();
106         if (cause != null)
107         {
108             if (LOG.isDebugEnabled())
109                 LOG.debug("Aborted before processing {}: {}", exchange, cause);
110             // It may happen that the request is aborted before the exchange
111             // is created. Aborting the exchange a second time will result in
112             // a no-operation, so we just abort here to cover that edge case.
113             exchange.abort(cause);
114         }
115         else
116         {
117             send(connection, exchange);
118         }
119         return true;
120     }
121 
122     @Override
123     public void close()
124     {
125         super.close();
126         C connection = this.connection;
127         if (connection != null)
128             connection.close();
129     }
130 
131     @Override
132     public void close(Connection connection)
133     {
134         super.close(connection);
135         while (true)
136         {
137             ConnectState current = connect.get();
138             if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
139             {
140                 if (getHttpClient().isRemoveIdleDestinations())
141                     getHttpClient().removeDestination(this);
142                 break;
143             }
144         }
145     }
146 
147     protected abstract void send(C connection, HttpExchange exchange);
148 
149     private enum ConnectState
150     {
151         DISCONNECTED, CONNECTING, CONNECTED
152     }
153 }