1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.net.SocketAddress;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.SocketChannel;
25 import java.util.Map;
26
27 import org.eclipse.jetty.client.api.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.SelectChannelEndPoint;
30 import org.eclipse.jetty.io.SelectorManager;
31 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
32 import org.eclipse.jetty.util.Promise;
33 import org.eclipse.jetty.util.component.ContainerLifeCycle;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36
37 public abstract class AbstractHttpClientTransport extends ContainerLifeCycle implements HttpClientTransport
38 {
39 protected static final Logger LOG = Log.getLogger(HttpClientTransport.class);
40
41 private final int selectors;
42 private volatile HttpClient client;
43 private volatile SelectorManager selectorManager;
44
45 protected AbstractHttpClientTransport(int selectors)
46 {
47 this.selectors = selectors;
48 }
49
50 protected HttpClient getHttpClient()
51 {
52 return client;
53 }
54
55 @Override
56 public void setHttpClient(HttpClient client)
57 {
58 this.client = client;
59 }
60
61 @Override
62 protected void doStart() throws Exception
63 {
64 selectorManager = newSelectorManager(client);
65 selectorManager.setConnectTimeout(client.getConnectTimeout());
66 addBean(selectorManager);
67 super.doStart();
68 }
69
70 @Override
71 protected void doStop() throws Exception
72 {
73 super.doStop();
74 removeBean(selectorManager);
75 }
76
77 @Override
78 public void connect(SocketAddress address, Map<String, Object> context)
79 {
80 SocketChannel channel = null;
81 try
82 {
83 channel = SocketChannel.open();
84 HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
85 HttpClient client = destination.getHttpClient();
86 SocketAddress bindAddress = client.getBindAddress();
87 if (bindAddress != null)
88 channel.bind(bindAddress);
89 configure(client, channel);
90 channel.configureBlocking(false);
91 channel.connect(address);
92
93 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
94 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
95 selectorManager.connect(channel, context);
96 }
97
98
99 catch (Throwable x)
100 {
101 try
102 {
103 if (channel != null)
104 channel.close();
105 }
106 catch (IOException xx)
107 {
108 LOG.ignore(xx);
109 }
110 finally
111 {
112 @SuppressWarnings("unchecked")
113 Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
114 promise.failed(x);
115 }
116 }
117 }
118
119 protected void configure(HttpClient client, SocketChannel channel) throws IOException
120 {
121 channel.socket().setTcpNoDelay(client.isTCPNoDelay());
122 }
123
124 protected SelectorManager newSelectorManager(HttpClient client)
125 {
126 return new ClientSelectorManager(client, selectors);
127 }
128
129 protected class ClientSelectorManager extends SelectorManager
130 {
131 private final HttpClient client;
132
133 protected ClientSelectorManager(HttpClient client, int selectors)
134 {
135 super(client.getExecutor(), client.getScheduler(), selectors);
136 this.client = client;
137 }
138
139 @Override
140 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
141 {
142 return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout());
143 }
144
145 @Override
146 public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
147 {
148 @SuppressWarnings("unchecked")
149 Map<String, Object> context = (Map<String, Object>)attachment;
150 HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
151 return destination.getClientConnectionFactory().newConnection(endPoint, context);
152 }
153
154 @Override
155 protected void connectionFailed(SocketChannel channel, Throwable x, Object attachment)
156 {
157 @SuppressWarnings("unchecked")
158 Map<String, Object> context = (Map<String, Object>)attachment;
159 @SuppressWarnings("unchecked")
160 Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
161 promise.failed(x);
162 }
163 }
164 }