1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.concurrent.atomic.AtomicReference;
22
23 import org.eclipse.jetty.client.api.Connection;
24 import org.eclipse.jetty.client.api.Request;
25 import org.eclipse.jetty.util.Promise;
26
27 public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
28 {
29 private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
30 private C connection;
31
32 protected MultiplexHttpDestination(HttpClient client, Origin origin)
33 {
34 super(client, origin);
35 }
36
37 @Override
38 public void send()
39 {
40 while (true)
41 {
42 ConnectState current = connect.get();
43 switch (current)
44 {
45 case DISCONNECTED:
46 {
47 if (!connect.compareAndSet(current, ConnectState.CONNECTING))
48 break;
49 newConnection(this);
50 return;
51 }
52 case CONNECTING:
53 {
54
55 return;
56 }
57 case CONNECTED:
58 {
59 if (process(connection))
60 break;
61 return;
62 }
63 default:
64 {
65 abort(new IllegalStateException("Invalid connection state " + current));
66 return;
67 }
68 }
69 }
70 }
71
72 @Override
73 @SuppressWarnings("unchecked")
74 public void succeeded(Connection result)
75 {
76 C connection = this.connection = (C)result;
77 if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
78 {
79 process(connection);
80 }
81 else
82 {
83 connection.close();
84 failed(new IllegalStateException());
85 }
86 }
87
88 @Override
89 public void failed(Throwable x)
90 {
91 connect.set(ConnectState.DISCONNECTED);
92 abort(x);
93 }
94
95 protected boolean process(final C connection)
96 {
97 HttpClient client = getHttpClient();
98 final HttpExchange exchange = getHttpExchanges().poll();
99 if (LOG.isDebugEnabled())
100 LOG.debug("Processing {} on {}", exchange, connection);
101 if (exchange == null)
102 return false;
103
104 final Request request = exchange.getRequest();
105 Throwable cause = request.getAbortCause();
106 if (cause != null)
107 {
108 if (LOG.isDebugEnabled())
109 LOG.debug("Aborted before processing {}: {}", exchange, cause);
110
111
112
113 exchange.abort(cause);
114 }
115 else
116 {
117 send(connection, exchange);
118 }
119 return true;
120 }
121
122 @Override
123 public void close()
124 {
125 super.close();
126 C connection = this.connection;
127 if (connection != null)
128 connection.close();
129 }
130
131 @Override
132 public void close(Connection connection)
133 {
134 super.close(connection);
135 while (true)
136 {
137 ConnectState current = connect.get();
138 if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
139 {
140 if (getHttpClient().isRemoveIdleDestinations())
141 getHttpClient().removeDestination(this);
142 break;
143 }
144 }
145 }
146
147 protected abstract void send(C connection, HttpExchange exchange);
148
149 private enum ConnectState
150 {
151 DISCONNECTED, CONNECTING, CONNECTED
152 }
153 }