1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.net.SocketException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.SocketChannel;
27 import java.util.Map;
28
29 import org.eclipse.jetty.client.api.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.ManagedSelector;
32 import org.eclipse.jetty.io.SelectChannelEndPoint;
33 import org.eclipse.jetty.io.SelectorManager;
34 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
35 import org.eclipse.jetty.util.Promise;
36 import org.eclipse.jetty.util.annotation.ManagedAttribute;
37 import org.eclipse.jetty.util.annotation.ManagedObject;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41
42 @ManagedObject
43 public abstract class AbstractHttpClientTransport extends ContainerLifeCycle implements HttpClientTransport
44 {
45 protected static final Logger LOG = Log.getLogger(HttpClientTransport.class);
46
47 private final int selectors;
48 private volatile HttpClient client;
49 private volatile SelectorManager selectorManager;
50
51 protected AbstractHttpClientTransport(int selectors)
52 {
53 this.selectors = selectors;
54 }
55
56 protected HttpClient getHttpClient()
57 {
58 return client;
59 }
60
61 @Override
62 public void setHttpClient(HttpClient client)
63 {
64 this.client = client;
65 }
66
67 @ManagedAttribute(value = "The number of selectors", readonly = true)
68 public int getSelectors()
69 {
70 return selectors;
71 }
72
73 @Override
74 protected void doStart() throws Exception
75 {
76 selectorManager = newSelectorManager(client);
77 selectorManager.setConnectTimeout(client.getConnectTimeout());
78 addBean(selectorManager);
79 super.doStart();
80 }
81
82 @Override
83 protected void doStop() throws Exception
84 {
85 super.doStop();
86 removeBean(selectorManager);
87 }
88
89 @Override
90 public void connect(InetSocketAddress address, Map<String, Object> context)
91 {
92 SocketChannel channel = null;
93 try
94 {
95 channel = SocketChannel.open();
96 HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
97 HttpClient client = destination.getHttpClient();
98 SocketAddress bindAddress = client.getBindAddress();
99 if (bindAddress != null)
100 channel.bind(bindAddress);
101 configure(client, channel);
102
103 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
104 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
105
106 if (client.isConnectBlocking())
107 {
108 channel.socket().connect(address, (int)client.getConnectTimeout());
109 channel.configureBlocking(false);
110 selectorManager.accept(channel, context);
111 }
112 else
113 {
114 channel.configureBlocking(false);
115 if (channel.connect(address))
116 selectorManager.accept(channel, context);
117 else
118 selectorManager.connect(channel, context);
119 }
120 }
121
122
123 catch (Throwable x)
124 {
125
126
127 if (x.getClass() == SocketException.class)
128 x = new SocketException("Could not connect to " + address).initCause(x);
129
130 try
131 {
132 if (channel != null)
133 channel.close();
134 }
135 catch (IOException xx)
136 {
137 LOG.ignore(xx);
138 }
139 finally
140 {
141 connectFailed(context, x);
142 }
143 }
144 }
145
146 protected void connectFailed(Map<String, Object> context, Throwable x)
147 {
148 if (LOG.isDebugEnabled())
149 LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
150 @SuppressWarnings("unchecked")
151 Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
152 promise.failed(x);
153 }
154
155 protected void configure(HttpClient client, SocketChannel channel) throws IOException
156 {
157 channel.socket().setTcpNoDelay(client.isTCPNoDelay());
158 }
159
160 protected SelectorManager newSelectorManager(HttpClient client)
161 {
162 return new ClientSelectorManager(client, selectors);
163 }
164
165 protected class ClientSelectorManager extends SelectorManager
166 {
167 private final HttpClient client;
168
169 protected ClientSelectorManager(HttpClient client, int selectors)
170 {
171 super(client.getExecutor(), client.getScheduler(), selectors);
172 this.client = client;
173 }
174
175 @Override
176 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
177 {
178 return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout());
179 }
180
181 @Override
182 public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
183 {
184 @SuppressWarnings("unchecked")
185 Map<String, Object> context = (Map<String, Object>)attachment;
186 HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
187 return destination.getClientConnectionFactory().newConnection(endPoint, context);
188 }
189
190 @Override
191 protected void connectionFailed(SocketChannel channel, Throwable x, Object attachment)
192 {
193 @SuppressWarnings("unchecked")
194 Map<String, Object> context = (Map<String, Object>)attachment;
195 connectFailed(context, x);
196 }
197 }
198 }