1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.client;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.SocketChannel;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Executor;
34
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.ClientConnectionFactory;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.MappedByteBufferPool;
40 import org.eclipse.jetty.io.NegotiatingClientConnectionFactory;
41 import org.eclipse.jetty.io.SelectChannelEndPoint;
42 import org.eclipse.jetty.io.SelectorManager;
43 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44 import org.eclipse.jetty.spdy.FlowControlStrategy;
45 import org.eclipse.jetty.spdy.api.GoAwayInfo;
46 import org.eclipse.jetty.spdy.api.Session;
47 import org.eclipse.jetty.spdy.api.SessionFrameListener;
48 import org.eclipse.jetty.util.Callback;
49 import org.eclipse.jetty.util.FuturePromise;
50 import org.eclipse.jetty.util.Promise;
51 import org.eclipse.jetty.util.component.ContainerLifeCycle;
52 import org.eclipse.jetty.util.ssl.SslContextFactory;
53 import org.eclipse.jetty.util.thread.QueuedThreadPool;
54 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
55 import org.eclipse.jetty.util.thread.Scheduler;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public class SPDYClient
74 {
75 private final short version;
76 private final Factory factory;
77 private volatile SocketAddress bindAddress;
78 private volatile long idleTimeout = -1;
79 private volatile int initialWindowSize;
80 private volatile boolean dispatchIO;
81 private volatile ClientConnectionFactory connectionFactory;
82
83 protected SPDYClient(short version, Factory factory)
84 {
85 this.version = version;
86 this.factory = factory;
87 setInitialWindowSize(65536);
88 setDispatchIO(true);
89 }
90
91 public short getVersion()
92 {
93 return version;
94 }
95
96 public Factory getFactory()
97 {
98 return factory;
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112 public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
113 {
114 FuturePromise<Session> promise = new FuturePromise<>();
115 connect(address, listener, promise);
116 return promise.get();
117 }
118
119
120
121
122
123
124
125
126
127
128
129 public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
130 {
131 connect(address, listener, promise, new HashMap<String, Object>());
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 public void connect(final SocketAddress address, final SessionFrameListener listener, final Promise<Session> promise, Map<String, Object> context)
148 {
149 if (!factory.isStarted())
150 throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
151
152 try
153 {
154 SocketChannel channel = SocketChannel.open();
155 if (bindAddress != null)
156 channel.bind(bindAddress);
157 configure(channel);
158 channel.configureBlocking(false);
159
160 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
161 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
162 context.put(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY, this);
163 context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
164 context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
165
166 if (channel.connect(address))
167 factory.selector.accept(channel, context);
168 else
169 factory.selector.connect(channel, context);
170 }
171 catch (IOException x)
172 {
173 promise.failed(x);
174 }
175 }
176
177 protected void configure(SocketChannel channel) throws IOException
178 {
179 channel.socket().setTcpNoDelay(true);
180 }
181
182
183
184
185
186 public SocketAddress getBindAddress()
187 {
188 return bindAddress;
189 }
190
191
192
193
194
195 public void setBindAddress(SocketAddress bindAddress)
196 {
197 this.bindAddress = bindAddress;
198 }
199
200 public long getIdleTimeout()
201 {
202 return idleTimeout;
203 }
204
205 public void setIdleTimeout(long idleTimeout)
206 {
207 this.idleTimeout = idleTimeout;
208 }
209
210 public int getInitialWindowSize()
211 {
212 return initialWindowSize;
213 }
214
215 public void setInitialWindowSize(int initialWindowSize)
216 {
217 this.initialWindowSize = initialWindowSize;
218 }
219
220 public boolean isDispatchIO()
221 {
222 return dispatchIO;
223 }
224
225 public void setDispatchIO(boolean dispatchIO)
226 {
227 this.dispatchIO = dispatchIO;
228 }
229
230 public ClientConnectionFactory getClientConnectionFactory()
231 {
232 return connectionFactory;
233 }
234
235 public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
236 {
237 this.connectionFactory = connectionFactory;
238 }
239
240 protected FlowControlStrategy newFlowControlStrategy()
241 {
242 return FlowControlStrategyFactory.newFlowControlStrategy(version);
243 }
244
245 public static class Factory extends ContainerLifeCycle
246 {
247 private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
248 private final Scheduler scheduler;
249 private final Executor executor;
250 private final ByteBufferPool bufferPool;
251 private final SslContextFactory sslContextFactory;
252 private final SelectorManager selector;
253 private final long idleTimeout;
254 private long connectTimeout;
255
256 public Factory()
257 {
258 this(null, null);
259 }
260
261 public Factory(SslContextFactory sslContextFactory)
262 {
263 this(null, null, sslContextFactory);
264 }
265
266 public Factory(Executor executor)
267 {
268 this(executor, null);
269 }
270
271 public Factory(Executor executor, Scheduler scheduler)
272 {
273 this(executor, scheduler, null);
274 }
275
276 public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory)
277 {
278 this(executor, scheduler, sslContextFactory, 30000);
279 }
280
281 public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, long idleTimeout)
282 {
283 this(executor, scheduler, null, sslContextFactory, idleTimeout);
284 }
285
286 public Factory(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory sslContextFactory, long idleTimeout)
287 {
288 this.idleTimeout = idleTimeout;
289 setConnectTimeout(15000);
290
291 if (executor == null)
292 executor = new QueuedThreadPool();
293 this.executor = executor;
294 addBean(executor);
295
296 if (scheduler == null)
297 scheduler = new ScheduledExecutorScheduler();
298 this.scheduler = scheduler;
299 addBean(scheduler);
300
301 if (bufferPool == null)
302 bufferPool = new MappedByteBufferPool();
303 this.bufferPool = bufferPool;
304 addBean(bufferPool);
305
306 this.sslContextFactory = sslContextFactory;
307 if (sslContextFactory != null)
308 addBean(sslContextFactory);
309
310 selector = new ClientSelectorManager(executor, scheduler);
311 selector.setConnectTimeout(getConnectTimeout());
312 addBean(selector);
313 }
314
315 public ByteBufferPool getByteBufferPool()
316 {
317 return bufferPool;
318 }
319
320 public Scheduler getScheduler()
321 {
322 return scheduler;
323 }
324
325 public Executor getExecutor()
326 {
327 return executor;
328 }
329
330 public SslContextFactory getSslContextFactory()
331 {
332 return sslContextFactory;
333 }
334
335 public long getConnectTimeout()
336 {
337 return connectTimeout;
338 }
339
340 public void setConnectTimeout(long connectTimeout)
341 {
342 this.connectTimeout = connectTimeout;
343 }
344
345 public SPDYClient newSPDYClient(short version)
346 {
347 return newSPDYClient(version, new NPNClientConnectionFactory(getExecutor(), new SPDYClientConnectionFactory(), "spdy/" + version));
348 }
349
350 public SPDYClient newSPDYClient(short version, NegotiatingClientConnectionFactory negotiatingFactory)
351 {
352 SPDYClient client = new SPDYClient(version, this);
353
354 ClientConnectionFactory connectionFactory = negotiatingFactory.getClientConnectionFactory();
355 if (sslContextFactory != null)
356 connectionFactory = new SslClientConnectionFactory(getSslContextFactory(), getByteBufferPool(), getExecutor(), negotiatingFactory);
357
358 client.setClientConnectionFactory(connectionFactory);
359 return client;
360 }
361
362 @Override
363 protected void doStop() throws Exception
364 {
365 closeConnections();
366 super.doStop();
367 }
368
369 boolean sessionOpened(Session session)
370 {
371
372 return isRunning() && sessions.offer(session);
373 }
374
375 boolean sessionClosed(Session session)
376 {
377
378
379 return isRunning() && sessions.remove(session);
380 }
381
382 private void closeConnections()
383 {
384 for (Session session : sessions)
385 session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
386 sessions.clear();
387 }
388
389 public Collection<Session> getSessions()
390 {
391 return Collections.unmodifiableCollection(sessions);
392 }
393
394 @Override
395 protected void dumpThis(Appendable out) throws IOException
396 {
397 super.dumpThis(out);
398 dump(out, "", sessions);
399 }
400
401 private class ClientSelectorManager extends SelectorManager
402 {
403 private ClientSelectorManager(Executor executor, Scheduler scheduler)
404 {
405 super(executor, scheduler);
406 }
407
408 @Override
409 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
410 {
411 @SuppressWarnings("unchecked")
412 Map<String, Object> context = (Map<String, Object>)key.attachment();
413 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
414 long clientIdleTimeout = client.getIdleTimeout();
415 if (clientIdleTimeout < 0)
416 clientIdleTimeout = idleTimeout;
417 return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
418 }
419
420 @Override
421 public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
422 {
423 @SuppressWarnings("unchecked")
424 Map<String, Object> context = (Map<String, Object>)attachment;
425 try
426 {
427 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
428 return client.getClientConnectionFactory().newConnection(endPoint, context);
429 }
430 catch (Throwable x)
431 {
432 @SuppressWarnings("unchecked")
433 Promise<Session> promise = (Promise<Session>)context.get(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY);
434 promise.failed(x);
435 throw x;
436 }
437 }
438 }
439 }
440 }