1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client;
20
21 import java.io.IOException;
22 import java.net.CookieStore;
23 import java.net.SocketAddress;
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Locale;
29 import java.util.Set;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Future;
32
33 import org.eclipse.jetty.io.ByteBufferPool;
34 import org.eclipse.jetty.io.MappedByteBufferPool;
35 import org.eclipse.jetty.io.SelectorManager;
36 import org.eclipse.jetty.util.HttpCookieStore;
37 import org.eclipse.jetty.util.StringUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41 import org.eclipse.jetty.util.ssl.SslContextFactory;
42 import org.eclipse.jetty.util.thread.QueuedThreadPool;
43 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44 import org.eclipse.jetty.util.thread.Scheduler;
45 import org.eclipse.jetty.util.thread.ShutdownThread;
46 import org.eclipse.jetty.websocket.api.Session;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.extensions.Extension;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
51 import org.eclipse.jetty.websocket.client.io.ConnectPromise;
52 import org.eclipse.jetty.websocket.client.io.ConnectionManager;
53 import org.eclipse.jetty.websocket.client.io.UpgradeListener;
54 import org.eclipse.jetty.websocket.client.masks.Masker;
55 import org.eclipse.jetty.websocket.client.masks.RandomMasker;
56 import org.eclipse.jetty.websocket.common.SessionFactory;
57 import org.eclipse.jetty.websocket.common.SessionListener;
58 import org.eclipse.jetty.websocket.common.WebSocketSession;
59 import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
60 import org.eclipse.jetty.websocket.common.events.EventDriver;
61 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
62 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
63
64
65
66
67 public class WebSocketClient extends ContainerLifeCycle implements SessionListener
68 {
69 private static final Logger LOG = Log.getLogger(WebSocketClient.class);
70
71 private final WebSocketPolicy policy;
72 private final SslContextFactory sslContextFactory;
73 private final WebSocketExtensionFactory extensionRegistry;
74 private boolean daemon = false;
75 private EventDriverFactory eventDriverFactory;
76 private SessionFactory sessionFactory;
77 private ByteBufferPool bufferPool;
78 private Executor executor;
79 private Scheduler scheduler;
80 private CookieStore cookieStore;
81 private ConnectionManager connectionManager;
82 private Masker masker;
83 private SocketAddress bindAddress;
84 private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
85
86 public WebSocketClient()
87 {
88 this(null,null);
89 }
90
91 public WebSocketClient(Executor executor)
92 {
93 this(null,executor);
94 }
95
96 public WebSocketClient(ByteBufferPool bufferPool)
97 {
98 this(null,null,bufferPool);
99 }
100
101 public WebSocketClient(SslContextFactory sslContextFactory)
102 {
103 this(sslContextFactory,null);
104 }
105
106 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
107 {
108 this(sslContextFactory,executor,new MappedByteBufferPool());
109 }
110
111 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
112 {
113 this.executor = executor;
114 this.sslContextFactory = sslContextFactory;
115 this.policy = WebSocketPolicy.newClientPolicy();
116 this.bufferPool = bufferPool;
117 this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
118
119
120 this.extensionRegistry.unregister("deflate-frame");
121 this.extensionRegistry.unregister("permessage-deflate");
122 this.extensionRegistry.unregister("x-webkit-deflate-frame");
123
124 this.masker = new RandomMasker();
125 this.eventDriverFactory = new EventDriverFactory(policy);
126 this.sessionFactory = new WebSocketSessionFactory(this);
127
128 addBean(this.executor);
129 addBean(this.sslContextFactory);
130 addBean(this.bufferPool);
131 }
132
133 public Future<Session> connect(Object websocket, URI toUri) throws IOException
134 {
135 ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
136 request.setRequestURI(toUri);
137 request.setCookiesFrom(this.cookieStore);
138
139 return connect(websocket,toUri,request);
140 }
141
142 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
143 {
144 return connect(websocket,toUri,request,null);
145 }
146
147 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
148 {
149 if (!isStarted())
150 {
151 throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
152 }
153
154
155 if (!toUri.isAbsolute())
156 {
157 throw new IllegalArgumentException("WebSocket URI must be absolute");
158 }
159
160 if (StringUtil.isBlank(toUri.getScheme()))
161 {
162 throw new IllegalArgumentException("WebSocket URI must include a scheme");
163 }
164
165 String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
166 if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
167 {
168 throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
169 }
170
171 request.setRequestURI(toUri);
172 request.setCookiesFrom(this.cookieStore);
173
174
175 for (ExtensionConfig reqExt : request.getExtensions())
176 {
177 if (!extensionRegistry.isAvailable(reqExt.getName()))
178 {
179 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
180 }
181 }
182
183 if (LOG.isDebugEnabled())
184 LOG.debug("connect websocket {} to {}",websocket,toUri);
185
186
187 initialiseClient();
188 ConnectionManager manager = getConnectionManager();
189
190
191 EventDriver driver = null;
192 if (websocket instanceof EventDriver)
193 {
194
195 driver = (EventDriver)websocket;
196 }
197 else
198 {
199
200 driver = eventDriverFactory.wrap(websocket);
201 }
202
203 if (driver == null)
204 {
205 throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
206 }
207
208
209 ConnectPromise promise = manager.connect(this,driver,request);
210
211 if (upgradeListener != null)
212 {
213 promise.setUpgradeListener(upgradeListener);
214 }
215
216 if (LOG.isDebugEnabled())
217 LOG.debug("Connect Promise: {}",promise);
218
219
220 executor.execute(promise);
221
222
223 return promise;
224 }
225
226 @Override
227 protected void doStart() throws Exception
228 {
229 if (LOG.isDebugEnabled())
230 LOG.debug("Starting {}",this);
231
232 if (sslContextFactory != null)
233 {
234 addBean(sslContextFactory);
235 }
236
237 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
238
239 if (bufferPool == null)
240 {
241 bufferPool = new MappedByteBufferPool();
242 }
243 addBean(bufferPool);
244
245 if (scheduler == null)
246 {
247 scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
248 }
249 addBean(scheduler);
250
251 if (cookieStore == null)
252 {
253 cookieStore = new HttpCookieStore.Empty();
254 }
255
256 super.doStart();
257
258 if (LOG.isDebugEnabled())
259 LOG.debug("Started {}",this);
260 }
261
262 @Override
263 protected void doStop() throws Exception
264 {
265 if (LOG.isDebugEnabled())
266 LOG.debug("Stopping {}",this);
267
268 if (ShutdownThread.isRegistered(this))
269 {
270 ShutdownThread.deregister(this);
271 }
272
273 if (cookieStore != null)
274 {
275 cookieStore.removeAll();
276 cookieStore = null;
277 }
278
279 super.doStop();
280
281 if (LOG.isDebugEnabled())
282 LOG.debug("Stopped {}",this);
283 }
284
285
286
287
288
289
290 public long getAsyncWriteTimeout()
291 {
292 return this.policy.getAsyncWriteTimeout();
293 }
294
295 public SocketAddress getBindAddress()
296 {
297 return bindAddress;
298 }
299
300 public ByteBufferPool getBufferPool()
301 {
302 return bufferPool;
303 }
304
305 public ConnectionManager getConnectionManager()
306 {
307 return connectionManager;
308 }
309
310 public long getConnectTimeout()
311 {
312 return connectTimeout;
313 }
314
315 public CookieStore getCookieStore()
316 {
317 return cookieStore;
318 }
319
320 public EventDriverFactory getEventDriverFactory()
321 {
322 return eventDriverFactory;
323 }
324
325 public Executor getExecutor()
326 {
327 return executor;
328 }
329
330 public ExtensionFactory getExtensionFactory()
331 {
332 return extensionRegistry;
333 }
334
335 public Masker getMasker()
336 {
337 return masker;
338 }
339
340
341
342
343
344
345 public int getMaxBinaryMessageBufferSize()
346 {
347 return this.policy.getMaxBinaryMessageBufferSize();
348 }
349
350
351
352
353
354
355 public long getMaxBinaryMessageSize()
356 {
357 return this.policy.getMaxBinaryMessageSize();
358 }
359
360
361
362
363
364
365 public long getMaxIdleTimeout()
366 {
367 return this.policy.getIdleTimeout();
368 }
369
370
371
372
373
374
375 public int getMaxTextMessageBufferSize()
376 {
377 return this.policy.getMaxTextMessageBufferSize();
378 }
379
380
381
382
383
384
385 public long getMaxTextMessageSize()
386 {
387 return this.policy.getMaxTextMessageSize();
388 }
389
390 public Set<WebSocketSession> getOpenSessions()
391 {
392 return new HashSet<>(getBeans(WebSocketSession.class));
393 }
394
395 public WebSocketPolicy getPolicy()
396 {
397 return this.policy;
398 }
399
400 public Scheduler getScheduler()
401 {
402 return scheduler;
403 }
404
405 public SessionFactory getSessionFactory()
406 {
407 return sessionFactory;
408 }
409
410
411
412
413
414 public SslContextFactory getSslContextFactory()
415 {
416 return sslContextFactory;
417 }
418
419 public List<Extension> initExtensions(List<ExtensionConfig> requested)
420 {
421 List<Extension> extensions = new ArrayList<Extension>();
422
423 for (ExtensionConfig cfg : requested)
424 {
425 Extension extension = extensionRegistry.newInstance(cfg);
426
427 if (extension == null)
428 {
429 continue;
430 }
431
432 if (LOG.isDebugEnabled())
433 LOG.debug("added {}",extension);
434 extensions.add(extension);
435 }
436 if (LOG.isDebugEnabled())
437 LOG.debug("extensions={}",extensions);
438 return extensions;
439 }
440
441 private synchronized void initialiseClient() throws IOException
442 {
443 if (!ShutdownThread.isRegistered(this))
444 {
445 ShutdownThread.register(this);
446 }
447
448 if (executor == null)
449 {
450 QueuedThreadPool threadPool = new QueuedThreadPool();
451 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
452 threadPool.setName(name);
453 threadPool.setDaemon(daemon);
454 executor = threadPool;
455 addManaged(threadPool);
456 }
457 else
458 {
459 addBean(executor,false);
460 }
461
462 if (connectionManager == null)
463 {
464 connectionManager = newConnectionManager();
465 addManaged(connectionManager);
466 }
467 }
468
469
470
471
472
473
474 protected ConnectionManager newConnectionManager()
475 {
476 return new ConnectionManager(this);
477 }
478
479 @Override
480 public void onSessionClosed(WebSocketSession session)
481 {
482 if (LOG.isDebugEnabled())
483 LOG.debug("Session Closed: {}",session);
484 removeBean(session);
485 }
486
487 @Override
488 public void onSessionOpened(WebSocketSession session)
489 {
490 if (LOG.isDebugEnabled())
491 LOG.debug("Session Opened: {}",session);
492 }
493
494 public void setAsyncWriteTimeout(long ms)
495 {
496 this.policy.setAsyncWriteTimeout(ms);
497 }
498
499 public void setBindAdddress(SocketAddress bindAddress)
500 {
501 this.bindAddress = bindAddress;
502 }
503
504 public void setBufferPool(ByteBufferPool bufferPool)
505 {
506 this.bufferPool = bufferPool;
507 }
508
509
510
511
512
513
514
515 public void setConnectTimeout(long ms)
516 {
517 if (ms < 0)
518 {
519 throw new IllegalStateException("Connect Timeout cannot be negative");
520 }
521 this.connectTimeout = ms;
522 }
523
524 public void setCookieStore(CookieStore cookieStore)
525 {
526 this.cookieStore = cookieStore;
527 }
528
529 public void setDaemon(boolean daemon)
530 {
531 this.daemon = daemon;
532 }
533
534 public void setEventDriverFactory(EventDriverFactory factory)
535 {
536 this.eventDriverFactory = factory;
537 }
538
539 public void setExecutor(Executor executor)
540 {
541 updateBean(this.executor,executor);
542 this.executor = executor;
543 }
544
545 public void setMasker(Masker masker)
546 {
547 this.masker = masker;
548 }
549
550 public void setMaxBinaryMessageBufferSize(int max)
551 {
552 this.policy.setMaxBinaryMessageBufferSize(max);
553 }
554
555
556
557
558
559
560
561
562
563 public void setMaxIdleTimeout(long ms)
564 {
565 this.policy.setIdleTimeout(ms);
566 }
567
568 public void setMaxTextMessageBufferSize(int max)
569 {
570 this.policy.setMaxTextMessageBufferSize(max);
571 }
572
573 public void setSessionFactory(SessionFactory sessionFactory)
574 {
575 this.sessionFactory = sessionFactory;
576 }
577
578 @Override
579 public void dump(Appendable out, String indent) throws IOException
580 {
581 dumpThis(out);
582 dump(out, indent, getOpenSessions());
583 }
584 }