View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client;
20  
21  import java.io.IOException;
22  import java.net.CookieStore;
23  import java.net.SocketAddress;
24  import java.net.URI;
25  import java.util.Collections;
26  import java.util.HashSet;
27  import java.util.Locale;
28  import java.util.Set;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.Future;
31  
32  import org.eclipse.jetty.io.ByteBufferPool;
33  import org.eclipse.jetty.io.MappedByteBufferPool;
34  import org.eclipse.jetty.io.SelectorManager;
35  import org.eclipse.jetty.util.DecoratedObjectFactory;
36  import org.eclipse.jetty.util.HttpCookieStore;
37  import org.eclipse.jetty.util.StringUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  import org.eclipse.jetty.util.ssl.SslContextFactory;
42  import org.eclipse.jetty.util.thread.QueuedThreadPool;
43  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44  import org.eclipse.jetty.util.thread.Scheduler;
45  import org.eclipse.jetty.util.thread.ShutdownThread;
46  import org.eclipse.jetty.websocket.api.Session;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
50  import org.eclipse.jetty.websocket.client.io.ConnectPromise;
51  import org.eclipse.jetty.websocket.client.io.ConnectionManager;
52  import org.eclipse.jetty.websocket.client.io.UpgradeListener;
53  import org.eclipse.jetty.websocket.client.masks.Masker;
54  import org.eclipse.jetty.websocket.client.masks.RandomMasker;
55  import org.eclipse.jetty.websocket.common.SessionFactory;
56  import org.eclipse.jetty.websocket.common.WebSocketSession;
57  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
58  import org.eclipse.jetty.websocket.common.events.EventDriver;
59  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
60  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
61  import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
62  
63  /**
64   * WebSocketClient provides a means of establishing connections to remote websocket endpoints.
65   */
66  public class WebSocketClient extends ContainerLifeCycle implements WebSocketContainerScope
67  {
68      private static final Logger LOG = Log.getLogger(WebSocketClient.class);
69  
70      private final WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
71      private final SslContextFactory sslContextFactory;
72      private final WebSocketExtensionFactory extensionRegistry;
73      private boolean daemon = false;
74      private EventDriverFactory eventDriverFactory;
75      private SessionFactory sessionFactory;
76      private ByteBufferPool bufferPool;
77      private Executor executor;
78      private DecoratedObjectFactory objectFactory;
79      private Scheduler scheduler;
80      private CookieStore cookieStore;
81      private ConnectionManager connectionManager;
82      private Masker masker;
83      private SocketAddress bindAddress;
84      private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
85      private boolean dispatchIO = true;
86  
87      public WebSocketClient()
88      {
89          this((SslContextFactory)null,null);
90      }
91      
92      public WebSocketClient(Executor executor)
93      {
94          this(null,executor);
95      }
96      
97      public WebSocketClient(ByteBufferPool bufferPool)
98      {
99          this(null,null,bufferPool);
100     }
101 
102     public WebSocketClient(SslContextFactory sslContextFactory)
103     {
104         this(sslContextFactory,null);
105     }
106 
107     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
108     {
109         this(sslContextFactory,executor,new MappedByteBufferPool());
110     }
111     
112     public WebSocketClient(WebSocketContainerScope scope)
113     {
114         this(scope.getSslContextFactory(), scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
115     }
116     
117     public WebSocketClient(WebSocketContainerScope scope, SslContextFactory sslContextFactory)
118     {
119         this(sslContextFactory, scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
120     }
121 
122     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
123     {
124         this(sslContextFactory, executor, bufferPool, new DecoratedObjectFactory());
125     }
126 
127     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
128     {
129         this.executor = executor;
130         this.sslContextFactory = sslContextFactory;
131         this.bufferPool = bufferPool;
132         this.objectFactory = objectFactory;
133         this.extensionRegistry = new WebSocketExtensionFactory(this);
134         
135         this.masker = new RandomMasker();
136         this.eventDriverFactory = new EventDriverFactory(policy);
137         
138         addBean(this.executor);
139         addBean(this.sslContextFactory);
140         addBean(this.bufferPool);
141     }
142     
143     public Future<Session> connect(Object websocket, URI toUri) throws IOException
144     {
145         ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
146         request.setRequestURI(toUri);
147         request.setCookiesFrom(this.cookieStore);
148 
149         return connect(websocket,toUri,request);
150     }
151 
152     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
153     {
154         return connect(websocket,toUri,request,null);
155     }
156 
157     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
158     {
159         if (!isStarted())
160         {
161             throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
162         }
163 
164         // Validate websocket URI
165         if (!toUri.isAbsolute())
166         {
167             throw new IllegalArgumentException("WebSocket URI must be absolute");
168         }
169 
170         if (StringUtil.isBlank(toUri.getScheme()))
171         {
172             throw new IllegalArgumentException("WebSocket URI must include a scheme");
173         }
174 
175         String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
176         if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
177         {
178             throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
179         }
180 
181         request.setRequestURI(toUri);
182         request.setCookiesFrom(this.cookieStore);
183 
184         // Validate Requested Extensions
185         for (ExtensionConfig reqExt : request.getExtensions())
186         {
187             if (!extensionRegistry.isAvailable(reqExt.getName()))
188             {
189                 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
190             }
191         }
192 
193         if (LOG.isDebugEnabled())
194             LOG.debug("connect websocket {} to {}",websocket,toUri);
195 
196         // Grab Connection Manager
197         initializeClient();
198         ConnectionManager manager = getConnectionManager();
199 
200         // Setup Driver for user provided websocket
201         EventDriver driver = null;
202         if (websocket instanceof EventDriver)
203         {
204             // Use the EventDriver as-is
205             driver = (EventDriver)websocket;
206         }
207         else
208         {
209             // Wrap websocket with appropriate EventDriver
210             driver = eventDriverFactory.wrap(websocket);
211         }
212 
213         if (driver == null)
214         {
215             throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
216         }
217 
218         // Create the appropriate (physical vs virtual) connection task
219         ConnectPromise promise = manager.connect(this,driver,request);
220 
221         if (upgradeListener != null)
222         {
223             promise.setUpgradeListener(upgradeListener);
224         }
225 
226         if (LOG.isDebugEnabled())
227             LOG.debug("Connect Promise: {}",promise);
228 
229         // Execute the connection on the executor thread
230         executor.execute(promise);
231 
232         // Return the future
233         return promise;
234     }
235 
236     @Override
237     protected void doStart() throws Exception
238     {
239         if (LOG.isDebugEnabled())
240             LOG.debug("Starting {}",this);
241 
242         if (sslContextFactory != null)
243         {
244             addBean(sslContextFactory);
245         }
246 
247         String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
248 
249         if (bufferPool == null)
250         {
251             bufferPool = new MappedByteBufferPool();
252         }
253         addBean(bufferPool);
254 
255         if (scheduler == null)
256         {
257             scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
258         }
259         addBean(scheduler);
260 
261         if (cookieStore == null)
262         {
263             cookieStore = new HttpCookieStore.Empty();
264         }
265 
266         if(this.sessionFactory == null)
267         {
268             this.sessionFactory = new WebSocketSessionFactory(this);
269         }
270         
271         if(this.objectFactory == null)
272         {
273             this.objectFactory = new DecoratedObjectFactory();
274         }
275 
276         super.doStart();
277         
278         if (LOG.isDebugEnabled())
279             LOG.debug("Started {}",this);
280     }
281 
282     @Override
283     protected void doStop() throws Exception
284     {
285         if (LOG.isDebugEnabled())
286             LOG.debug("Stopping {}",this);
287         
288         if (ShutdownThread.isRegistered(this))
289         {
290             ShutdownThread.deregister(this);
291         }
292 
293         if (cookieStore != null)
294         {
295             cookieStore.removeAll();
296             cookieStore = null;
297         }
298 
299         super.doStop();
300         
301         if (LOG.isDebugEnabled())
302             LOG.debug("Stopped {}",this);
303     }
304 
305     public boolean isDispatchIO()
306     {
307         return dispatchIO;
308     }
309 
310     /**
311      * Return the number of milliseconds for a timeout of an attempted write operation.
312      * 
313      * @return number of milliseconds for timeout of an attempted write operation
314      */
315     public long getAsyncWriteTimeout()
316     {
317         return this.policy.getAsyncWriteTimeout();
318     }
319 
320     public SocketAddress getBindAddress()
321     {
322         return bindAddress;
323     }
324 
325     public ByteBufferPool getBufferPool()
326     {
327         return bufferPool;
328     }
329 
330     public ConnectionManager getConnectionManager()
331     {
332         return connectionManager;
333     }
334 
335     public long getConnectTimeout()
336     {
337         return connectTimeout;
338     }
339 
340     public CookieStore getCookieStore()
341     {
342         return cookieStore;
343     }
344     
345     public EventDriverFactory getEventDriverFactory()
346     {
347         return eventDriverFactory;
348     }
349 
350     public Executor getExecutor()
351     {
352         return executor;
353     }
354 
355     public ExtensionFactory getExtensionFactory()
356     {
357         return extensionRegistry;
358     }
359 
360     public Masker getMasker()
361     {
362         return masker;
363     }
364 
365     /**
366      * Get the maximum size for buffering of a binary message.
367      * 
368      * @return the maximum size of a binary message buffer.
369      */
370     public int getMaxBinaryMessageBufferSize()
371     {
372         return this.policy.getMaxBinaryMessageBufferSize();
373     }
374 
375     /**
376      * Get the maximum size for a binary message.
377      * 
378      * @return the maximum size of a binary message.
379      */
380     public long getMaxBinaryMessageSize()
381     {
382         return this.policy.getMaxBinaryMessageSize();
383     }
384 
385     /**
386      * Get the max idle timeout for new connections.
387      * 
388      * @return the max idle timeout in milliseconds for new connections.
389      */
390     public long getMaxIdleTimeout()
391     {
392         return this.policy.getIdleTimeout();
393     }
394 
395     /**
396      * Get the maximum size for buffering of a text message.
397      * 
398      * @return the maximum size of a text message buffer.
399      */
400     public int getMaxTextMessageBufferSize()
401     {
402         return this.policy.getMaxTextMessageBufferSize();
403     }
404 
405     /**
406      * Get the maximum size for a text message.
407      * 
408      * @return the maximum size of a text message.
409      */
410     public long getMaxTextMessageSize()
411     {
412         return this.policy.getMaxTextMessageSize();
413     }
414 
415     @Override
416     public DecoratedObjectFactory getObjectFactory()
417     {
418         return this.objectFactory;
419     }
420 
421     public Set<WebSocketSession> getOpenSessions()
422     {
423         return Collections.unmodifiableSet(new HashSet<>(getBeans(WebSocketSession.class)));
424     }
425 
426     public WebSocketPolicy getPolicy()
427     {
428         return this.policy;
429     }
430 
431     public Scheduler getScheduler()
432     {
433         return scheduler;
434     }
435     
436     public SessionFactory getSessionFactory()
437     {
438         return sessionFactory;
439     }
440 
441     /**
442      * @return the {@link SslContextFactory} that manages TLS encryption
443      * @see #WebSocketClient(SslContextFactory)
444      */
445     public SslContextFactory getSslContextFactory()
446     {
447         return sslContextFactory;
448     }
449 
450     private synchronized void initializeClient() throws IOException
451     {
452         if (!ShutdownThread.isRegistered(this))
453         {
454             ShutdownThread.register(this);
455         }
456 
457         if (executor == null)
458         {
459             QueuedThreadPool threadPool = new QueuedThreadPool();
460             String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
461             threadPool.setName(name);
462             threadPool.setDaemon(daemon);
463             executor = threadPool;
464             addManaged(threadPool);
465         }
466         else
467         {
468             addBean(executor,false);
469         }
470 
471         if (connectionManager == null)
472         {
473             connectionManager = newConnectionManager();
474             addManaged(connectionManager);
475         }
476     }
477 
478     /**
479      * Factory method for new ConnectionManager (used by other projects like cometd)
480      * 
481      * @return the ConnectionManager instance to use
482      */
483     protected ConnectionManager newConnectionManager()
484     {
485         return new ConnectionManager(this);
486     }
487 
488     @Override
489     public void onSessionClosed(WebSocketSession session)
490     {
491         if (LOG.isDebugEnabled())
492             LOG.debug("Session Closed: {}",session);
493         removeBean(session);
494     }
495 
496     @Override
497     public void onSessionOpened(WebSocketSession session)
498     {
499         if (LOG.isDebugEnabled())
500             LOG.debug("Session Opened: {}",session);
501         addManaged(session);
502     }
503     
504     public void setAsyncWriteTimeout(long ms)
505     {
506         this.policy.setAsyncWriteTimeout(ms);
507     }
508 
509     /**
510      * @param bindAddress the address to bind to
511      * @deprecated use {@link #setBindAddress(SocketAddress)} instead
512      */
513     @Deprecated
514     public void setBindAdddress(SocketAddress bindAddress)
515     {
516         setBindAddress(bindAddress);
517     }
518 
519     public void setBindAddress(SocketAddress bindAddress)
520     {
521         this.bindAddress = bindAddress;
522     }
523 
524     public void setBufferPool(ByteBufferPool bufferPool)
525     {
526         this.bufferPool = bufferPool;
527     }
528 
529     /**
530      * Set the timeout for connecting to the remote server.
531      * 
532      * @param ms
533      *            the timeout in milliseconds
534      */
535     public void setConnectTimeout(long ms)
536     {
537         if (ms < 0)
538         {
539             throw new IllegalStateException("Connect Timeout cannot be negative");
540         }
541         this.connectTimeout = ms;
542     }
543 
544     public void setCookieStore(CookieStore cookieStore)
545     {
546         this.cookieStore = cookieStore;
547     }
548     
549     public void setDaemon(boolean daemon)
550     {
551         this.daemon = daemon;
552     }
553 
554     public void setDispatchIO(boolean dispatchIO)
555     {
556         this.dispatchIO = dispatchIO;
557     }
558 
559     public void setEventDriverFactory(EventDriverFactory factory)
560     {
561         this.eventDriverFactory = factory;
562     }
563 
564     public void setExecutor(Executor executor)
565     {
566         updateBean(this.executor,executor);
567         this.executor = executor;
568     }
569 
570     public void setMasker(Masker masker)
571     {
572         this.masker = masker;
573     }
574 
575     public void setMaxBinaryMessageBufferSize(int max)
576     {
577         this.policy.setMaxBinaryMessageBufferSize(max);
578     }
579     
580     /**
581      * Set the max idle timeout for new connections.
582      * <p>
583      * Existing connections will not have their max idle timeout adjusted.
584      * 
585      * @param ms
586      *            the timeout in milliseconds
587      */
588     public void setMaxIdleTimeout(long ms)
589     {
590         this.policy.setIdleTimeout(ms);
591     }
592 
593     public void setMaxTextMessageBufferSize(int max)
594     {
595         this.policy.setMaxTextMessageBufferSize(max);
596     }
597 
598     public void setSessionFactory(SessionFactory sessionFactory)
599     {
600         this.sessionFactory = sessionFactory;
601     }
602 
603     @Override
604     public void dump(Appendable out, String indent) throws IOException
605     {
606         dumpThis(out);
607         dump(out, indent, getOpenSessions());
608     }
609 }