1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.concurrent.atomic.AtomicReference;
22
23 import org.eclipse.jetty.client.api.Connection;
24 import org.eclipse.jetty.client.api.Request;
25 import org.eclipse.jetty.util.Promise;
26
27 public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
28 {
29 private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
30 private C connection;
31
32 protected MultiplexHttpDestination(HttpClient client, Origin origin)
33 {
34 super(client, origin);
35 }
36
37 @Override
38 protected void send()
39 {
40 while (true)
41 {
42 ConnectState current = connect.get();
43 switch (current)
44 {
45 case DISCONNECTED:
46 {
47 if (!connect.compareAndSet(current, ConnectState.CONNECTING))
48 break;
49 newConnection(this);
50 return;
51 }
52 case CONNECTING:
53 {
54
55 return;
56 }
57 case CONNECTED:
58 {
59 if (process(connection, false))
60 break;
61 return;
62 }
63 default:
64 {
65 throw new IllegalStateException();
66 }
67 }
68 }
69 }
70
71 @Override
72 @SuppressWarnings("unchecked")
73 public void succeeded(Connection result)
74 {
75 C connection = this.connection = (C)result;
76 if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
77 {
78 process(connection, true);
79 }
80 else
81 {
82 connection.close();
83 failed(new IllegalStateException());
84 }
85 }
86
87 @Override
88 public void failed(Throwable x)
89 {
90 connect.set(ConnectState.DISCONNECTED);
91 }
92
93 protected boolean process(final C connection, boolean dispatch)
94 {
95 HttpClient client = getHttpClient();
96 final HttpExchange exchange = getHttpExchanges().poll();
97 if (LOG.isDebugEnabled())
98 LOG.debug("Processing {} on {}", exchange, connection);
99 if (exchange == null)
100 return false;
101
102 final Request request = exchange.getRequest();
103 Throwable cause = request.getAbortCause();
104 if (cause != null)
105 {
106 if (LOG.isDebugEnabled())
107 LOG.debug("Aborted before processing {}: {}", exchange, cause);
108
109
110
111 exchange.abort(cause);
112 }
113 else
114 {
115 if (dispatch)
116 {
117 client.getExecutor().execute(new Runnable()
118 {
119 @Override
120 public void run()
121 {
122 send(connection, exchange);
123 }
124 });
125 }
126 else
127 {
128 send(connection, exchange);
129 }
130 }
131 return true;
132 }
133
134 @Override
135 public void close()
136 {
137 super.close();
138 C connection = this.connection;
139 if (connection != null)
140 connection.close();
141 }
142
143 @Override
144 public void close(Connection connection)
145 {
146 super.close(connection);
147 while (true)
148 {
149 ConnectState current = connect.get();
150 if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
151 {
152 if (getHttpClient().isRemoveIdleDestinations())
153 getHttpClient().removeDestination(this);
154 break;
155 }
156 }
157 }
158
159 protected abstract void send(C connection, HttpExchange exchange);
160
161 private enum ConnectState
162 {
163 DISCONNECTED, CONNECTING, CONNECTED
164 }
165 }