1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.nio.channels.SelectionKey;
18 import java.nio.channels.SocketChannel;
19 import javax.net.ssl.SSLContext;
20 import javax.net.ssl.SSLEngine;
21 import javax.net.ssl.SSLSession;
22
23 import org.eclipse.jetty.http.HttpMethods;
24 import org.eclipse.jetty.http.HttpVersions;
25 import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.Buffers;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.ThreadLocalBuffers;
30 import org.eclipse.jetty.io.nio.DirectNIOBuffer;
31 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
32 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
33 import org.eclipse.jetty.io.nio.SelectorManager;
34 import org.eclipse.jetty.util.component.AbstractLifeCycle;
35 import org.eclipse.jetty.util.log.Log;
36
37 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
38 {
39 private final HttpClient _httpClient;
40 private SSLContext _sslContext;
41 private Buffers _sslBuffers;
42 private boolean _blockingConnect;
43
44 Manager _selectorManager=new Manager();
45
46
47
48
49 SelectConnector(HttpClient httpClient)
50 {
51 _httpClient = httpClient;
52 }
53
54
55
56
57
58 public boolean isBlockingConnect()
59 {
60 return _blockingConnect;
61 }
62
63
64
65
66
67 public void setBlockingConnect(boolean blockingConnect)
68 {
69 _blockingConnect = blockingConnect;
70 }
71
72
73 protected void doStart() throws Exception
74 {
75 super.doStart();
76
77 _selectorManager.start();
78
79 final boolean direct=_httpClient.getUseDirectBuffers();
80
81 SSLEngine sslEngine=_selectorManager.newSslEngine();
82 final SSLSession ssl_session=sslEngine.getSession();
83 ThreadLocalBuffers ssl_buffers = new ThreadLocalBuffers()
84 {
85 {
86 super.setBufferSize(ssl_session.getApplicationBufferSize());
87 super.setHeaderSize(ssl_session.getApplicationBufferSize());
88 }
89
90 @Override
91 protected Buffer newBuffer(int size)
92 {
93 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
94 }
95 @Override
96 protected Buffer newHeader(int size)
97 {
98 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
99 }
100 @Override
101 protected boolean isHeader(Buffer buffer)
102 {
103 return true;
104 }
105
106 @Override
107 public void setBufferSize(int size)
108 {
109 }
110
111 @Override
112 public void setHeaderSize(int size)
113 {
114 }
115 };
116 _sslBuffers=ssl_buffers;
117
118 _httpClient._threadPool.dispatch(this);
119 }
120
121
122 protected void doStop() throws Exception
123 {
124 _selectorManager.stop();
125 }
126
127
128 public void startConnection( HttpDestination destination )
129 throws IOException
130 {
131 SocketChannel channel = SocketChannel.open();
132 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
133 channel.configureBlocking( false );
134 channel.connect(address.toSocketAddress());
135 _selectorManager.register( channel, destination );
136 }
137
138
139 public void run()
140 {
141 while (_httpClient.isRunning())
142 {
143 try
144 {
145 _selectorManager.doSelect(0);
146 }
147 catch (Exception e)
148 {
149 Log.warn(e.toString());
150 Log.debug(e);
151 Thread.yield();
152 }
153 }
154 }
155
156
157 class Manager extends SelectorManager
158 {
159 @Override
160 protected SocketChannel acceptChannel(SelectionKey key) throws IOException
161 {
162 throw new IllegalStateException();
163 }
164
165 @Override
166 public boolean dispatch(Runnable task)
167 {
168 return SelectConnector.this._httpClient._threadPool.dispatch(task);
169 }
170
171 @Override
172 protected void endPointOpened(SelectChannelEndPoint endpoint)
173 {
174 }
175
176 @Override
177 protected void endPointClosed(SelectChannelEndPoint endpoint)
178 {
179 }
180
181 @Override
182 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
183 {
184 if (endpoint instanceof SslSelectChannelEndPoint)
185 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
186
187 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
188 }
189
190 @Override
191 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
192 {
193
194 HttpDestination dest=(HttpDestination)key.attachment();
195
196 SelectChannelEndPoint ep=null;
197
198 if (dest.isSecure())
199 {
200 if (dest.isProxied())
201 {
202 String connect = HttpMethods.CONNECT+" "+dest.getAddress()+HttpVersions.HTTP_1_0+"\r\n\r\n";
203
204
205 throw new IllegalStateException("Not Implemented");
206 }
207
208 SSLEngine engine=newSslEngine();
209 ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
210 }
211 else
212 {
213 ep=new SelectChannelEndPoint(channel,selectSet,key);
214 }
215
216 HttpConnection connection=(HttpConnection)ep.getConnection();
217 connection.setDestination(dest);
218 dest.onNewConnection(connection);
219 return ep;
220 }
221
222 private synchronized SSLEngine newSslEngine() throws IOException
223 {
224 if (_sslContext==null)
225 {
226 _sslContext = SelectConnector.this._httpClient.getSSLContext();
227 }
228
229 SSLEngine sslEngine = _sslContext.createSSLEngine();
230 sslEngine.setUseClientMode(true);
231 sslEngine.beginHandshake();
232
233 return sslEngine;
234 }
235
236
237
238
239
240 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
241 {
242 if (attachment instanceof HttpDestination)
243 ((HttpDestination)attachment).onConnectionFailed(ex);
244 else
245 super.connectionFailed(channel,ex,attachment);
246 }
247 }
248 }