1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.client;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.SocketChannel;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Executor;
34
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.ClientConnectionFactory;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.MappedByteBufferPool;
40 import org.eclipse.jetty.io.SelectChannelEndPoint;
41 import org.eclipse.jetty.io.SelectorManager;
42 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
43 import org.eclipse.jetty.spdy.FlowControlStrategy;
44 import org.eclipse.jetty.spdy.api.GoAwayInfo;
45 import org.eclipse.jetty.spdy.api.Session;
46 import org.eclipse.jetty.spdy.api.SessionFrameListener;
47 import org.eclipse.jetty.util.Callback;
48 import org.eclipse.jetty.util.FuturePromise;
49 import org.eclipse.jetty.util.Promise;
50 import org.eclipse.jetty.util.component.ContainerLifeCycle;
51 import org.eclipse.jetty.util.ssl.SslContextFactory;
52 import org.eclipse.jetty.util.thread.QueuedThreadPool;
53 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
54 import org.eclipse.jetty.util.thread.Scheduler;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class SPDYClient
73 {
74 private final short version;
75 private final Factory factory;
76 private volatile SocketAddress bindAddress;
77 private volatile long idleTimeout = -1;
78 private volatile int initialWindowSize;
79 private volatile boolean dispatchIO;
80 private volatile ClientConnectionFactory connectionFactory;
81
82 protected SPDYClient(short version, Factory factory)
83 {
84 this.version = version;
85 this.factory = factory;
86 setInitialWindowSize(65536);
87 setDispatchIO(true);
88 }
89
90 public short getVersion()
91 {
92 return version;
93 }
94
95 public Factory getFactory()
96 {
97 return factory;
98 }
99
100
101
102
103
104
105
106
107
108
109
110
111 public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
112 {
113 FuturePromise<Session> promise = new FuturePromise<>();
114 connect(address, listener, promise);
115 return promise.get();
116 }
117
118
119
120
121
122
123
124
125
126
127
128 public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
129 {
130 connect(address, listener, promise, new HashMap<String, Object>());
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 public void connect(final SocketAddress address, final SessionFrameListener listener, final Promise<Session> promise, Map<String, Object> context)
147 {
148 if (!factory.isStarted())
149 throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
150
151 try
152 {
153 SocketChannel channel = SocketChannel.open();
154 if (bindAddress != null)
155 channel.bind(bindAddress);
156 configure(channel);
157 channel.configureBlocking(false);
158 channel.connect(address);
159
160 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
161 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
162 context.put(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY, this);
163 context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
164 context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
165
166 factory.selector.connect(channel, context);
167 }
168 catch (IOException x)
169 {
170 promise.failed(x);
171 }
172 }
173
174 protected void configure(SocketChannel channel) throws IOException
175 {
176 channel.socket().setTcpNoDelay(true);
177 }
178
179
180
181
182
183 public SocketAddress getBindAddress()
184 {
185 return bindAddress;
186 }
187
188
189
190
191
192 public void setBindAddress(SocketAddress bindAddress)
193 {
194 this.bindAddress = bindAddress;
195 }
196
197 public long getIdleTimeout()
198 {
199 return idleTimeout;
200 }
201
202 public void setIdleTimeout(long idleTimeout)
203 {
204 this.idleTimeout = idleTimeout;
205 }
206
207 public int getInitialWindowSize()
208 {
209 return initialWindowSize;
210 }
211
212 public void setInitialWindowSize(int initialWindowSize)
213 {
214 this.initialWindowSize = initialWindowSize;
215 }
216
217 public boolean isDispatchIO()
218 {
219 return dispatchIO;
220 }
221
222 public void setDispatchIO(boolean dispatchIO)
223 {
224 this.dispatchIO = dispatchIO;
225 }
226
227 public ClientConnectionFactory getClientConnectionFactory()
228 {
229 return connectionFactory;
230 }
231
232 public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
233 {
234 this.connectionFactory = connectionFactory;
235 }
236
237 protected FlowControlStrategy newFlowControlStrategy()
238 {
239 return FlowControlStrategyFactory.newFlowControlStrategy(version);
240 }
241
242 public static class Factory extends ContainerLifeCycle
243 {
244 private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
245 private final Scheduler scheduler;
246 private final Executor executor;
247 private final ByteBufferPool bufferPool;
248 private final SslContextFactory sslContextFactory;
249 private final SelectorManager selector;
250 private final long idleTimeout;
251 private long connectTimeout;
252
253 public Factory()
254 {
255 this(null, null);
256 }
257
258 public Factory(SslContextFactory sslContextFactory)
259 {
260 this(null, null, sslContextFactory);
261 }
262
263 public Factory(Executor executor)
264 {
265 this(executor, null);
266 }
267
268 public Factory(Executor executor, Scheduler scheduler)
269 {
270 this(executor, scheduler, null);
271 }
272
273 public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory)
274 {
275 this(executor, scheduler, sslContextFactory, 30000);
276 }
277
278 public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, long idleTimeout)
279 {
280 this(executor, scheduler, null, sslContextFactory, idleTimeout);
281 }
282
283 public Factory(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory sslContextFactory, long idleTimeout)
284 {
285 this.idleTimeout = idleTimeout;
286 setConnectTimeout(15000);
287
288 if (executor == null)
289 executor = new QueuedThreadPool();
290 this.executor = executor;
291 addBean(executor);
292
293 if (scheduler == null)
294 scheduler = new ScheduledExecutorScheduler();
295 this.scheduler = scheduler;
296 addBean(scheduler);
297
298 if (bufferPool == null)
299 bufferPool = new MappedByteBufferPool();
300 this.bufferPool = bufferPool;
301 addBean(bufferPool);
302
303 this.sslContextFactory = sslContextFactory;
304 if (sslContextFactory != null)
305 addBean(sslContextFactory);
306
307 selector = new ClientSelectorManager(executor, scheduler);
308 selector.setConnectTimeout(getConnectTimeout());
309 addBean(selector);
310 }
311
312 public ByteBufferPool getByteBufferPool()
313 {
314 return bufferPool;
315 }
316
317 public Scheduler getScheduler()
318 {
319 return scheduler;
320 }
321
322 public Executor getExecutor()
323 {
324 return executor;
325 }
326
327 public SslContextFactory getSslContextFactory()
328 {
329 return sslContextFactory;
330 }
331
332 public long getConnectTimeout()
333 {
334 return connectTimeout;
335 }
336
337 public void setConnectTimeout(long connectTimeout)
338 {
339 this.connectTimeout = connectTimeout;
340 }
341
342 public SPDYClient newSPDYClient(short version)
343 {
344 return newSPDYClient(version, new NPNClientConnectionFactory(getExecutor(), new SPDYClientConnectionFactory(), "spdy/" + version));
345 }
346
347 public SPDYClient newSPDYClient(short version, NegotiatingClientConnectionFactory negotiatingFactory)
348 {
349 SPDYClient client = new SPDYClient(version, this);
350
351 ClientConnectionFactory connectionFactory = negotiatingFactory.getClientConnectionFactory();
352 if (sslContextFactory != null)
353 connectionFactory = new SslClientConnectionFactory(getSslContextFactory(), getByteBufferPool(), getExecutor(), negotiatingFactory);
354
355 client.setClientConnectionFactory(connectionFactory);
356 return client;
357 }
358
359 @Override
360 protected void doStop() throws Exception
361 {
362 closeConnections();
363 super.doStop();
364 }
365
366 boolean sessionOpened(Session session)
367 {
368
369 return isRunning() && sessions.offer(session);
370 }
371
372 boolean sessionClosed(Session session)
373 {
374
375
376 return isRunning() && sessions.remove(session);
377 }
378
379 private void closeConnections()
380 {
381 for (Session session : sessions)
382 session.goAway(new GoAwayInfo(), new Callback.Adapter());
383 sessions.clear();
384 }
385
386 public Collection<Session> getSessions()
387 {
388 return Collections.unmodifiableCollection(sessions);
389 }
390
391 @Override
392 protected void dumpThis(Appendable out) throws IOException
393 {
394 super.dumpThis(out);
395 dump(out, "", sessions);
396 }
397
398 private class ClientSelectorManager extends SelectorManager
399 {
400 private ClientSelectorManager(Executor executor, Scheduler scheduler)
401 {
402 super(executor, scheduler);
403 }
404
405 @Override
406 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
407 {
408 @SuppressWarnings("unchecked")
409 Map<String, Object> context = (Map<String, Object>)key.attachment();
410 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
411 long clientIdleTimeout = client.getIdleTimeout();
412 if (clientIdleTimeout < 0)
413 clientIdleTimeout = idleTimeout;
414 return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
415 }
416
417 @Override
418 public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
419 {
420 @SuppressWarnings("unchecked")
421 Map<String, Object> context = (Map<String, Object>)attachment;
422 try
423 {
424 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
425 return client.getClientConnectionFactory().newConnection(endPoint, context);
426 }
427 catch (Throwable x)
428 {
429 @SuppressWarnings("unchecked")
430 Promise<Session> promise = (Promise<Session>)context.get(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY);
431 promise.failed(x);
432 throw x;
433 }
434 }
435 }
436 }
437 }