1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.SocketChannel;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.ScheduledExecutorService;
39 import javax.net.ssl.SSLEngine;
40 import javax.net.ssl.SSLException;
41
42 import org.eclipse.jetty.io.AsyncEndPoint;
43 import org.eclipse.jetty.io.ConnectedEndPoint;
44 import org.eclipse.jetty.io.Connection;
45 import org.eclipse.jetty.io.nio.AsyncConnection;
46 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
47 import org.eclipse.jetty.io.nio.SelectorManager;
48 import org.eclipse.jetty.io.nio.SslConnection;
49 import org.eclipse.jetty.npn.NextProtoNego;
50 import org.eclipse.jetty.spdy.api.Session;
51 import org.eclipse.jetty.spdy.api.SessionFrameListener;
52 import org.eclipse.jetty.spdy.generator.Generator;
53 import org.eclipse.jetty.spdy.parser.Parser;
54 import org.eclipse.jetty.util.component.AggregateLifeCycle;
55 import org.eclipse.jetty.util.ssl.SslContextFactory;
56 import org.eclipse.jetty.util.thread.QueuedThreadPool;
57
58 public class SPDYClient
59 {
60 private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
61 private final short version;
62 private final Factory factory;
63 private SocketAddress bindAddress;
64 private long maxIdleTime = -1;
65 private volatile int initialWindowSize = 65536;
66
/**
 * Creates a client that uses the shared resources of the given factory.
 *
 * @param version the SPDY version used for the sessions this client opens
 * @param factory the factory this client belongs to
 */
protected SPDYClient(short version, Factory factory)
{
    this.factory = factory;
    this.version = version;
}
72
73
74
75
76
77 public SocketAddress getBindAddress()
78 {
79 return bindAddress;
80 }
81
82
83
84
85
86 public void setBindAddress(SocketAddress bindAddress)
87 {
88 this.bindAddress = bindAddress;
89 }
90
91 public Future<Session> connect(InetSocketAddress address, SessionFrameListener listener) throws IOException
92 {
93 if (!factory.isStarted())
94 throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
95
96 SocketChannel channel = SocketChannel.open();
97 if (bindAddress != null)
98 channel.bind(bindAddress);
99 channel.socket().setTcpNoDelay(true);
100 channel.configureBlocking(false);
101
102 SessionPromise result = new SessionPromise(channel, this, listener);
103
104 channel.connect(address);
105 factory.selector.register(channel, result);
106
107 return result;
108 }
109
110 public long getMaxIdleTime()
111 {
112 return maxIdleTime;
113 }
114
115 public void setMaxIdleTime(long maxIdleTime)
116 {
117 this.maxIdleTime = maxIdleTime;
118 }
119
120 public int getInitialWindowSize()
121 {
122 return initialWindowSize;
123 }
124
125 public void setInitialWindowSize(int initialWindowSize)
126 {
127 this.initialWindowSize = initialWindowSize;
128 }
129
130 protected String selectProtocol(List<String> serverProtocols)
131 {
132 if (serverProtocols == null)
133 return "spdy/2";
134
135 for (String serverProtocol : serverProtocols)
136 {
137 for (String protocol : factories.keySet())
138 {
139 if (serverProtocol.equals(protocol))
140 return protocol;
141 }
142 String protocol = factory.selectProtocol(serverProtocols);
143 if (protocol != null)
144 return protocol;
145 }
146
147 return null;
148 }
149
150 public AsyncConnectionFactory getAsyncConnectionFactory(String protocol)
151 {
152 for (Map.Entry<String, AsyncConnectionFactory> entry : factories.entrySet())
153 {
154 if (protocol.equals(entry.getKey()))
155 return entry.getValue();
156 }
157 for (Map.Entry<String, AsyncConnectionFactory> entry : factory.factories.entrySet())
158 {
159 if (protocol.equals(entry.getKey()))
160 return entry.getValue();
161 }
162 return null;
163 }
164
165 public void putAsyncConnectionFactory(String protocol, AsyncConnectionFactory factory)
166 {
167 factories.put(protocol, factory);
168 }
169
170 public AsyncConnectionFactory removeAsyncConnectionFactory(String protocol)
171 {
172 return factories.remove(protocol);
173 }
174
175 protected SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
176 {
177 String peerHost = channel.socket().getInetAddress().getHostAddress();
178 int peerPort = channel.socket().getPort();
179 SSLEngine engine = sslContextFactory.newSslEngine(peerHost, peerPort);
180 engine.setUseClientMode(true);
181 return engine;
182 }
183
184 protected FlowControlStrategy newFlowControlStrategy()
185 {
186 return FlowControlStrategyFactory.newFlowControlStrategy(version);
187 }
188
189 public static class Factory extends AggregateLifeCycle
190 {
191 private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
192 private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
193 private final ByteBufferPool bufferPool = new StandardByteBufferPool();
194 private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
195 private final Executor threadPool;
196 private final SslContextFactory sslContextFactory;
197 private final SelectorManager selector;
198
/**
 * Creates a clear-text factory with a default thread pool.
 */
public Factory()
{
    this((Executor)null, (SslContextFactory)null);
}
203
/**
 * Creates a factory with a default thread pool.
 *
 * @param sslContextFactory the SSL context factory, or {@code null} for clear-text connections
 */
public Factory(SslContextFactory sslContextFactory)
{
    this((Executor)null, sslContextFactory);
}
208
/**
 * Creates a clear-text factory.
 *
 * @param threadPool the executor to dispatch to, or {@code null} for a default thread pool
 */
public Factory(Executor threadPool)
{
    this(threadPool, (SslContextFactory)null);
}
213
/**
 * Creates a factory, adds its collaborators as beans and registers the default
 * {@code "spdy/2"} connection factory.
 *
 * @param threadPool the executor to dispatch to; {@code null} creates a {@link QueuedThreadPool}
 * @param sslContextFactory the SSL context factory, or {@code null} for clear-text connections
 */
public Factory(Executor threadPool, SslContextFactory sslContextFactory)
{
    Executor executor = threadPool != null ? threadPool : new QueuedThreadPool();
    this.threadPool = executor;
    addBean(executor);

    this.sslContextFactory = sslContextFactory;
    if (sslContextFactory != null)
    {
        addBean(sslContextFactory);
    }

    this.selector = new ClientSelectorManager();
    addBean(this.selector);

    this.factories.put("spdy/2", new ClientSPDYAsyncConnectionFactory());
}
230
231 public SPDYClient newSPDYClient(short version)
232 {
233 return new SPDYClient(version, this);
234 }
235
236 @Override
237 protected void doStop() throws Exception
238 {
239 closeConnections();
240 super.doStop();
241 }
242
243 protected String selectProtocol(List<String> serverProtocols)
244 {
245 for (String serverProtocol : serverProtocols)
246 {
247 for (String protocol : factories.keySet())
248 {
249 if (serverProtocol.equals(protocol))
250 return protocol;
251 }
252 }
253 return null;
254 }
255
256 private boolean sessionOpened(Session session)
257 {
258
259 return isRunning() && sessions.offer(session);
260 }
261
262 private boolean sessionClosed(Session session)
263 {
264
265
266 return isRunning() && sessions.remove(session);
267 }
268
269 private void closeConnections()
270 {
271 for (Session session : sessions)
272 session.goAway();
273 sessions.clear();
274 }
275
276 protected Collection<Session> getSessions()
277 {
278 return Collections.unmodifiableCollection(sessions);
279 }
280
281 private class ClientSelectorManager extends SelectorManager
282 {
283 @Override
284 public boolean dispatch(Runnable task)
285 {
286 try
287 {
288 threadPool.execute(task);
289 return true;
290 }
291 catch (RejectedExecutionException x)
292 {
293 return false;
294 }
295 }
296
297 @Override
298 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
299 {
300 SessionPromise attachment = (SessionPromise)key.attachment();
301
302 long maxIdleTime = attachment.client.getMaxIdleTime();
303 if (maxIdleTime < 0)
304 maxIdleTime = getMaxIdleTime();
305 SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, (int)maxIdleTime);
306
307 AsyncConnection connection = newConnection(channel, result, attachment);
308 result.setConnection(connection);
309
310 return result;
311 }
312
313 @Override
314 protected void endPointOpened(SelectChannelEndPoint endpoint)
315 {
316 }
317
318 @Override
319 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
320 {
321 }
322
323 @Override
324 protected void endPointClosed(SelectChannelEndPoint endpoint)
325 {
326 endpoint.getConnection().onClose();
327 }
328
/**
 * Creates the connection for a freshly connected channel.
 * <p>
 * Without an {@code SslContextFactory}, a SPDY connection is created directly on the endpoint.
 * Otherwise the endpoint is wrapped in an {@link SslConnection} and a NextProtoNego client
 * provider is registered for the engine. A placeholder connection is installed on the SSL
 * endpoint first; the provider's callbacks later replace it with the real connection.
 * A {@link RuntimeException} raised while doing so fails the session promise and is rethrown.
 *
 * @param channel the connected channel
 * @param endPoint the endpoint created for the channel
 * @param attachment the {@code SessionPromise} created by {@code connect}
 * @return the connection to install on {@code endPoint}
 */
@Override
public AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint, final Object attachment)
{
    SessionPromise promise = (SessionPromise)attachment;
    final SPDYClient client = promise.client;

    try
    {
        if (sslContextFactory == null)
        {
            // Clear-text: the SPDY connection sits directly on the socket endpoint.
            AsyncConnectionFactory plainFactory = new ClientSPDYAsyncConnectionFactory();
            AsyncConnection plainConnection = plainFactory.newAsyncConnection(channel, endPoint, attachment);
            endPoint.setConnection(plainConnection);
            return plainConnection;
        }

        final SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
        SslConnection sslConnection = new SslConnection(engine, endPoint)
        {
            @Override
            public void onClose()
            {
                // Drop the provider registered for this engine before closing.
                NextProtoNego.remove(engine);
                super.onClose();
            }
        };
        endPoint.setConnection(sslConnection);

        final AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
        NextProtoNego.put(engine, new NextProtoNego.ClientProvider()
        {
            @Override
            public boolean supports()
            {
                return true;
            }

            @Override
            public void unsupported()
            {
                // Fall back to the default SPDY connection factory.
                ClientSPDYAsyncConnectionFactory fallbackFactory = new ClientSPDYAsyncConnectionFactory();
                AsyncConnection fallback = fallbackFactory.newAsyncConnection(channel, sslEndPoint, attachment);
                sslEndPoint.setConnection(fallback);
            }

            @Override
            public String selectProtocol(List<String> protocols)
            {
                String chosen = client.selectProtocol(protocols);
                if (chosen != null)
                {
                    AsyncConnectionFactory chosenFactory = client.getAsyncConnectionFactory(chosen);
                    AsyncConnection negotiated = chosenFactory.newAsyncConnection(channel, sslEndPoint, attachment);
                    sslEndPoint.setConnection(negotiated);
                }
                return chosen;
            }
        });

        // Placeholder until one of the provider callbacks installs the real connection.
        AsyncConnection placeholder = new EmptyAsyncConnection(sslEndPoint);
        sslEndPoint.setConnection(placeholder);

        startHandshake(engine);

        return sslConnection;
    }
    catch (RuntimeException x)
    {
        promise.failed(null, x);
        throw x;
    }
}
403
404 private void startHandshake(SSLEngine engine)
405 {
406 try
407 {
408 engine.beginHandshake();
409 }
410 catch (SSLException x)
411 {
412 throw new RuntimeException(x);
413 }
414 }
415 }
416 }
417
418 private static class SessionPromise extends Promise<Session>
419 {
420 private final SocketChannel channel;
421 private final SPDYClient client;
422 private final SessionFrameListener listener;
423
424 private SessionPromise(SocketChannel channel, SPDYClient client, SessionFrameListener listener)
425 {
426 this.channel = channel;
427 this.client = client;
428 this.listener = listener;
429 }
430
431 @Override
432 public boolean cancel(boolean mayInterruptIfRunning)
433 {
434 try
435 {
436 super.cancel(mayInterruptIfRunning);
437 channel.close();
438 return true;
439 }
440 catch (IOException x)
441 {
442 return true;
443 }
444 }
445 }
446
447 private static class ClientSPDYAsyncConnectionFactory implements AsyncConnectionFactory
448 {
449 @Override
450 public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
451 {
452 SessionPromise sessionPromise = (SessionPromise)attachment;
453 SPDYClient client = sessionPromise.client;
454 Factory factory = client.factory;
455
456 CompressionFactory compressionFactory = new StandardCompressionFactory();
457 Parser parser = new Parser(compressionFactory.newDecompressor());
458 Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
459
460 SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
461 endPoint.setConnection(connection);
462
463 FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
464
465 StandardSession session = new StandardSession(client.version, factory.bufferPool, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator, flowControlStrategy);
466 session.setWindowSize(client.getInitialWindowSize());
467 parser.addListener(session);
468 sessionPromise.completed(session);
469 connection.setSession(session);
470
471 factory.sessionOpened(session);
472
473 return connection;
474 }
475
476 private class ClientSPDYAsyncConnection extends SPDYAsyncConnection
477 {
478 private final Factory factory;
479
480 public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
481 {
482 super(endPoint, bufferPool, parser);
483 this.factory = factory;
484 }
485
486 @Override
487 public void onClose()
488 {
489 super.onClose();
490 factory.sessionClosed(getSession());
491 }
492 }
493 }
494 }