1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client;
20
21 import java.io.IOException;
22 import java.net.CookieStore;
23 import java.net.SocketAddress;
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Locale;
29 import java.util.Set;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Future;
32
33 import org.eclipse.jetty.io.ByteBufferPool;
34 import org.eclipse.jetty.io.MappedByteBufferPool;
35 import org.eclipse.jetty.io.SelectorManager;
36 import org.eclipse.jetty.util.HttpCookieStore;
37 import org.eclipse.jetty.util.StringUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41 import org.eclipse.jetty.util.ssl.SslContextFactory;
42 import org.eclipse.jetty.util.thread.QueuedThreadPool;
43 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44 import org.eclipse.jetty.util.thread.Scheduler;
45 import org.eclipse.jetty.util.thread.ShutdownThread;
46 import org.eclipse.jetty.websocket.api.Session;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.extensions.Extension;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
51 import org.eclipse.jetty.websocket.client.io.ConnectPromise;
52 import org.eclipse.jetty.websocket.client.io.ConnectionManager;
53 import org.eclipse.jetty.websocket.client.io.UpgradeListener;
54 import org.eclipse.jetty.websocket.client.masks.Masker;
55 import org.eclipse.jetty.websocket.client.masks.RandomMasker;
56 import org.eclipse.jetty.websocket.common.SessionFactory;
57 import org.eclipse.jetty.websocket.common.SessionListener;
58 import org.eclipse.jetty.websocket.common.WebSocketSession;
59 import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
60 import org.eclipse.jetty.websocket.common.events.EventDriver;
61 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
62 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
63
64
65
66
67 public class WebSocketClient extends ContainerLifeCycle implements SessionListener
68 {
69 private static final Logger LOG = Log.getLogger(WebSocketClient.class);
70
71 private final WebSocketPolicy policy;
72 private final SslContextFactory sslContextFactory;
73 private final WebSocketExtensionFactory extensionRegistry;
74 private boolean daemon = false;
75 private EventDriverFactory eventDriverFactory;
76 private SessionFactory sessionFactory;
77 private ByteBufferPool bufferPool;
78 private Executor executor;
79 private Scheduler scheduler;
80 private CookieStore cookieStore;
81 private ConnectionManager connectionManager;
82 private Masker masker;
83 private SocketAddress bindAddress;
84 private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
85
86 public WebSocketClient()
87 {
88 this(null,null);
89 }
90
91 public WebSocketClient(Executor executor)
92 {
93 this(null,executor);
94 }
95
96 public WebSocketClient(ByteBufferPool bufferPool)
97 {
98 this(null,null,bufferPool);
99 }
100
101 public WebSocketClient(SslContextFactory sslContextFactory)
102 {
103 this(sslContextFactory,null);
104 }
105
106 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
107 {
108 this(sslContextFactory,executor,new MappedByteBufferPool());
109 }
110
111 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
112 {
113 this.executor = executor;
114 this.sslContextFactory = sslContextFactory;
115 this.policy = WebSocketPolicy.newClientPolicy();
116 this.bufferPool = bufferPool;
117 this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
118
119
120 this.extensionRegistry.unregister("deflate-frame");
121 this.extensionRegistry.unregister("permessage-deflate");
122 this.extensionRegistry.unregister("x-webkit-deflate-frame");
123
124 this.masker = new RandomMasker();
125 this.eventDriverFactory = new EventDriverFactory(policy);
126 this.sessionFactory = new WebSocketSessionFactory(this);
127
128 addBean(this.executor);
129 addBean(this.sslContextFactory);
130 addBean(this.bufferPool);
131 }
132
133 public Future<Session> connect(Object websocket, URI toUri) throws IOException
134 {
135 ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
136 request.setRequestURI(toUri);
137 request.setCookiesFrom(this.cookieStore);
138
139 return connect(websocket,toUri,request);
140 }
141
142 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
143 {
144 return connect(websocket,toUri,request,null);
145 }
146
147 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
148 {
149 if (!isStarted())
150 {
151 throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
152 }
153
154
155 if (!toUri.isAbsolute())
156 {
157 throw new IllegalArgumentException("WebSocket URI must be absolute");
158 }
159
160 if (StringUtil.isBlank(toUri.getScheme()))
161 {
162 throw new IllegalArgumentException("WebSocket URI must include a scheme");
163 }
164
165 String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
166 if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
167 {
168 throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
169 }
170
171 request.setRequestURI(toUri);
172 request.setCookiesFrom(this.cookieStore);
173
174
175 for (ExtensionConfig reqExt : request.getExtensions())
176 {
177 if (!extensionRegistry.isAvailable(reqExt.getName()))
178 {
179 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
180 }
181 }
182
183
184 LOG.debug("connect websocket {} to {}",websocket,toUri);
185
186
187 initialiseClient();
188 ConnectionManager manager = getConnectionManager();
189
190
191 EventDriver driver = null;
192 if (websocket instanceof EventDriver)
193 {
194
195 driver = (EventDriver)websocket;
196 }
197 else
198 {
199
200 driver = eventDriverFactory.wrap(websocket);
201 }
202
203 if (driver == null)
204 {
205 throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
206 }
207
208
209 ConnectPromise promise = manager.connect(this,driver,request);
210
211 if (upgradeListener != null)
212 {
213 promise.setUpgradeListener(upgradeListener);
214 }
215
216 LOG.debug("Connect Promise: {}",promise);
217
218
219 executor.execute(promise);
220
221
222 return promise;
223 }
224
225 @Override
226 protected void doStart() throws Exception
227 {
228 LOG.debug("Starting {}",this);
229
230 if (sslContextFactory != null)
231 {
232 addBean(sslContextFactory);
233 }
234
235 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
236
237 if (bufferPool == null)
238 {
239 bufferPool = new MappedByteBufferPool();
240 }
241 addBean(bufferPool);
242
243 if (scheduler == null)
244 {
245 scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
246 }
247 addBean(scheduler);
248
249 if (cookieStore == null)
250 {
251 cookieStore = new HttpCookieStore.Empty();
252 }
253
254 super.doStart();
255
256 LOG.debug("Started {}",this);
257 }
258
259 @Override
260 protected void doStop() throws Exception
261 {
262 LOG.debug("Stopping {}",this);
263
264 if (cookieStore != null)
265 {
266 cookieStore.removeAll();
267 cookieStore = null;
268 }
269
270 super.doStop();
271 LOG.debug("Stopped {}",this);
272 }
273
274
275
276
277
278
279 public long getAsyncWriteTimeout()
280 {
281 return this.policy.getAsyncWriteTimeout();
282 }
283
284 public SocketAddress getBindAddress()
285 {
286 return bindAddress;
287 }
288
289 public ByteBufferPool getBufferPool()
290 {
291 return bufferPool;
292 }
293
294 public ConnectionManager getConnectionManager()
295 {
296 return connectionManager;
297 }
298
299 public long getConnectTimeout()
300 {
301 return connectTimeout;
302 }
303
304 public CookieStore getCookieStore()
305 {
306 return cookieStore;
307 }
308
309 public EventDriverFactory getEventDriverFactory()
310 {
311 return eventDriverFactory;
312 }
313
314 public Executor getExecutor()
315 {
316 return executor;
317 }
318
319 public ExtensionFactory getExtensionFactory()
320 {
321 return extensionRegistry;
322 }
323
324 public Masker getMasker()
325 {
326 return masker;
327 }
328
329
330
331
332
333
334 public int getMaxBinaryMessageBufferSize()
335 {
336 return this.policy.getMaxBinaryMessageBufferSize();
337 }
338
339
340
341
342
343
344 public long getMaxBinaryMessageSize()
345 {
346 return this.policy.getMaxBinaryMessageSize();
347 }
348
349
350
351
352
353
354 public long getMaxIdleTimeout()
355 {
356 return this.policy.getIdleTimeout();
357 }
358
359
360
361
362
363
364 public int getMaxTextMessageBufferSize()
365 {
366 return this.policy.getMaxTextMessageBufferSize();
367 }
368
369
370
371
372
373
374 public long getMaxTextMessageSize()
375 {
376 return this.policy.getMaxTextMessageSize();
377 }
378
379 public Set<WebSocketSession> getOpenSessions()
380 {
381 return new HashSet<>(getBeans(WebSocketSession.class));
382 }
383
384 public WebSocketPolicy getPolicy()
385 {
386 return this.policy;
387 }
388
389 public Scheduler getScheduler()
390 {
391 return scheduler;
392 }
393
394 public SessionFactory getSessionFactory()
395 {
396 return sessionFactory;
397 }
398
399
400
401
402
403 public SslContextFactory getSslContextFactory()
404 {
405 return sslContextFactory;
406 }
407
408 public List<Extension> initExtensions(List<ExtensionConfig> requested)
409 {
410 List<Extension> extensions = new ArrayList<Extension>();
411
412 for (ExtensionConfig cfg : requested)
413 {
414 Extension extension = extensionRegistry.newInstance(cfg);
415
416 if (extension == null)
417 {
418 continue;
419 }
420
421 LOG.debug("added {}",extension);
422 extensions.add(extension);
423 }
424 LOG.debug("extensions={}",extensions);
425 return extensions;
426 }
427
428 private synchronized void initialiseClient() throws IOException
429 {
430 ShutdownThread.register(this);
431
432 if (executor == null)
433 {
434 QueuedThreadPool threadPool = new QueuedThreadPool();
435 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
436 threadPool.setName(name);
437 threadPool.setDaemon(daemon);
438 executor = threadPool;
439 addBean(executor,true);
440 }
441 else
442 {
443 addBean(executor,false);
444 }
445
446 if (connectionManager != null)
447 {
448 return;
449 }
450 try
451 {
452 connectionManager = newConnectionManager();
453 addBean(connectionManager);
454 connectionManager.start();
455 }
456 catch (IOException e)
457 {
458 throw e;
459 }
460 catch (Exception e)
461 {
462 throw new IOException(e);
463 }
464 }
465
466
467
468
469
470
471 protected ConnectionManager newConnectionManager()
472 {
473 return new ConnectionManager(this);
474 }
475
476 @Override
477 public void onSessionClosed(WebSocketSession session)
478 {
479 LOG.debug("Session Closed: {}",session);
480 removeBean(session);
481 }
482
483 @Override
484 public void onSessionOpened(WebSocketSession session)
485 {
486 LOG.debug("Session Opened: {}",session);
487 }
488
489 public void setAsyncWriteTimeout(long ms)
490 {
491 this.policy.setAsyncWriteTimeout(ms);
492 }
493
494 public void setBindAdddress(SocketAddress bindAddress)
495 {
496 this.bindAddress = bindAddress;
497 }
498
499 public void setBufferPool(ByteBufferPool bufferPool)
500 {
501 this.bufferPool = bufferPool;
502 }
503
504
505
506
507
508
509
510 public void setConnectTimeout(long ms)
511 {
512 if (ms < 0)
513 {
514 throw new IllegalStateException("Connect Timeout cannot be negative");
515 }
516 this.connectTimeout = ms;
517 }
518
519 public void setCookieStore(CookieStore cookieStore)
520 {
521 this.cookieStore = cookieStore;
522 }
523
524 public void setDaemon(boolean daemon)
525 {
526 this.daemon = daemon;
527 }
528
529 public void setEventDriverFactory(EventDriverFactory factory)
530 {
531 this.eventDriverFactory = factory;
532 }
533
534 public void setExecutor(Executor executor)
535 {
536 updateBean(this.executor,executor);
537 this.executor = executor;
538 }
539
540 public void setMasker(Masker masker)
541 {
542 this.masker = masker;
543 }
544
545 public void setMaxBinaryMessageBufferSize(int max)
546 {
547 this.policy.setMaxBinaryMessageBufferSize(max);
548 }
549
550
551
552
553
554
555
556
557
558 public void setMaxIdleTimeout(long ms)
559 {
560 this.policy.setIdleTimeout(ms);
561 }
562
563 public void setMaxTextMessageBufferSize(int max)
564 {
565 this.policy.setMaxTextMessageBufferSize(max);
566 }
567
568 public void setSessionFactory(SessionFactory sessionFactory)
569 {
570 this.sessionFactory = sessionFactory;
571 }
572
573 @Override
574 public void dump(Appendable out, String indent) throws IOException
575 {
576 dumpThis(out);
577 dump(out, indent, getOpenSessions());
578 }
579 }