1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.net.SocketAddress;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.SocketChannel;
25 import java.util.Map;
26
27 import org.eclipse.jetty.client.api.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.SelectChannelEndPoint;
30 import org.eclipse.jetty.io.SelectorManager;
31 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
32 import org.eclipse.jetty.util.Promise;
33 import org.eclipse.jetty.util.component.ContainerLifeCycle;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36
37 public abstract class AbstractHttpClientTransport extends ContainerLifeCycle implements HttpClientTransport
38 {
39 protected static final Logger LOG = Log.getLogger(HttpClientTransport.class);
40
41 private final int selectors;
42 private volatile HttpClient client;
43 private volatile SelectorManager selectorManager;
44
45 protected AbstractHttpClientTransport(int selectors)
46 {
47 this.selectors = selectors;
48 }
49
50 protected HttpClient getHttpClient()
51 {
52 return client;
53 }
54
55 @Override
56 public void setHttpClient(HttpClient client)
57 {
58 this.client = client;
59 }
60
61 @Override
62 protected void doStart() throws Exception
63 {
64 selectorManager = newSelectorManager(client);
65 selectorManager.setConnectTimeout(client.getConnectTimeout());
66 addBean(selectorManager);
67 super.doStart();
68 }
69
70 @Override
71 protected void doStop() throws Exception
72 {
73 super.doStop();
74 removeBean(selectorManager);
75 }
76
77 @Override
78 public void connect(SocketAddress address, Map<String, Object> context)
79 {
80 SocketChannel channel = null;
81 try
82 {
83 channel = SocketChannel.open();
84 HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
85 HttpClient client = destination.getHttpClient();
86 SocketAddress bindAddress = client.getBindAddress();
87 if (bindAddress != null)
88 channel.bind(bindAddress);
89 configure(client, channel);
90 channel.configureBlocking(false);
91
92 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
93 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
94
95 if (channel.connect(address))
96 selectorManager.accept(channel, context);
97 else
98 selectorManager.connect(channel, context);
99 }
100
101
102 catch (Throwable x)
103 {
104 try
105 {
106 if (channel != null)
107 channel.close();
108 }
109 catch (IOException xx)
110 {
111 LOG.ignore(xx);
112 }
113 finally
114 {
115 connectFailed(context, x);
116 }
117 }
118 }
119
120 protected void connectFailed(Map<String, Object> context, Throwable x)
121 {
122 if (LOG.isDebugEnabled())
123 LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
124 @SuppressWarnings("unchecked")
125 Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
126 promise.failed(x);
127 }
128
129 protected void configure(HttpClient client, SocketChannel channel) throws IOException
130 {
131 channel.socket().setTcpNoDelay(client.isTCPNoDelay());
132 }
133
134 protected SelectorManager newSelectorManager(HttpClient client)
135 {
136 return new ClientSelectorManager(client, selectors);
137 }
138
139 protected class ClientSelectorManager extends SelectorManager
140 {
141 private final HttpClient client;
142
143 protected ClientSelectorManager(HttpClient client, int selectors)
144 {
145 super(client.getExecutor(), client.getScheduler(), selectors);
146 this.client = client;
147 }
148
149 @Override
150 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
151 {
152 return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout());
153 }
154
155 @Override
156 public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
157 {
158 @SuppressWarnings("unchecked")
159 Map<String, Object> context = (Map<String, Object>)attachment;
160 HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
161 return destination.getClientConnectionFactory().newConnection(endPoint, context);
162 }
163
164 @Override
165 protected void connectionFailed(SocketChannel channel, Throwable x, Object attachment)
166 {
167 @SuppressWarnings("unchecked")
168 Map<String, Object> context = (Map<String, Object>)attachment;
169 connectFailed(context, x);
170 }
171 }
172 }