1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client;
20
21 import java.io.IOException;
22 import java.net.CookieStore;
23 import java.net.SocketAddress;
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Locale;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.Future;
30
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.io.MappedByteBufferPool;
33 import org.eclipse.jetty.io.SelectorManager;
34 import org.eclipse.jetty.util.HttpCookieStore;
35 import org.eclipse.jetty.util.StringUtil;
36 import org.eclipse.jetty.util.component.ContainerLifeCycle;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39 import org.eclipse.jetty.util.ssl.SslContextFactory;
40 import org.eclipse.jetty.util.thread.QueuedThreadPool;
41 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
42 import org.eclipse.jetty.util.thread.Scheduler;
43 import org.eclipse.jetty.websocket.api.Session;
44 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
45 import org.eclipse.jetty.websocket.api.extensions.Extension;
46 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
47 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
48 import org.eclipse.jetty.websocket.client.io.ConnectPromise;
49 import org.eclipse.jetty.websocket.client.io.ConnectionManager;
50 import org.eclipse.jetty.websocket.client.io.UpgradeListener;
51 import org.eclipse.jetty.websocket.client.masks.Masker;
52 import org.eclipse.jetty.websocket.client.masks.RandomMasker;
53 import org.eclipse.jetty.websocket.common.SessionFactory;
54 import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
55 import org.eclipse.jetty.websocket.common.events.EventDriver;
56 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
57 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
58
59
60
61
62 public class WebSocketClient extends ContainerLifeCycle
63 {
64 private static final Logger LOG = Log.getLogger(WebSocketClient.class);
65
66 private final WebSocketPolicy policy;
67 private final SslContextFactory sslContextFactory;
68 private final WebSocketExtensionFactory extensionRegistry;
69 private EventDriverFactory eventDriverFactory;
70 private SessionFactory sessionFactory;
71 private ByteBufferPool bufferPool;
72 private Executor executor;
73 private Scheduler scheduler;
74 private CookieStore cookieStore;
75 private ConnectionManager connectionManager;
76 private Masker masker;
77 private SocketAddress bindAddress;
78 private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
79
80 public WebSocketClient()
81 {
82 this(null,null);
83 }
84
85 public WebSocketClient(Executor executor)
86 {
87 this(null,executor);
88 }
89
90 public WebSocketClient(SslContextFactory sslContextFactory)
91 {
92 this(sslContextFactory,null);
93 }
94
95 public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
96 {
97 this.executor = executor;
98 this.sslContextFactory = sslContextFactory;
99 this.policy = WebSocketPolicy.newClientPolicy();
100 this.bufferPool = new MappedByteBufferPool();
101 this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
102 this.masker = new RandomMasker();
103 this.eventDriverFactory = new EventDriverFactory(policy);
104 this.sessionFactory = new WebSocketSessionFactory();
105 }
106
107 public Future<Session> connect(Object websocket, URI toUri) throws IOException
108 {
109 ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
110 request.setRequestURI(toUri);
111 request.setCookiesFrom(this.cookieStore);
112
113 return connect(websocket,toUri,request);
114 }
115
116 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
117 {
118 return connect(websocket,toUri,request,null);
119 }
120
121 public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
122 {
123 if (!isStarted())
124 {
125 throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
126 }
127
128
129 if (!toUri.isAbsolute())
130 {
131 throw new IllegalArgumentException("WebSocket URI must be absolute");
132 }
133
134 if (StringUtil.isBlank(toUri.getScheme()))
135 {
136 throw new IllegalArgumentException("WebSocket URI must include a scheme");
137 }
138
139 String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
140 if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
141 {
142 throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
143 }
144
145 request.setRequestURI(toUri);
146 request.setCookiesFrom(this.cookieStore);
147
148
149 for (ExtensionConfig reqExt : request.getExtensions())
150 {
151 if (!extensionRegistry.isAvailable(reqExt.getName()))
152 {
153 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
154 }
155 }
156
157
158 LOG.debug("connect websocket {} to {}",websocket,toUri);
159
160
161 initialiseClient();
162 ConnectionManager manager = getConnectionManager();
163
164
165 EventDriver driver = null;
166 if (websocket instanceof EventDriver)
167 {
168
169 driver = (EventDriver)websocket;
170 }
171 else
172 {
173
174 driver = eventDriverFactory.wrap(websocket);
175 }
176
177 if (driver == null)
178 {
179 throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
180 }
181
182
183 ConnectPromise promise = manager.connect(this,driver,request);
184
185 if (upgradeListener != null)
186 {
187 promise.setUpgradeListener(upgradeListener);
188 }
189
190 LOG.debug("Connect Promise: {}",promise);
191
192
193 executor.execute(promise);
194
195
196 return promise;
197 }
198
199 private synchronized void initialiseClient() throws IOException
200 {
201 if (executor == null)
202 {
203 QueuedThreadPool threadPool = new QueuedThreadPool();
204 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
205 threadPool.setName(name);
206 executor = threadPool;
207 addBean(executor,true);
208 }
209 else
210 {
211 addBean(executor,false);
212 }
213
214 if (connectionManager != null)
215 {
216 return;
217 }
218 try
219 {
220 connectionManager = newConnectionManager();
221 addBean(connectionManager);
222 connectionManager.start();
223 }
224 catch (IOException e)
225 {
226 throw e;
227 }
228 catch (Exception e)
229 {
230 throw new IOException(e);
231 }
232 }
233
234 @Override
235 protected void doStart() throws Exception
236 {
237 LOG.debug("Starting {}",this);
238
239 if (sslContextFactory != null)
240 {
241 addBean(sslContextFactory);
242 }
243
244 String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
245
246 if (bufferPool == null)
247 {
248 bufferPool = new MappedByteBufferPool();
249 }
250 addBean(bufferPool);
251
252 if (scheduler == null)
253 {
254 scheduler = new ScheduledExecutorScheduler(name + "-scheduler",false);
255 }
256 addBean(scheduler);
257
258 if (cookieStore == null)
259 {
260 cookieStore = new HttpCookieStore.Empty();
261 }
262
263 super.doStart();
264
265 LOG.debug("Started {}",this);
266 }
267
268 @Override
269 protected void doStop() throws Exception
270 {
271 LOG.debug("Stopping {}",this);
272
273 if (cookieStore != null)
274 {
275 cookieStore.removeAll();
276 cookieStore = null;
277 }
278
279 super.doStop();
280 LOG.info("Stopped {}",this);
281 }
282
283
284
285
286
287
288 public long getAsyncWriteTimeout()
289 {
290 return this.policy.getAsyncWriteTimeout();
291 }
292
293 public SocketAddress getBindAddress()
294 {
295 return bindAddress;
296 }
297
298 public ByteBufferPool getBufferPool()
299 {
300 return bufferPool;
301 }
302
303 public ConnectionManager getConnectionManager()
304 {
305 return connectionManager;
306 }
307
308 public long getConnectTimeout()
309 {
310 return connectTimeout;
311 }
312
313 public CookieStore getCookieStore()
314 {
315 return cookieStore;
316 }
317
318 public EventDriverFactory getEventDriverFactory()
319 {
320 return eventDriverFactory;
321 }
322
323 public Executor getExecutor()
324 {
325 return executor;
326 }
327
328 public ExtensionFactory getExtensionFactory()
329 {
330 return extensionRegistry;
331 }
332
333 public Masker getMasker()
334 {
335 return masker;
336 }
337
338
339
340
341
342
343 public int getMaxBinaryMessageBufferSize()
344 {
345 return this.policy.getMaxBinaryMessageBufferSize();
346 }
347
348
349
350
351
352
353 public long getMaxBinaryMessageSize()
354 {
355 return this.policy.getMaxBinaryMessageSize();
356 }
357
358
359
360
361
362
363 public long getMaxIdleTimeout()
364 {
365 return this.policy.getIdleTimeout();
366 }
367
368
369
370
371
372
373 public int getMaxTextMessageBufferSize()
374 {
375 return this.policy.getMaxTextMessageBufferSize();
376 }
377
378
379
380
381
382
383 public long getMaxTextMessageSize()
384 {
385 return this.policy.getMaxTextMessageSize();
386 }
387
388 public WebSocketPolicy getPolicy()
389 {
390 return this.policy;
391 }
392
393 public Scheduler getScheduler()
394 {
395 return scheduler;
396 }
397
398 public SessionFactory getSessionFactory()
399 {
400 return sessionFactory;
401 }
402
403
404
405
406
407 public SslContextFactory getSslContextFactory()
408 {
409 return sslContextFactory;
410 }
411
412 public List<Extension> initExtensions(List<ExtensionConfig> requested)
413 {
414 List<Extension> extensions = new ArrayList<Extension>();
415
416 for (ExtensionConfig cfg : requested)
417 {
418 Extension extension = extensionRegistry.newInstance(cfg);
419
420 if (extension == null)
421 {
422 continue;
423 }
424
425 LOG.debug("added {}",extension);
426 extensions.add(extension);
427 }
428 LOG.debug("extensions={}",extensions);
429 return extensions;
430 }
431
432
433
434
435
436
437 protected ConnectionManager newConnectionManager()
438 {
439 return new ConnectionManager(this);
440 }
441
442 public void setAsyncWriteTimeout(long ms)
443 {
444 this.policy.setAsyncWriteTimeout(ms);
445 }
446
447 public void setBindAdddress(SocketAddress bindAddress)
448 {
449 this.bindAddress = bindAddress;
450 }
451
452 public void setBufferPool(ByteBufferPool bufferPool)
453 {
454 this.bufferPool = bufferPool;
455 }
456
457
458
459
460
461
462
463 public void setConnectTimeout(long ms)
464 {
465 if (ms < 0)
466 {
467 throw new IllegalStateException("Connect Timeout cannot be negative");
468 }
469 this.connectTimeout = ms;
470 }
471
472 public void setCookieStore(CookieStore cookieStore)
473 {
474 this.cookieStore = cookieStore;
475 }
476
477 public void setEventDriverFactory(EventDriverFactory factory)
478 {
479 this.eventDriverFactory = factory;
480 }
481
482 public void setExecutor(Executor executor)
483 {
484 updateBean(this.executor,executor);
485 this.executor = executor;
486 }
487
488 public void setMasker(Masker masker)
489 {
490 this.masker = masker;
491 }
492
493 public void setMaxBinaryMessageBufferSize(int max)
494 {
495 this.policy.setMaxBinaryMessageBufferSize(max);
496 }
497
498
499
500
501
502
503
504
505
506 public void setMaxIdleTimeout(long ms)
507 {
508 this.policy.setIdleTimeout(ms);
509 }
510
511 public void setMaxTextMessageBufferSize(int max)
512 {
513 this.policy.setMaxTextMessageBufferSize(max);
514 }
515
516 public void setSessionFactory(SessionFactory sessionFactory)
517 {
518 this.sessionFactory = sessionFactory;
519 }
520 }