View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.util.concurrent.atomic.AtomicReference;
22  
23  import org.eclipse.jetty.client.api.Connection;
24  import org.eclipse.jetty.client.api.Request;
25  import org.eclipse.jetty.util.Promise;
26  
27  public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
28  {
29      private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
30      private C connection;
31  
32      protected MultiplexHttpDestination(HttpClient client, Origin origin)
33      {
34          super(client, origin);
35      }
36  
37      @Override
38      protected void send()
39      {
40          while (true)
41          {
42              ConnectState current = connect.get();
43              switch (current)
44              {
45                  case DISCONNECTED:
46                  {
47                      if (!connect.compareAndSet(current, ConnectState.CONNECTING))
48                          break;
49                      newConnection(this);
50                      return;
51                  }
52                  case CONNECTING:
53                  {
54                      // Waiting to connect, just return
55                      return;
56                  }
57                  case CONNECTED:
58                  {
59                      if (process(connection, false))
60                          break;
61                      return;
62                  }
63                  default:
64                  {
65                      throw new IllegalStateException();
66                  }
67              }
68          }
69      }
70  
71      @Override
72      @SuppressWarnings("unchecked")
73      public void succeeded(Connection result)
74      {
75          C connection = this.connection = (C)result;
76          if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
77          {
78              process(connection, true);
79          }
80          else
81          {
82              connection.close();
83              failed(new IllegalStateException());
84          }
85      }
86  
87      @Override
88      public void failed(Throwable x)
89      {
90          connect.set(ConnectState.DISCONNECTED);
91      }
92  
93      protected boolean process(final C connection, boolean dispatch)
94      {
95          HttpClient client = getHttpClient();
96          final HttpExchange exchange = getHttpExchanges().poll();
97          LOG.debug("Processing {} on {}", exchange, connection);
98          if (exchange == null)
99              return false;
100 
101         final Request request = exchange.getRequest();
102         Throwable cause = request.getAbortCause();
103         if (cause != null)
104         {
105             LOG.debug("Abort before processing {}: {}", exchange, cause);
106             abort(exchange, cause);
107         }
108         else
109         {
110             if (dispatch)
111             {
112                 client.getExecutor().execute(new Runnable()
113                 {
114                     @Override
115                     public void run()
116                     {
117                         send(connection, exchange);
118                     }
119                 });
120             }
121             else
122             {
123                 send(connection, exchange);
124             }
125         }
126         return true;
127     }
128 
129     @Override
130     public void close(Connection connection)
131     {
132         super.close(connection);
133         while (true)
134         {
135             ConnectState current = connect.get();
136             if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
137                 break;
138         }
139     }
140 
141     protected abstract void send(C connection, HttpExchange exchange);
142 
143     private enum ConnectState
144     {
145         DISCONNECTED, CONNECTING, CONNECTED
146     }
147 }