1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client;
20
21 import java.io.IOException;
22 import java.net.CookieStore;
23 import java.net.SocketAddress;
24 import java.net.URI;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.Locale;
28 import java.util.Set;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.Future;
31
32 import org.eclipse.jetty.io.ByteBufferPool;
33 import org.eclipse.jetty.io.MappedByteBufferPool;
34 import org.eclipse.jetty.io.SelectorManager;
35 import org.eclipse.jetty.util.DecoratedObjectFactory;
36 import org.eclipse.jetty.util.HttpCookieStore;
37 import org.eclipse.jetty.util.StringUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41 import org.eclipse.jetty.util.ssl.SslContextFactory;
42 import org.eclipse.jetty.util.thread.QueuedThreadPool;
43 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44 import org.eclipse.jetty.util.thread.Scheduler;
45 import org.eclipse.jetty.util.thread.ShutdownThread;
46 import org.eclipse.jetty.websocket.api.Session;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
50 import org.eclipse.jetty.websocket.client.io.ConnectPromise;
51 import org.eclipse.jetty.websocket.client.io.ConnectionManager;
52 import org.eclipse.jetty.websocket.client.io.UpgradeListener;
53 import org.eclipse.jetty.websocket.client.masks.Masker;
54 import org.eclipse.jetty.websocket.client.masks.RandomMasker;
55 import org.eclipse.jetty.websocket.common.SessionFactory;
56 import org.eclipse.jetty.websocket.common.WebSocketSession;
57 import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
58 import org.eclipse.jetty.websocket.common.events.EventDriver;
59 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
60 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
61 import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
62
63
64
65
66 public class WebSocketClient extends ContainerLifeCycle implements WebSocketContainerScope
67 {
68 private static final Logger LOG = Log.getLogger(WebSocketClient.class);
69
70 private final WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
71 private final SslContextFactory sslContextFactory;
72 private final WebSocketExtensionFactory extensionRegistry;
73 private boolean daemon = false;
74 private EventDriverFactory eventDriverFactory;
75 private SessionFactory sessionFactory;
76 private ByteBufferPool bufferPool;
77 private Executor executor;
78 private DecoratedObjectFactory objectFactory;
79 private Scheduler scheduler;
80 private CookieStore cookieStore;
81 private ConnectionManager connectionManager;
82 private Masker masker;
83 private SocketAddress bindAddress;
84 private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
85 private boolean dispatchIO = true;
86
/**
 * Creates a client with neither an SSL context factory nor an executor supplied.
 * Delegates to {@link #WebSocketClient(SslContextFactory, Executor)} with both arguments {@code null}.
 */
public WebSocketClient()
{
    this((SslContextFactory)null, (Executor)null);
}
91
/**
 * Creates a client that uses the given executor and no SSL context factory.
 *
 * @param executor the executor to use; may be {@code null}
 */
public WebSocketClient(Executor executor)
{
    this((SslContextFactory)null, executor);
}
96
/**
 * Creates a client that uses the given buffer pool, with no SSL context factory
 * and no executor supplied.
 *
 * @param bufferPool the buffer pool to use
 */
public WebSocketClient(ByteBufferPool bufferPool)
{
    this((SslContextFactory)null, (Executor)null, bufferPool);
}
101
/**
 * Creates a client that uses the given SSL context factory and no supplied executor.
 *
 * @param sslContextFactory the SSL context factory to use; may be {@code null}
 */
public WebSocketClient(SslContextFactory sslContextFactory)
{
    this(sslContextFactory, (Executor)null);
}
106
/**
 * Creates a client from an SSL context factory and an executor, backed by a
 * freshly created {@link MappedByteBufferPool}.
 *
 * @param sslContextFactory the SSL context factory to use; may be {@code null}
 * @param executor the executor to use; may be {@code null}
 */
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
{
    this(sslContextFactory, executor, new MappedByteBufferPool());
}
111
/**
 * Creates a client that borrows its SSL context factory, executor, buffer pool
 * and object factory from an existing container scope.
 *
 * @param scope the scope whose components are reused
 */
public WebSocketClient(WebSocketContainerScope scope)
{
    this(scope.getSslContextFactory(),
        scope.getExecutor(),
        scope.getBufferPool(),
        scope.getObjectFactory());
}
116
/**
 * Creates a client that borrows its executor, buffer pool and object factory
 * from an existing container scope, but uses the explicitly supplied SSL context factory.
 *
 * @param scope the scope whose executor, buffer pool and object factory are reused
 * @param sslContextFactory the SSL context factory to use instead of the scope's
 */
public WebSocketClient(WebSocketContainerScope scope, SslContextFactory sslContextFactory)
{
    this(sslContextFactory,
        scope.getExecutor(),
        scope.getBufferPool(),
        scope.getObjectFactory());
}
121
/**
 * Creates a client from the given components, using a freshly created
 * {@link DecoratedObjectFactory}.
 *
 * @param sslContextFactory the SSL context factory to use; may be {@code null}
 * @param executor the executor to use; may be {@code null}
 * @param bufferPool the buffer pool to use
 */
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
{
    this(sslContextFactory, executor, bufferPool, new DecoratedObjectFactory());
}
126
/**
 * Primary constructor; every other constructor ends up here.
 * <p>
 * Stores the supplied components, creates the extension registry, a
 * {@link RandomMasker} and an {@link EventDriverFactory} bound to this client's
 * policy, then registers the executor, SSL context factory and buffer pool as beans.
 *
 * @param sslContextFactory the SSL context factory to use; may be {@code null}
 * @param executor the executor to use; may be {@code null}
 * @param bufferPool the buffer pool to use
 * @param objectFactory the object factory to use
 */
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
{
    // Supplied components are stored first: the extension registry is handed "this".
    this.sslContextFactory = sslContextFactory;
    this.executor = executor;
    this.objectFactory = objectFactory;
    this.bufferPool = bufferPool;
    this.extensionRegistry = new WebSocketExtensionFactory(this);

    this.masker = new RandomMasker();
    this.eventDriverFactory = new EventDriverFactory(this.policy);

    // Bean registration order is kept: executor, SSL context factory, buffer pool.
    addBean(executor);
    addBean(sslContextFactory);
    addBean(bufferPool);
}
142
143 public Future<Session> connect(Object websocket, URI toUri) throws IOException
144 {
145 ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
146 request.setRequestURI(toUri);
147 request.setCookiesFrom(this.cookieStore);
148
149 return connect(websocket,toUri,request);
150 }
151
152 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
153 {
154 return connect(websocket,toUri,request,null);
155 }
156
157 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
158 {
159 if (!isStarted())
160 {
161 throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
162 }
163
164
165 if (!toUri.isAbsolute())
166 {
167 throw new IllegalArgumentException("WebSocket URI must be absolute");
168 }
169
170 if (StringUtil.isBlank(toUri.getScheme()))
171 {
172 throw new IllegalArgumentException("WebSocket URI must include a scheme");
173 }
174
175 String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
176 if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
177 {
178 throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
179 }
180
181 request.setRequestURI(toUri);
182 request.setCookiesFrom(this.cookieStore);
183
184
185 for (ExtensionConfig reqExt : request.getExtensions())
186 {
187 if (!extensionRegistry.isAvailable(reqExt.getName()))
188 {
189 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
190 }
191 }
192
193 if (LOG.isDebugEnabled())
194 LOG.debug("connect websocket {} to {}",websocket,toUri);
195
196
197 initializeClient();
198 ConnectionManager manager = getConnectionManager();
199
200
201 EventDriver driver = null;
202 if (websocket instanceof EventDriver)
203 {
204
205 driver = (EventDriver)websocket;
206 }
207 else
208 {
209
210 driver = eventDriverFactory.wrap(websocket);
211 }
212
213 if (driver == null)
214 {
215 throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
216 }
217
218
219 ConnectPromise promise = manager.connect(this,driver,request);
220
221 if (upgradeListener != null)
222 {
223 promise.setUpgradeListener(upgradeListener);
224 }
225
226 if (LOG.isDebugEnabled())
227 LOG.debug("Connect Promise: {}",promise);
228
229
230 executor.execute(promise);
231
232
233 return promise;
234 }
235
236 @Override
237 protected void doStart() throws Exception
238 {
239 if (LOG.isDebugEnabled())
240 LOG.debug("Starting {}",this);
241
242 if (sslContextFactory != null)
243 {
244 addBean(sslContextFactory);
245 }
246
247 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
248
249 if (bufferPool == null)
250 {
251 bufferPool = new MappedByteBufferPool();
252 }
253 addBean(bufferPool);
254
255 if (scheduler == null)
256 {
257 scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
258 }
259 addBean(scheduler);
260
261 if (cookieStore == null)
262 {
263 cookieStore = new HttpCookieStore.Empty();
264 }
265
266 if(this.sessionFactory == null)
267 {
268 this.sessionFactory = new WebSocketSessionFactory(this);
269 }
270
271 if(this.objectFactory == null)
272 {
273 this.objectFactory = new DecoratedObjectFactory();
274 }
275
276 super.doStart();
277
278 if (LOG.isDebugEnabled())
279 LOG.debug("Started {}",this);
280 }
281
282 @Override
283 protected void doStop() throws Exception
284 {
285 if (LOG.isDebugEnabled())
286 LOG.debug("Stopping {}",this);
287
288 if (ShutdownThread.isRegistered(this))
289 {
290 ShutdownThread.deregister(this);
291 }
292
293 if (cookieStore != null)
294 {
295 cookieStore.removeAll();
296 cookieStore = null;
297 }
298
299 super.doStop();
300
301 if (LOG.isDebugEnabled())
302 LOG.debug("Stopped {}",this);
303 }
304
305 public boolean isDispatchIO()
306 {
307 return dispatchIO;
308 }
309
310
311
312
313
314
315 public long getAsyncWriteTimeout()
316 {
317 return this.policy.getAsyncWriteTimeout();
318 }
319
320 public SocketAddress getBindAddress()
321 {
322 return bindAddress;
323 }
324
325 public ByteBufferPool getBufferPool()
326 {
327 return bufferPool;
328 }
329
330 public ConnectionManager getConnectionManager()
331 {
332 return connectionManager;
333 }
334
335 public long getConnectTimeout()
336 {
337 return connectTimeout;
338 }
339
340 public CookieStore getCookieStore()
341 {
342 return cookieStore;
343 }
344
345 public EventDriverFactory getEventDriverFactory()
346 {
347 return eventDriverFactory;
348 }
349
350 public Executor getExecutor()
351 {
352 return executor;
353 }
354
355 public ExtensionFactory getExtensionFactory()
356 {
357 return extensionRegistry;
358 }
359
360 public Masker getMasker()
361 {
362 return masker;
363 }
364
365
366
367
368
369
370 public int getMaxBinaryMessageBufferSize()
371 {
372 return this.policy.getMaxBinaryMessageBufferSize();
373 }
374
375
376
377
378
379
380 public long getMaxBinaryMessageSize()
381 {
382 return this.policy.getMaxBinaryMessageSize();
383 }
384
385
386
387
388
389
390 public long getMaxIdleTimeout()
391 {
392 return this.policy.getIdleTimeout();
393 }
394
395
396
397
398
399
400 public int getMaxTextMessageBufferSize()
401 {
402 return this.policy.getMaxTextMessageBufferSize();
403 }
404
405
406
407
408
409
410 public long getMaxTextMessageSize()
411 {
412 return this.policy.getMaxTextMessageSize();
413 }
414
415 @Override
416 public DecoratedObjectFactory getObjectFactory()
417 {
418 return this.objectFactory;
419 }
420
421 public Set<WebSocketSession> getOpenSessions()
422 {
423 return Collections.unmodifiableSet(new HashSet<>(getBeans(WebSocketSession.class)));
424 }
425
426 public WebSocketPolicy getPolicy()
427 {
428 return this.policy;
429 }
430
431 public Scheduler getScheduler()
432 {
433 return scheduler;
434 }
435
436 public SessionFactory getSessionFactory()
437 {
438 return sessionFactory;
439 }
440
441
442
443
444
445 public SslContextFactory getSslContextFactory()
446 {
447 return sslContextFactory;
448 }
449
450 private synchronized void initializeClient() throws IOException
451 {
452 if (!ShutdownThread.isRegistered(this))
453 {
454 ShutdownThread.register(this);
455 }
456
457 if (executor == null)
458 {
459 QueuedThreadPool threadPool = new QueuedThreadPool();
460 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
461 threadPool.setName(name);
462 threadPool.setDaemon(daemon);
463 executor = threadPool;
464 addManaged(threadPool);
465 }
466 else
467 {
468 addBean(executor,false);
469 }
470
471 if (connectionManager == null)
472 {
473 connectionManager = newConnectionManager();
474 addManaged(connectionManager);
475 }
476 }
477
478
479
480
481
482
483 protected ConnectionManager newConnectionManager()
484 {
485 return new ConnectionManager(this);
486 }
487
488 @Override
489 public void onSessionClosed(WebSocketSession session)
490 {
491 if (LOG.isDebugEnabled())
492 LOG.debug("Session Closed: {}",session);
493 removeBean(session);
494 }
495
496 @Override
497 public void onSessionOpened(WebSocketSession session)
498 {
499 if (LOG.isDebugEnabled())
500 LOG.debug("Session Opened: {}",session);
501 addManaged(session);
502 }
503
504 public void setAsyncWriteTimeout(long ms)
505 {
506 this.policy.setAsyncWriteTimeout(ms);
507 }
508
509
510
511
512
513 @Deprecated
514 public void setBindAdddress(SocketAddress bindAddress)
515 {
516 setBindAddress(bindAddress);
517 }
518
519 public void setBindAddress(SocketAddress bindAddress)
520 {
521 this.bindAddress = bindAddress;
522 }
523
524 public void setBufferPool(ByteBufferPool bufferPool)
525 {
526 this.bufferPool = bufferPool;
527 }
528
529
530
531
532
533
534
535 public void setConnectTimeout(long ms)
536 {
537 if (ms < 0)
538 {
539 throw new IllegalStateException("Connect Timeout cannot be negative");
540 }
541 this.connectTimeout = ms;
542 }
543
544 public void setCookieStore(CookieStore cookieStore)
545 {
546 this.cookieStore = cookieStore;
547 }
548
549 public void setDaemon(boolean daemon)
550 {
551 this.daemon = daemon;
552 }
553
554 public void setDispatchIO(boolean dispatchIO)
555 {
556 this.dispatchIO = dispatchIO;
557 }
558
559 public void setEventDriverFactory(EventDriverFactory factory)
560 {
561 this.eventDriverFactory = factory;
562 }
563
564 public void setExecutor(Executor executor)
565 {
566 updateBean(this.executor,executor);
567 this.executor = executor;
568 }
569
570 public void setMasker(Masker masker)
571 {
572 this.masker = masker;
573 }
574
575 public void setMaxBinaryMessageBufferSize(int max)
576 {
577 this.policy.setMaxBinaryMessageBufferSize(max);
578 }
579
580
581
582
583
584
585
586
587
588 public void setMaxIdleTimeout(long ms)
589 {
590 this.policy.setIdleTimeout(ms);
591 }
592
593 public void setMaxTextMessageBufferSize(int max)
594 {
595 this.policy.setMaxTextMessageBufferSize(max);
596 }
597
598 public void setSessionFactory(SessionFactory sessionFactory)
599 {
600 this.sessionFactory = sessionFactory;
601 }
602
603 @Override
604 public void dump(Appendable out, String indent) throws IOException
605 {
606 dumpThis(out);
607 dump(out, indent, getOpenSessions());
608 }
609 }