1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.util.concurrent.atomic.AtomicReference;
22
23 import org.eclipse.jetty.client.api.Connection;
24 import org.eclipse.jetty.client.api.Request;
25 import org.eclipse.jetty.util.Promise;
26
27 public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
28 {
29 private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
30 private C connection;
31
32 protected MultiplexHttpDestination(HttpClient client, Origin origin)
33 {
34 super(client, origin);
35 }
36
37 @Override
38 protected void send()
39 {
40 while (true)
41 {
42 ConnectState current = connect.get();
43 switch (current)
44 {
45 case DISCONNECTED:
46 {
47 if (!connect.compareAndSet(current, ConnectState.CONNECTING))
48 break;
49 newConnection(this);
50 return;
51 }
52 case CONNECTING:
53 {
54
55 return;
56 }
57 case CONNECTED:
58 {
59 if (process(connection, false))
60 break;
61 return;
62 }
63 default:
64 {
65 throw new IllegalStateException();
66 }
67 }
68 }
69 }
70
71 @Override
72 @SuppressWarnings("unchecked")
73 public void succeeded(Connection result)
74 {
75 C connection = this.connection = (C)result;
76 if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
77 {
78 process(connection, true);
79 }
80 else
81 {
82 connection.close();
83 failed(new IllegalStateException());
84 }
85 }
86
87 @Override
88 public void failed(Throwable x)
89 {
90 connect.set(ConnectState.DISCONNECTED);
91 }
92
93 protected boolean process(final C connection, boolean dispatch)
94 {
95 HttpClient client = getHttpClient();
96 final HttpExchange exchange = getHttpExchanges().poll();
97 LOG.debug("Processing {} on {}", exchange, connection);
98 if (exchange == null)
99 return false;
100
101 final Request request = exchange.getRequest();
102 Throwable cause = request.getAbortCause();
103 if (cause != null)
104 {
105 LOG.debug("Abort before processing {}: {}", exchange, cause);
106 abort(exchange, cause);
107 }
108 else
109 {
110 if (dispatch)
111 {
112 client.getExecutor().execute(new Runnable()
113 {
114 @Override
115 public void run()
116 {
117 send(connection, exchange);
118 }
119 });
120 }
121 else
122 {
123 send(connection, exchange);
124 }
125 }
126 return true;
127 }
128
129 @Override
130 public void close(Connection connection)
131 {
132 super.close(connection);
133 while (true)
134 {
135 ConnectState current = connect.get();
136 if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
137 break;
138 }
139 }
140
141 protected abstract void send(C connection, HttpExchange exchange);
142
143 private enum ConnectState
144 {
145 DISCONNECTED, CONNECTING, CONNECTED
146 }
147 }