1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client;
20
21 import java.io.IOException;
22 import java.net.CookieStore;
23 import java.net.SocketAddress;
24 import java.net.URI;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.Locale;
28 import java.util.Set;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.Future;
31
32 import org.eclipse.jetty.io.ByteBufferPool;
33 import org.eclipse.jetty.io.MappedByteBufferPool;
34 import org.eclipse.jetty.io.SelectorManager;
35 import org.eclipse.jetty.util.DecoratedObjectFactory;
36 import org.eclipse.jetty.util.HttpCookieStore;
37 import org.eclipse.jetty.util.StringUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41 import org.eclipse.jetty.util.ssl.SslContextFactory;
42 import org.eclipse.jetty.util.thread.QueuedThreadPool;
43 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44 import org.eclipse.jetty.util.thread.Scheduler;
45 import org.eclipse.jetty.util.thread.ShutdownThread;
46 import org.eclipse.jetty.websocket.api.Session;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
50 import org.eclipse.jetty.websocket.client.io.ConnectPromise;
51 import org.eclipse.jetty.websocket.client.io.ConnectionManager;
52 import org.eclipse.jetty.websocket.client.io.UpgradeListener;
53 import org.eclipse.jetty.websocket.client.masks.Masker;
54 import org.eclipse.jetty.websocket.client.masks.RandomMasker;
55 import org.eclipse.jetty.websocket.common.SessionFactory;
56 import org.eclipse.jetty.websocket.common.SessionListener;
57 import org.eclipse.jetty.websocket.common.WebSocketSession;
58 import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
59 import org.eclipse.jetty.websocket.common.events.EventDriver;
60 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
61 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
62 import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
63
64
65
66
67 public class WebSocketClient extends ContainerLifeCycle implements SessionListener, WebSocketContainerScope
68 {
69 private static final Logger LOG = Log.getLogger(WebSocketClient.class);
70
71 private final WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
72 private final SslContextFactory sslContextFactory;
73 private final WebSocketExtensionFactory extensionRegistry;
74 private boolean daemon = false;
75 private EventDriverFactory eventDriverFactory;
76 private SessionFactory sessionFactory;
77 private ByteBufferPool bufferPool;
78 private Executor executor;
79 private DecoratedObjectFactory objectFactory;
80 private Scheduler scheduler;
81 private CookieStore cookieStore;
82 private ConnectionManager connectionManager;
83 private Masker masker;
84 private SocketAddress bindAddress;
85 private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
86 private boolean dispatchIO = true;
87
88 public WebSocketClient()
89 {
90 this((SslContextFactory)null,null);
91 }
92
93 public WebSocketClient(Executor executor)
94 {
95 this(null,executor);
96 }
97
98 public WebSocketClient(ByteBufferPool bufferPool)
99 {
100 this(null,null,bufferPool);
101 }
102
103 public WebSocketClient(SslContextFactory sslContextFactory)
104 {
105 this(sslContextFactory,null);
106 }
107
108 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
109 {
110 this(sslContextFactory,executor,new MappedByteBufferPool());
111 }
112
113 public WebSocketClient(WebSocketContainerScope scope)
114 {
115 this(scope.getSslContextFactory(), scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
116 }
117
118 public WebSocketClient(WebSocketContainerScope scope, SslContextFactory sslContextFactory)
119 {
120 this(sslContextFactory, scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
121 }
122
123 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
124 {
125 this(sslContextFactory, executor, bufferPool, new DecoratedObjectFactory());
126 }
127
128 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
129 {
130 this.executor = executor;
131 this.sslContextFactory = sslContextFactory;
132 this.bufferPool = bufferPool;
133 this.objectFactory = objectFactory;
134 this.extensionRegistry = new WebSocketExtensionFactory(this);
135
136 this.masker = new RandomMasker();
137 this.eventDriverFactory = new EventDriverFactory(policy);
138
139 addBean(this.executor);
140 addBean(this.sslContextFactory);
141 addBean(this.bufferPool);
142 }
143
144 public Future<Session> connect(Object websocket, URI toUri) throws IOException
145 {
146 ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
147 request.setRequestURI(toUri);
148 request.setCookiesFrom(this.cookieStore);
149
150 return connect(websocket,toUri,request);
151 }
152
153 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
154 {
155 return connect(websocket,toUri,request,null);
156 }
157
158 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
159 {
160 if (!isStarted())
161 {
162 throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
163 }
164
165
166 if (!toUri.isAbsolute())
167 {
168 throw new IllegalArgumentException("WebSocket URI must be absolute");
169 }
170
171 if (StringUtil.isBlank(toUri.getScheme()))
172 {
173 throw new IllegalArgumentException("WebSocket URI must include a scheme");
174 }
175
176 String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
177 if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
178 {
179 throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
180 }
181
182 request.setRequestURI(toUri);
183 request.setCookiesFrom(this.cookieStore);
184
185
186 for (ExtensionConfig reqExt : request.getExtensions())
187 {
188 if (!extensionRegistry.isAvailable(reqExt.getName()))
189 {
190 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
191 }
192 }
193
194 if (LOG.isDebugEnabled())
195 LOG.debug("connect websocket {} to {}",websocket,toUri);
196
197
198 initializeClient();
199 ConnectionManager manager = getConnectionManager();
200
201
202 EventDriver driver = null;
203 if (websocket instanceof EventDriver)
204 {
205
206 driver = (EventDriver)websocket;
207 }
208 else
209 {
210
211 driver = eventDriverFactory.wrap(websocket);
212 }
213
214 if (driver == null)
215 {
216 throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
217 }
218
219
220 ConnectPromise promise = manager.connect(this,driver,request);
221
222 if (upgradeListener != null)
223 {
224 promise.setUpgradeListener(upgradeListener);
225 }
226
227 if (LOG.isDebugEnabled())
228 LOG.debug("Connect Promise: {}",promise);
229
230
231 executor.execute(promise);
232
233
234 return promise;
235 }
236
237 @Override
238 protected void doStart() throws Exception
239 {
240 if (LOG.isDebugEnabled())
241 LOG.debug("Starting {}",this);
242
243 if (sslContextFactory != null)
244 {
245 addBean(sslContextFactory);
246 }
247
248 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
249
250 if (bufferPool == null)
251 {
252 bufferPool = new MappedByteBufferPool();
253 }
254 addBean(bufferPool);
255
256 if (scheduler == null)
257 {
258 scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
259 }
260 addBean(scheduler);
261
262 if (cookieStore == null)
263 {
264 cookieStore = new HttpCookieStore.Empty();
265 }
266
267 if(this.sessionFactory == null)
268 {
269 this.sessionFactory = new WebSocketSessionFactory(this);
270 }
271
272 if(this.objectFactory == null)
273 {
274 this.objectFactory = new DecoratedObjectFactory();
275 }
276
277 super.doStart();
278
279 if (LOG.isDebugEnabled())
280 LOG.debug("Started {}",this);
281 }
282
283 @Override
284 protected void doStop() throws Exception
285 {
286 if (LOG.isDebugEnabled())
287 LOG.debug("Stopping {}",this);
288
289 if (ShutdownThread.isRegistered(this))
290 {
291 ShutdownThread.deregister(this);
292 }
293
294 if (cookieStore != null)
295 {
296 cookieStore.removeAll();
297 cookieStore = null;
298 }
299
300 super.doStop();
301
302 if (LOG.isDebugEnabled())
303 LOG.debug("Stopped {}",this);
304 }
305
306 public boolean isDispatchIO()
307 {
308 return dispatchIO;
309 }
310
311
312
313
314
315
316 public long getAsyncWriteTimeout()
317 {
318 return this.policy.getAsyncWriteTimeout();
319 }
320
321 public SocketAddress getBindAddress()
322 {
323 return bindAddress;
324 }
325
326 public ByteBufferPool getBufferPool()
327 {
328 return bufferPool;
329 }
330
331 public ConnectionManager getConnectionManager()
332 {
333 return connectionManager;
334 }
335
336 public long getConnectTimeout()
337 {
338 return connectTimeout;
339 }
340
341 public CookieStore getCookieStore()
342 {
343 return cookieStore;
344 }
345
346 public EventDriverFactory getEventDriverFactory()
347 {
348 return eventDriverFactory;
349 }
350
351 public Executor getExecutor()
352 {
353 return executor;
354 }
355
356 public ExtensionFactory getExtensionFactory()
357 {
358 return extensionRegistry;
359 }
360
361 public Masker getMasker()
362 {
363 return masker;
364 }
365
366
367
368
369
370
371 public int getMaxBinaryMessageBufferSize()
372 {
373 return this.policy.getMaxBinaryMessageBufferSize();
374 }
375
376
377
378
379
380
381 public long getMaxBinaryMessageSize()
382 {
383 return this.policy.getMaxBinaryMessageSize();
384 }
385
386
387
388
389
390
391 public long getMaxIdleTimeout()
392 {
393 return this.policy.getIdleTimeout();
394 }
395
396
397
398
399
400
401 public int getMaxTextMessageBufferSize()
402 {
403 return this.policy.getMaxTextMessageBufferSize();
404 }
405
406
407
408
409
410
411 public long getMaxTextMessageSize()
412 {
413 return this.policy.getMaxTextMessageSize();
414 }
415
416 @Override
417 public DecoratedObjectFactory getObjectFactory()
418 {
419 return this.objectFactory;
420 }
421
422 public Set<WebSocketSession> getOpenSessions()
423 {
424 return Collections.unmodifiableSet(new HashSet<>(getBeans(WebSocketSession.class)));
425 }
426
427 public WebSocketPolicy getPolicy()
428 {
429 return this.policy;
430 }
431
432 public Scheduler getScheduler()
433 {
434 return scheduler;
435 }
436
437 public SessionFactory getSessionFactory()
438 {
439 return sessionFactory;
440 }
441
442
443
444
445
446 public SslContextFactory getSslContextFactory()
447 {
448 return sslContextFactory;
449 }
450
451 private synchronized void initializeClient() throws IOException
452 {
453 if (!ShutdownThread.isRegistered(this))
454 {
455 ShutdownThread.register(this);
456 }
457
458 if (executor == null)
459 {
460 QueuedThreadPool threadPool = new QueuedThreadPool();
461 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
462 threadPool.setName(name);
463 threadPool.setDaemon(daemon);
464 executor = threadPool;
465 addManaged(threadPool);
466 }
467 else
468 {
469 addBean(executor,false);
470 }
471
472 if (connectionManager == null)
473 {
474 connectionManager = newConnectionManager();
475 addManaged(connectionManager);
476 }
477 }
478
479
480
481
482
483
484 protected ConnectionManager newConnectionManager()
485 {
486 return new ConnectionManager(this);
487 }
488
489 @Override
490 public void onSessionClosed(WebSocketSession session)
491 {
492 if (LOG.isDebugEnabled())
493 LOG.debug("Session Closed: {}",session);
494 removeBean(session);
495 }
496
497 @Override
498 public void onSessionOpened(WebSocketSession session)
499 {
500 if (LOG.isDebugEnabled())
501 LOG.debug("Session Opened: {}",session);
502 addManaged(session);
503 }
504
505 public void setAsyncWriteTimeout(long ms)
506 {
507 this.policy.setAsyncWriteTimeout(ms);
508 }
509
510
511
512
513
514 @Deprecated
515 public void setBindAdddress(SocketAddress bindAddress)
516 {
517 setBindAddress(bindAddress);
518 }
519
520 public void setBindAddress(SocketAddress bindAddress)
521 {
522 this.bindAddress = bindAddress;
523 }
524
525 public void setBufferPool(ByteBufferPool bufferPool)
526 {
527 this.bufferPool = bufferPool;
528 }
529
530
531
532
533
534
535
536 public void setConnectTimeout(long ms)
537 {
538 if (ms < 0)
539 {
540 throw new IllegalStateException("Connect Timeout cannot be negative");
541 }
542 this.connectTimeout = ms;
543 }
544
545 public void setCookieStore(CookieStore cookieStore)
546 {
547 this.cookieStore = cookieStore;
548 }
549
550 public void setDaemon(boolean daemon)
551 {
552 this.daemon = daemon;
553 }
554
555 public void setDispatchIO(boolean dispatchIO)
556 {
557 this.dispatchIO = dispatchIO;
558 }
559
560 public void setEventDriverFactory(EventDriverFactory factory)
561 {
562 this.eventDriverFactory = factory;
563 }
564
565 public void setExecutor(Executor executor)
566 {
567 updateBean(this.executor,executor);
568 this.executor = executor;
569 }
570
571 public void setMasker(Masker masker)
572 {
573 this.masker = masker;
574 }
575
576 public void setMaxBinaryMessageBufferSize(int max)
577 {
578 this.policy.setMaxBinaryMessageBufferSize(max);
579 }
580
581
582
583
584
585
586
587
588
589 public void setMaxIdleTimeout(long ms)
590 {
591 this.policy.setIdleTimeout(ms);
592 }
593
594 public void setMaxTextMessageBufferSize(int max)
595 {
596 this.policy.setMaxTextMessageBufferSize(max);
597 }
598
599 public void setSessionFactory(SessionFactory sessionFactory)
600 {
601 this.sessionFactory = sessionFactory;
602 }
603
604 @Override
605 public void dump(Appendable out, String indent) throws IOException
606 {
607 dumpThis(out);
608 dump(out, indent, getOpenSessions());
609 }
610 }