View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.concurrent.atomic.AtomicReference;
22  
23  import org.eclipse.jetty.client.api.Connection;
24  import org.eclipse.jetty.client.api.Request;
25  import org.eclipse.jetty.util.Promise;
26  
27  public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
28  {
29      private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
30      private C connection;
31  
32      protected MultiplexHttpDestination(HttpClient client, Origin origin)
33      {
34          super(client, origin);
35      }
36  
37      @Override
38      protected void send()
39      {
40          while (true)
41          {
42              ConnectState current = connect.get();
43              switch (current)
44              {
45                  case DISCONNECTED:
46                  {
47                      if (!connect.compareAndSet(current, ConnectState.CONNECTING))
48                          break;
49                      newConnection(this);
50                      return;
51                  }
52                  case CONNECTING:
53                  {
54                      // Waiting to connect, just return
55                      return;
56                  }
57                  case CONNECTED:
58                  {
59                      if (process(connection, false))
60                          break;
61                      return;
62                  }
63                  default:
64                  {
65                      throw new IllegalStateException();
66                  }
67              }
68          }
69      }
70  
71      @Override
72      @SuppressWarnings("unchecked")
73      public void succeeded(Connection result)
74      {
75          C connection = this.connection = (C)result;
76          if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
77          {
78              process(connection, true);
79          }
80          else
81          {
82              connection.close();
83              failed(new IllegalStateException());
84          }
85      }
86  
87      @Override
88      public void failed(Throwable x)
89      {
90          connect.set(ConnectState.DISCONNECTED);
91      }
92  
93      protected boolean process(final C connection, boolean dispatch)
94      {
95          HttpClient client = getHttpClient();
96          final HttpExchange exchange = getHttpExchanges().poll();
97          if (LOG.isDebugEnabled())
98              LOG.debug("Processing {} on {}", exchange, connection);
99          if (exchange == null)
100             return false;
101 
102         final Request request = exchange.getRequest();
103         Throwable cause = request.getAbortCause();
104         if (cause != null)
105         {
106             if (LOG.isDebugEnabled())
107                 LOG.debug("Aborted before processing {}: {}", exchange, cause);
108             // It may happen that the request is aborted before the exchange
109             // is created. Aborting the exchange a second time will result in
110             // a no-operation, so we just abort here to cover that edge case.
111             exchange.abort(cause);
112         }
113         else
114         {
115             if (dispatch)
116             {
117                 client.getExecutor().execute(new Runnable()
118                 {
119                     @Override
120                     public void run()
121                     {
122                         send(connection, exchange);
123                     }
124                 });
125             }
126             else
127             {
128                 send(connection, exchange);
129             }
130         }
131         return true;
132     }
133 
134     @Override
135     public void close()
136     {
137         super.close();
138         C connection = this.connection;
139         if (connection != null)
140             connection.close();
141     }
142 
143     @Override
144     public void close(Connection connection)
145     {
146         super.close(connection);
147         while (true)
148         {
149             ConnectState current = connect.get();
150             if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
151             {
152                 if (getHttpClient().isRemoveIdleDestinations())
153                     getHttpClient().removeDestination(this);
154                 break;
155             }
156         }
157     }
158 
159     protected abstract void send(C connection, HttpExchange exchange);
160 
161     private enum ConnectState
162     {
163         DISCONNECTED, CONNECTING, CONNECTED
164     }
165 }