1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.concurrent.atomic.AtomicReference;
22
23 import org.eclipse.jetty.client.api.Connection;
24 import org.eclipse.jetty.client.api.Request;
25 import org.eclipse.jetty.util.Promise;
26
27 public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
28 {
29 private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
30 private C connection;
31
32 protected MultiplexHttpDestination(HttpClient client, Origin origin)
33 {
34 super(client, origin);
35 }
36
37 @Override
38 protected void send()
39 {
40 while (true)
41 {
42 ConnectState current = connect.get();
43 switch (current)
44 {
45 case DISCONNECTED:
46 {
47 if (!connect.compareAndSet(current, ConnectState.CONNECTING))
48 break;
49 newConnection(this);
50 return;
51 }
52 case CONNECTING:
53 {
54
55 return;
56 }
57 case CONNECTED:
58 {
59 if (process(connection, false))
60 break;
61 return;
62 }
63 default:
64 {
65 throw new IllegalStateException();
66 }
67 }
68 }
69 }
70
71 @Override
72 @SuppressWarnings("unchecked")
73 public void succeeded(Connection result)
74 {
75 C connection = this.connection = (C)result;
76 if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
77 {
78 process(connection, true);
79 }
80 else
81 {
82 connection.close();
83 failed(new IllegalStateException());
84 }
85 }
86
87 @Override
88 public void failed(Throwable x)
89 {
90 connect.set(ConnectState.DISCONNECTED);
91 }
92
93 protected boolean process(final C connection, boolean dispatch)
94 {
95 HttpClient client = getHttpClient();
96 final HttpExchange exchange = getHttpExchanges().poll();
97 LOG.debug("Processing {} on {}", exchange, connection);
98 if (exchange == null)
99 return false;
100
101 final Request request = exchange.getRequest();
102 Throwable cause = request.getAbortCause();
103 if (cause != null)
104 {
105 LOG.debug("Aborted before processing {}: {}", exchange, cause);
106
107
108
109 exchange.abort(cause);
110 }
111 else
112 {
113 if (dispatch)
114 {
115 client.getExecutor().execute(new Runnable()
116 {
117 @Override
118 public void run()
119 {
120 send(connection, exchange);
121 }
122 });
123 }
124 else
125 {
126 send(connection, exchange);
127 }
128 }
129 return true;
130 }
131
132 @Override
133 public void close()
134 {
135 super.close();
136 C connection = this.connection;
137 if (connection != null)
138 connection.close();
139 }
140
141 @Override
142 public void close(Connection connection)
143 {
144 super.close(connection);
145 while (true)
146 {
147 ConnectState current = connect.get();
148 if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
149 {
150 if (getHttpClient().isRemoveIdleDestinations())
151 getHttpClient().removeDestination(this);
152 break;
153 }
154 }
155 }
156
157 protected abstract void send(C connection, HttpExchange exchange);
158
159 private enum ConnectState
160 {
161 DISCONNECTED, CONNECTING, CONNECTED
162 }
163 }