1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client;
20
21 import java.io.IOException;
22 import java.net.CookieStore;
23 import java.net.SocketAddress;
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Locale;
29 import java.util.Set;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Future;
32
33 import org.eclipse.jetty.io.ByteBufferPool;
34 import org.eclipse.jetty.io.MappedByteBufferPool;
35 import org.eclipse.jetty.io.SelectorManager;
36 import org.eclipse.jetty.util.HttpCookieStore;
37 import org.eclipse.jetty.util.StringUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.LifeCycle;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.ssl.SslContextFactory;
43 import org.eclipse.jetty.util.thread.QueuedThreadPool;
44 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
45 import org.eclipse.jetty.util.thread.Scheduler;
46 import org.eclipse.jetty.util.thread.ShutdownThread;
47 import org.eclipse.jetty.websocket.api.Session;
48 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49 import org.eclipse.jetty.websocket.api.extensions.Extension;
50 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
51 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
52 import org.eclipse.jetty.websocket.client.io.ConnectPromise;
53 import org.eclipse.jetty.websocket.client.io.ConnectionManager;
54 import org.eclipse.jetty.websocket.client.io.UpgradeListener;
55 import org.eclipse.jetty.websocket.client.masks.Masker;
56 import org.eclipse.jetty.websocket.client.masks.RandomMasker;
57 import org.eclipse.jetty.websocket.common.SessionFactory;
58 import org.eclipse.jetty.websocket.common.SessionListener;
59 import org.eclipse.jetty.websocket.common.WebSocketSession;
60 import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
61 import org.eclipse.jetty.websocket.common.events.EventDriver;
62 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
63 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
64
65
66
67
68 public class WebSocketClient extends ContainerLifeCycle implements SessionListener
69 {
70 private static final Logger LOG = Log.getLogger(WebSocketClient.class);
71
72 private final WebSocketPolicy policy;
73 private final SslContextFactory sslContextFactory;
74 private final WebSocketExtensionFactory extensionRegistry;
75 private boolean daemon = false;
76 private EventDriverFactory eventDriverFactory;
77 private SessionFactory sessionFactory;
78 private ByteBufferPool bufferPool;
79 private Executor executor;
80 private Scheduler scheduler;
81 private CookieStore cookieStore;
82 private ConnectionManager connectionManager;
83 private Masker masker;
84 private SocketAddress bindAddress;
85 private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
86
87 public WebSocketClient()
88 {
89 this(null,null);
90 }
91
92 public WebSocketClient(Executor executor)
93 {
94 this(null,executor);
95 }
96
97 public WebSocketClient(ByteBufferPool bufferPool)
98 {
99 this(null,null,bufferPool);
100 }
101
102 public WebSocketClient(SslContextFactory sslContextFactory)
103 {
104 this(sslContextFactory,null);
105 }
106
107 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
108 {
109 this(sslContextFactory,executor,new MappedByteBufferPool());
110 }
111
112 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
113 {
114 this.executor = executor;
115 this.sslContextFactory = sslContextFactory;
116 this.policy = WebSocketPolicy.newClientPolicy();
117 this.bufferPool = bufferPool;
118 this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
119
120
121 this.extensionRegistry.unregister("deflate-frame");
122 this.extensionRegistry.unregister("permessage-deflate");
123 this.extensionRegistry.unregister("x-webkit-deflate-frame");
124
125 this.masker = new RandomMasker();
126 this.eventDriverFactory = new EventDriverFactory(policy);
127 this.sessionFactory = new WebSocketSessionFactory(this);
128
129 addBean(this.executor);
130 addBean(this.sslContextFactory);
131 addBean(this.bufferPool);
132 }
133
134 public Future<Session> connect(Object websocket, URI toUri) throws IOException
135 {
136 ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
137 request.setRequestURI(toUri);
138 request.setCookiesFrom(this.cookieStore);
139
140 return connect(websocket,toUri,request);
141 }
142
143 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
144 {
145 return connect(websocket,toUri,request,null);
146 }
147
148 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
149 {
150 if (!isStarted())
151 {
152 throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
153 }
154
155
156 if (!toUri.isAbsolute())
157 {
158 throw new IllegalArgumentException("WebSocket URI must be absolute");
159 }
160
161 if (StringUtil.isBlank(toUri.getScheme()))
162 {
163 throw new IllegalArgumentException("WebSocket URI must include a scheme");
164 }
165
166 String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
167 if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
168 {
169 throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
170 }
171
172 request.setRequestURI(toUri);
173 request.setCookiesFrom(this.cookieStore);
174
175
176 for (ExtensionConfig reqExt : request.getExtensions())
177 {
178 if (!extensionRegistry.isAvailable(reqExt.getName()))
179 {
180 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
181 }
182 }
183
184
185 LOG.debug("connect websocket {} to {}",websocket,toUri);
186
187
188 initialiseClient();
189 ConnectionManager manager = getConnectionManager();
190
191
192 EventDriver driver = null;
193 if (websocket instanceof EventDriver)
194 {
195
196 driver = (EventDriver)websocket;
197 }
198 else
199 {
200
201 driver = eventDriverFactory.wrap(websocket);
202 }
203
204 if (driver == null)
205 {
206 throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
207 }
208
209
210 ConnectPromise promise = manager.connect(this,driver,request);
211
212 if (upgradeListener != null)
213 {
214 promise.setUpgradeListener(upgradeListener);
215 }
216
217 LOG.debug("Connect Promise: {}",promise);
218
219
220 executor.execute(promise);
221
222
223 return promise;
224 }
225
226 @Override
227 protected void doStart() throws Exception
228 {
229 LOG.debug("Starting {}",this);
230
231 if (sslContextFactory != null)
232 {
233 addBean(sslContextFactory);
234 }
235
236 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
237
238 if (bufferPool == null)
239 {
240 bufferPool = new MappedByteBufferPool();
241 }
242 addBean(bufferPool);
243
244 if (scheduler == null)
245 {
246 scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
247 }
248 addBean(scheduler);
249
250 if (cookieStore == null)
251 {
252 cookieStore = new HttpCookieStore.Empty();
253 }
254
255 super.doStart();
256
257 LOG.debug("Started {}",this);
258 }
259
260 @Override
261 protected void doStop() throws Exception
262 {
263 LOG.debug("Stopping {}",this);
264
265 if (cookieStore != null)
266 {
267 cookieStore.removeAll();
268 cookieStore = null;
269 }
270
271 super.doStop();
272 LOG.debug("Stopped {}",this);
273 }
274
275
276
277
278
279
280 public long getAsyncWriteTimeout()
281 {
282 return this.policy.getAsyncWriteTimeout();
283 }
284
285 public SocketAddress getBindAddress()
286 {
287 return bindAddress;
288 }
289
290 public ByteBufferPool getBufferPool()
291 {
292 return bufferPool;
293 }
294
295 public ConnectionManager getConnectionManager()
296 {
297 return connectionManager;
298 }
299
300 public long getConnectTimeout()
301 {
302 return connectTimeout;
303 }
304
305 public CookieStore getCookieStore()
306 {
307 return cookieStore;
308 }
309
310 public EventDriverFactory getEventDriverFactory()
311 {
312 return eventDriverFactory;
313 }
314
315 public Executor getExecutor()
316 {
317 return executor;
318 }
319
320 public ExtensionFactory getExtensionFactory()
321 {
322 return extensionRegistry;
323 }
324
325 public Masker getMasker()
326 {
327 return masker;
328 }
329
330
331
332
333
334
335 public int getMaxBinaryMessageBufferSize()
336 {
337 return this.policy.getMaxBinaryMessageBufferSize();
338 }
339
340
341
342
343
344
345 public long getMaxBinaryMessageSize()
346 {
347 return this.policy.getMaxBinaryMessageSize();
348 }
349
350
351
352
353
354
355 public long getMaxIdleTimeout()
356 {
357 return this.policy.getIdleTimeout();
358 }
359
360
361
362
363
364
365 public int getMaxTextMessageBufferSize()
366 {
367 return this.policy.getMaxTextMessageBufferSize();
368 }
369
370
371
372
373
374
375 public long getMaxTextMessageSize()
376 {
377 return this.policy.getMaxTextMessageSize();
378 }
379
380 public Set<WebSocketSession> getOpenSessions()
381 {
382 return new HashSet<>(getBeans(WebSocketSession.class));
383 }
384
385 public WebSocketPolicy getPolicy()
386 {
387 return this.policy;
388 }
389
390 public Scheduler getScheduler()
391 {
392 return scheduler;
393 }
394
395 public SessionFactory getSessionFactory()
396 {
397 return sessionFactory;
398 }
399
400
401
402
403
404 public SslContextFactory getSslContextFactory()
405 {
406 return sslContextFactory;
407 }
408
409 public List<Extension> initExtensions(List<ExtensionConfig> requested)
410 {
411 List<Extension> extensions = new ArrayList<Extension>();
412
413 for (ExtensionConfig cfg : requested)
414 {
415 Extension extension = extensionRegistry.newInstance(cfg);
416
417 if (extension == null)
418 {
419 continue;
420 }
421
422 LOG.debug("added {}",extension);
423 extensions.add(extension);
424 }
425 LOG.debug("extensions={}",extensions);
426 return extensions;
427 }
428
429 private synchronized void initialiseClient() throws IOException
430 {
431 ShutdownThread.register(this);
432
433 if (executor == null)
434 {
435 QueuedThreadPool threadPool = new QueuedThreadPool();
436 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
437 threadPool.setName(name);
438 threadPool.setDaemon(daemon);
439 executor = threadPool;
440 addManaged(threadPool);
441 }
442 else
443 {
444 addBean(executor,false);
445 }
446
447 if (connectionManager == null)
448 {
449 connectionManager = newConnectionManager();
450 addManaged(connectionManager);
451 }
452 }
453
454
455
456
457
458
459 protected ConnectionManager newConnectionManager()
460 {
461 return new ConnectionManager(this);
462 }
463
464 @Override
465 public void onSessionClosed(WebSocketSession session)
466 {
467 LOG.debug("Session Closed: {}",session);
468 removeBean(session);
469 }
470
471 @Override
472 public void onSessionOpened(WebSocketSession session)
473 {
474 LOG.debug("Session Opened: {}",session);
475 }
476
477 public void setAsyncWriteTimeout(long ms)
478 {
479 this.policy.setAsyncWriteTimeout(ms);
480 }
481
482 public void setBindAdddress(SocketAddress bindAddress)
483 {
484 this.bindAddress = bindAddress;
485 }
486
487 public void setBufferPool(ByteBufferPool bufferPool)
488 {
489 this.bufferPool = bufferPool;
490 }
491
492
493
494
495
496
497
498 public void setConnectTimeout(long ms)
499 {
500 if (ms < 0)
501 {
502 throw new IllegalStateException("Connect Timeout cannot be negative");
503 }
504 this.connectTimeout = ms;
505 }
506
507 public void setCookieStore(CookieStore cookieStore)
508 {
509 this.cookieStore = cookieStore;
510 }
511
512 public void setDaemon(boolean daemon)
513 {
514 this.daemon = daemon;
515 }
516
517 public void setEventDriverFactory(EventDriverFactory factory)
518 {
519 this.eventDriverFactory = factory;
520 }
521
522 public void setExecutor(Executor executor)
523 {
524 updateBean(this.executor,executor);
525 this.executor = executor;
526 }
527
528 public void setMasker(Masker masker)
529 {
530 this.masker = masker;
531 }
532
533 public void setMaxBinaryMessageBufferSize(int max)
534 {
535 this.policy.setMaxBinaryMessageBufferSize(max);
536 }
537
538
539
540
541
542
543
544
545
546 public void setMaxIdleTimeout(long ms)
547 {
548 this.policy.setIdleTimeout(ms);
549 }
550
551 public void setMaxTextMessageBufferSize(int max)
552 {
553 this.policy.setMaxTextMessageBufferSize(max);
554 }
555
556 public void setSessionFactory(SessionFactory sessionFactory)
557 {
558 this.sessionFactory = sessionFactory;
559 }
560
561 @Override
562 public void dump(Appendable out, String indent) throws IOException
563 {
564 dumpThis(out);
565 dump(out, indent, getOpenSessions());
566 }
567 }