1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.client;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.SocketChannel;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.Executor;
35
36 import org.eclipse.jetty.io.ByteBufferPool;
37 import org.eclipse.jetty.io.ClientConnectionFactory;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.MappedByteBufferPool;
41 import org.eclipse.jetty.io.SelectChannelEndPoint;
42 import org.eclipse.jetty.io.SelectorManager;
43 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44 import org.eclipse.jetty.spdy.FlowControlStrategy;
45 import org.eclipse.jetty.spdy.api.GoAwayInfo;
46 import org.eclipse.jetty.spdy.api.Session;
47 import org.eclipse.jetty.spdy.api.SessionFrameListener;
48 import org.eclipse.jetty.util.Callback;
49 import org.eclipse.jetty.util.FuturePromise;
50 import org.eclipse.jetty.util.Promise;
51 import org.eclipse.jetty.util.component.ContainerLifeCycle;
52 import org.eclipse.jetty.util.ssl.SslContextFactory;
53 import org.eclipse.jetty.util.thread.QueuedThreadPool;
54 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
55 import org.eclipse.jetty.util.thread.Scheduler;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public class SPDYClient
74 {
75 private final short version;
76 private final Factory factory;
77 private volatile SocketAddress bindAddress;
78 private volatile long idleTimeout = -1;
79 private volatile int initialWindowSize;
80 private volatile boolean dispatchIO;
81 private volatile ClientConnectionFactory connectionFactory;
82
83 protected SPDYClient(short version, Factory factory)
84 {
85 this.version = version;
86 this.factory = factory;
87 setInitialWindowSize(65536);
88 setDispatchIO(true);
89 ClientConnectionFactory connectionFactory = new SPDYClientConnectionFactory();
90 if (factory.sslContextFactory != null)
91 connectionFactory = new SslClientConnectionFactory(factory.getSslContextFactory(), factory.getByteBufferPool(), factory.getExecutor(), new NPNClientConnectionFactory(this, connectionFactory));
92 setClientConnectionFactory(connectionFactory);
93 }
94
95 public short getVersion()
96 {
97 return version;
98 }
99
100 public Factory getFactory()
101 {
102 return factory;
103 }
104
105
106
107
108
109
110
111
112
113
114
115
116 public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
117 {
118 FuturePromise<Session> promise = new FuturePromise<>();
119 connect(address, listener, promise);
120 return promise.get();
121 }
122
123
124
125
126
127
128
129
130
131
132
133 public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
134 {
135 connect(address, listener, promise, new HashMap<String, Object>());
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151 public void connect(final SocketAddress address, final SessionFrameListener listener, final Promise<Session> promise, Map<String, Object> context)
152 {
153 if (!factory.isStarted())
154 throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
155
156 try
157 {
158 SocketChannel channel = SocketChannel.open();
159 if (bindAddress != null)
160 channel.bind(bindAddress);
161 configure(channel);
162 channel.configureBlocking(false);
163 channel.connect(address);
164
165 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
166 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
167 context.put(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY, this);
168 context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
169 context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
170
171 factory.selector.connect(channel, context);
172 }
173 catch (IOException x)
174 {
175 promise.failed(x);
176 }
177 }
178
179 protected void configure(SocketChannel channel) throws IOException
180 {
181 channel.socket().setTcpNoDelay(true);
182 }
183
184
185
186
187
188 public SocketAddress getBindAddress()
189 {
190 return bindAddress;
191 }
192
193
194
195
196
197 public void setBindAddress(SocketAddress bindAddress)
198 {
199 this.bindAddress = bindAddress;
200 }
201
202 public long getIdleTimeout()
203 {
204 return idleTimeout;
205 }
206
207 public void setIdleTimeout(long idleTimeout)
208 {
209 this.idleTimeout = idleTimeout;
210 }
211
212 public int getInitialWindowSize()
213 {
214 return initialWindowSize;
215 }
216
217 public void setInitialWindowSize(int initialWindowSize)
218 {
219 this.initialWindowSize = initialWindowSize;
220 }
221
222 public boolean isDispatchIO()
223 {
224 return dispatchIO;
225 }
226
227 public void setDispatchIO(boolean dispatchIO)
228 {
229 this.dispatchIO = dispatchIO;
230 }
231
232 public ClientConnectionFactory getClientConnectionFactory()
233 {
234 return connectionFactory;
235 }
236
237 public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
238 {
239 this.connectionFactory = connectionFactory;
240 }
241
242 protected String selectProtocol(List<String> serverProtocols)
243 {
244 String protocol = "spdy/" + version;
245 for (String serverProtocol : serverProtocols)
246 {
247 if (serverProtocol.equals(protocol))
248 return protocol;
249 }
250 return null;
251 }
252
253 protected FlowControlStrategy newFlowControlStrategy()
254 {
255 return FlowControlStrategyFactory.newFlowControlStrategy(version);
256 }
257
258 public static class Factory extends ContainerLifeCycle
259 {
260 private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
261 private final ByteBufferPool bufferPool = new MappedByteBufferPool();
262 private final Scheduler scheduler;
263 private final Executor executor;
264 private final SslContextFactory sslContextFactory;
265 private final SelectorManager selector;
266 private final long idleTimeout;
267 private long connectTimeout;
268
269 public Factory()
270 {
271 this(null, null);
272 }
273
274 public Factory(SslContextFactory sslContextFactory)
275 {
276 this(null, null, sslContextFactory);
277 }
278
279 public Factory(Executor executor)
280 {
281 this(executor, null);
282 }
283
284 public Factory(Executor executor, Scheduler scheduler)
285 {
286 this(executor, scheduler, null);
287 }
288
289 public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory)
290 {
291 this(executor, scheduler, sslContextFactory, 30000);
292 }
293
294 public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, long idleTimeout)
295 {
296 this.idleTimeout = idleTimeout;
297 setConnectTimeout(15000);
298
299 if (executor == null)
300 executor = new QueuedThreadPool();
301 this.executor = executor;
302 addBean(executor);
303
304 if (scheduler == null)
305 scheduler = new ScheduledExecutorScheduler();
306 this.scheduler = scheduler;
307 addBean(scheduler);
308
309 this.sslContextFactory = sslContextFactory;
310 if (sslContextFactory != null)
311 addBean(sslContextFactory);
312
313 selector = new ClientSelectorManager(executor, scheduler);
314 selector.setConnectTimeout(getConnectTimeout());
315 addBean(selector);
316 }
317
318 public ByteBufferPool getByteBufferPool()
319 {
320 return bufferPool;
321 }
322
323 public Scheduler getScheduler()
324 {
325 return scheduler;
326 }
327
328 public Executor getExecutor()
329 {
330 return executor;
331 }
332
333 public SslContextFactory getSslContextFactory()
334 {
335 return sslContextFactory;
336 }
337
338 public long getConnectTimeout()
339 {
340 return connectTimeout;
341 }
342
343 public void setConnectTimeout(long connectTimeout)
344 {
345 this.connectTimeout = connectTimeout;
346 }
347
348 public SPDYClient newSPDYClient(short version)
349 {
350 return new SPDYClient(version, this);
351 }
352
353 @Override
354 protected void doStop() throws Exception
355 {
356 closeConnections();
357 super.doStop();
358 }
359
360 boolean sessionOpened(Session session)
361 {
362
363 return isRunning() && sessions.offer(session);
364 }
365
366 boolean sessionClosed(Session session)
367 {
368
369
370 return isRunning() && sessions.remove(session);
371 }
372
373 private void closeConnections()
374 {
375 for (Session session : sessions)
376 session.goAway(new GoAwayInfo(), new Callback.Adapter());
377 sessions.clear();
378 }
379
380 public Collection<Session> getSessions()
381 {
382 return Collections.unmodifiableCollection(sessions);
383 }
384
385 private class ClientSelectorManager extends SelectorManager
386 {
387 private ClientSelectorManager(Executor executor, Scheduler scheduler)
388 {
389 super(executor, scheduler);
390 }
391
392 @Override
393 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
394 {
395 @SuppressWarnings("unchecked")
396 Map<String, Object> context = (Map<String, Object>)key.attachment();
397 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
398 long clientIdleTimeout = client.getIdleTimeout();
399 if (clientIdleTimeout < 0)
400 clientIdleTimeout = idleTimeout;
401 return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
402 }
403
404 @Override
405 public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
406 {
407 @SuppressWarnings("unchecked")
408 Map<String, Object> context = (Map<String, Object>)attachment;
409 try
410 {
411 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
412 return client.getClientConnectionFactory().newConnection(endPoint, context);
413 }
414 catch (Throwable x)
415 {
416 @SuppressWarnings("unchecked")
417 Promise<Session> promise = (Promise<Session>)context.get(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY);
418 promise.failed(x);
419 throw x;
420 }
421 }
422 }
423 }
424 }