View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client;
20  
21  import java.io.IOException;
22  import java.net.CookieStore;
23  import java.net.SocketAddress;
24  import java.net.URI;
25  import java.util.Collections;
26  import java.util.HashSet;
27  import java.util.Locale;
28  import java.util.Set;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.Future;
31  
32  import org.eclipse.jetty.io.ByteBufferPool;
33  import org.eclipse.jetty.io.MappedByteBufferPool;
34  import org.eclipse.jetty.io.SelectorManager;
35  import org.eclipse.jetty.util.DecoratedObjectFactory;
36  import org.eclipse.jetty.util.HttpCookieStore;
37  import org.eclipse.jetty.util.StringUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  import org.eclipse.jetty.util.ssl.SslContextFactory;
42  import org.eclipse.jetty.util.thread.QueuedThreadPool;
43  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44  import org.eclipse.jetty.util.thread.Scheduler;
45  import org.eclipse.jetty.util.thread.ShutdownThread;
46  import org.eclipse.jetty.websocket.api.Session;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
50  import org.eclipse.jetty.websocket.client.io.ConnectPromise;
51  import org.eclipse.jetty.websocket.client.io.ConnectionManager;
52  import org.eclipse.jetty.websocket.client.io.UpgradeListener;
53  import org.eclipse.jetty.websocket.client.masks.Masker;
54  import org.eclipse.jetty.websocket.client.masks.RandomMasker;
55  import org.eclipse.jetty.websocket.common.SessionFactory;
56  import org.eclipse.jetty.websocket.common.SessionListener;
57  import org.eclipse.jetty.websocket.common.WebSocketSession;
58  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
59  import org.eclipse.jetty.websocket.common.events.EventDriver;
60  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
61  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
62  import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
63  
64  /**
65   * WebSocketClient provides a means of establishing connections to remote websocket endpoints.
66   */
67  public class WebSocketClient extends ContainerLifeCycle implements SessionListener, WebSocketContainerScope
68  {
69      private static final Logger LOG = Log.getLogger(WebSocketClient.class);
70  
71      private final WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
72      private final SslContextFactory sslContextFactory;
73      private final WebSocketExtensionFactory extensionRegistry;
74      private boolean daemon = false;
75      private EventDriverFactory eventDriverFactory;
76      private SessionFactory sessionFactory;
77      private ByteBufferPool bufferPool;
78      private Executor executor;
79      private DecoratedObjectFactory objectFactory;
80      private Scheduler scheduler;
81      private CookieStore cookieStore;
82      private ConnectionManager connectionManager;
83      private Masker masker;
84      private SocketAddress bindAddress;
85      private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
86      private boolean dispatchIO = true;
87  
88      public WebSocketClient()
89      {
90          this((SslContextFactory)null,null);
91      }
92      
93      public WebSocketClient(Executor executor)
94      {
95          this(null,executor);
96      }
97      
98      public WebSocketClient(ByteBufferPool bufferPool)
99      {
100         this(null,null,bufferPool);
101     }
102 
103     public WebSocketClient(SslContextFactory sslContextFactory)
104     {
105         this(sslContextFactory,null);
106     }
107 
108     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
109     {
110         this(sslContextFactory,executor,new MappedByteBufferPool());
111     }
112     
113     public WebSocketClient(WebSocketContainerScope scope)
114     {
115         this(scope.getSslContextFactory(), scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
116     }
117     
118     public WebSocketClient(WebSocketContainerScope scope, SslContextFactory sslContextFactory)
119     {
120         this(sslContextFactory, scope.getExecutor(), scope.getBufferPool(), scope.getObjectFactory());
121     }
122 
123     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
124     {
125         this(sslContextFactory, executor, bufferPool, new DecoratedObjectFactory());
126     }
127 
128     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool, DecoratedObjectFactory objectFactory)
129     {
130         this.executor = executor;
131         this.sslContextFactory = sslContextFactory;
132         this.bufferPool = bufferPool;
133         this.objectFactory = objectFactory;
134         this.extensionRegistry = new WebSocketExtensionFactory(this);
135         
136         this.masker = new RandomMasker();
137         this.eventDriverFactory = new EventDriverFactory(policy);
138         
139         addBean(this.executor);
140         addBean(this.sslContextFactory);
141         addBean(this.bufferPool);
142     }
143     
144     public Future<Session> connect(Object websocket, URI toUri) throws IOException
145     {
146         ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
147         request.setRequestURI(toUri);
148         request.setCookiesFrom(this.cookieStore);
149 
150         return connect(websocket,toUri,request);
151     }
152 
153     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
154     {
155         return connect(websocket,toUri,request,null);
156     }
157 
158     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
159     {
160         if (!isStarted())
161         {
162             throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
163         }
164 
165         // Validate websocket URI
166         if (!toUri.isAbsolute())
167         {
168             throw new IllegalArgumentException("WebSocket URI must be absolute");
169         }
170 
171         if (StringUtil.isBlank(toUri.getScheme()))
172         {
173             throw new IllegalArgumentException("WebSocket URI must include a scheme");
174         }
175 
176         String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
177         if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
178         {
179             throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
180         }
181 
182         request.setRequestURI(toUri);
183         request.setCookiesFrom(this.cookieStore);
184 
185         // Validate Requested Extensions
186         for (ExtensionConfig reqExt : request.getExtensions())
187         {
188             if (!extensionRegistry.isAvailable(reqExt.getName()))
189             {
190                 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
191             }
192         }
193 
194         if (LOG.isDebugEnabled())
195             LOG.debug("connect websocket {} to {}",websocket,toUri);
196 
197         // Grab Connection Manager
198         initializeClient();
199         ConnectionManager manager = getConnectionManager();
200 
201         // Setup Driver for user provided websocket
202         EventDriver driver = null;
203         if (websocket instanceof EventDriver)
204         {
205             // Use the EventDriver as-is
206             driver = (EventDriver)websocket;
207         }
208         else
209         {
210             // Wrap websocket with appropriate EventDriver
211             driver = eventDriverFactory.wrap(websocket);
212         }
213 
214         if (driver == null)
215         {
216             throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
217         }
218 
219         // Create the appropriate (physical vs virtual) connection task
220         ConnectPromise promise = manager.connect(this,driver,request);
221 
222         if (upgradeListener != null)
223         {
224             promise.setUpgradeListener(upgradeListener);
225         }
226 
227         if (LOG.isDebugEnabled())
228             LOG.debug("Connect Promise: {}",promise);
229 
230         // Execute the connection on the executor thread
231         executor.execute(promise);
232 
233         // Return the future
234         return promise;
235     }
236 
237     @Override
238     protected void doStart() throws Exception
239     {
240         if (LOG.isDebugEnabled())
241             LOG.debug("Starting {}",this);
242 
243         if (sslContextFactory != null)
244         {
245             addBean(sslContextFactory);
246         }
247 
248         String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
249 
250         if (bufferPool == null)
251         {
252             bufferPool = new MappedByteBufferPool();
253         }
254         addBean(bufferPool);
255 
256         if (scheduler == null)
257         {
258             scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
259         }
260         addBean(scheduler);
261 
262         if (cookieStore == null)
263         {
264             cookieStore = new HttpCookieStore.Empty();
265         }
266 
267         if(this.sessionFactory == null)
268         {
269             this.sessionFactory = new WebSocketSessionFactory(this);
270         }
271         
272         if(this.objectFactory == null)
273         {
274             this.objectFactory = new DecoratedObjectFactory();
275         }
276 
277         super.doStart();
278         
279         if (LOG.isDebugEnabled())
280             LOG.debug("Started {}",this);
281     }
282 
283     @Override
284     protected void doStop() throws Exception
285     {
286         if (LOG.isDebugEnabled())
287             LOG.debug("Stopping {}",this);
288         
289         if (ShutdownThread.isRegistered(this))
290         {
291             ShutdownThread.deregister(this);
292         }
293 
294         if (cookieStore != null)
295         {
296             cookieStore.removeAll();
297             cookieStore = null;
298         }
299 
300         super.doStop();
301         
302         if (LOG.isDebugEnabled())
303             LOG.debug("Stopped {}",this);
304     }
305 
306     public boolean isDispatchIO()
307     {
308         return dispatchIO;
309     }
310 
311     /**
312      * Return the number of milliseconds for a timeout of an attempted write operation.
313      * 
314      * @return number of milliseconds for timeout of an attempted write operation
315      */
316     public long getAsyncWriteTimeout()
317     {
318         return this.policy.getAsyncWriteTimeout();
319     }
320 
321     public SocketAddress getBindAddress()
322     {
323         return bindAddress;
324     }
325 
326     public ByteBufferPool getBufferPool()
327     {
328         return bufferPool;
329     }
330 
331     public ConnectionManager getConnectionManager()
332     {
333         return connectionManager;
334     }
335 
336     public long getConnectTimeout()
337     {
338         return connectTimeout;
339     }
340 
341     public CookieStore getCookieStore()
342     {
343         return cookieStore;
344     }
345     
346     public EventDriverFactory getEventDriverFactory()
347     {
348         return eventDriverFactory;
349     }
350 
351     public Executor getExecutor()
352     {
353         return executor;
354     }
355 
356     public ExtensionFactory getExtensionFactory()
357     {
358         return extensionRegistry;
359     }
360 
361     public Masker getMasker()
362     {
363         return masker;
364     }
365 
366     /**
367      * Get the maximum size for buffering of a binary message.
368      * 
369      * @return the maximum size of a binary message buffer.
370      */
371     public int getMaxBinaryMessageBufferSize()
372     {
373         return this.policy.getMaxBinaryMessageBufferSize();
374     }
375 
376     /**
377      * Get the maximum size for a binary message.
378      * 
379      * @return the maximum size of a binary message.
380      */
381     public long getMaxBinaryMessageSize()
382     {
383         return this.policy.getMaxBinaryMessageSize();
384     }
385 
386     /**
387      * Get the max idle timeout for new connections.
388      * 
389      * @return the max idle timeout in milliseconds for new connections.
390      */
391     public long getMaxIdleTimeout()
392     {
393         return this.policy.getIdleTimeout();
394     }
395 
396     /**
397      * Get the maximum size for buffering of a text message.
398      * 
399      * @return the maximum size of a text message buffer.
400      */
401     public int getMaxTextMessageBufferSize()
402     {
403         return this.policy.getMaxTextMessageBufferSize();
404     }
405 
406     /**
407      * Get the maximum size for a text message.
408      * 
409      * @return the maximum size of a text message.
410      */
411     public long getMaxTextMessageSize()
412     {
413         return this.policy.getMaxTextMessageSize();
414     }
415 
416     @Override
417     public DecoratedObjectFactory getObjectFactory()
418     {
419         return this.objectFactory;
420     }
421 
422     public Set<WebSocketSession> getOpenSessions()
423     {
424         return Collections.unmodifiableSet(new HashSet<>(getBeans(WebSocketSession.class)));
425     }
426 
427     public WebSocketPolicy getPolicy()
428     {
429         return this.policy;
430     }
431 
432     public Scheduler getScheduler()
433     {
434         return scheduler;
435     }
436     
437     public SessionFactory getSessionFactory()
438     {
439         return sessionFactory;
440     }
441 
442     /**
443      * @return the {@link SslContextFactory} that manages TLS encryption
444      * @see #WebSocketClient(SslContextFactory)
445      */
446     public SslContextFactory getSslContextFactory()
447     {
448         return sslContextFactory;
449     }
450 
451     private synchronized void initializeClient() throws IOException
452     {
453         if (!ShutdownThread.isRegistered(this))
454         {
455             ShutdownThread.register(this);
456         }
457 
458         if (executor == null)
459         {
460             QueuedThreadPool threadPool = new QueuedThreadPool();
461             String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
462             threadPool.setName(name);
463             threadPool.setDaemon(daemon);
464             executor = threadPool;
465             addManaged(threadPool);
466         }
467         else
468         {
469             addBean(executor,false);
470         }
471 
472         if (connectionManager == null)
473         {
474             connectionManager = newConnectionManager();
475             addManaged(connectionManager);
476         }
477     }
478 
479     /**
480      * Factory method for new ConnectionManager (used by other projects like cometd)
481      * 
482      * @return the ConnectionManager instance to use
483      */
484     protected ConnectionManager newConnectionManager()
485     {
486         return new ConnectionManager(this);
487     }
488 
489     @Override
490     public void onSessionClosed(WebSocketSession session)
491     {
492         if (LOG.isDebugEnabled())
493             LOG.debug("Session Closed: {}",session);
494         removeBean(session);
495     }
496 
497     @Override
498     public void onSessionOpened(WebSocketSession session)
499     {
500         if (LOG.isDebugEnabled())
501             LOG.debug("Session Opened: {}",session);
502         addManaged(session);
503     }
504     
505     public void setAsyncWriteTimeout(long ms)
506     {
507         this.policy.setAsyncWriteTimeout(ms);
508     }
509 
510     /**
511      * @param bindAddress the address to bind to
512      * @deprecated use {@link #setBindAddress(SocketAddress)} instead
513      */
514     @Deprecated
515     public void setBindAdddress(SocketAddress bindAddress)
516     {
517         setBindAddress(bindAddress);
518     }
519 
520     public void setBindAddress(SocketAddress bindAddress)
521     {
522         this.bindAddress = bindAddress;
523     }
524 
525     public void setBufferPool(ByteBufferPool bufferPool)
526     {
527         this.bufferPool = bufferPool;
528     }
529 
530     /**
531      * Set the timeout for connecting to the remote server.
532      * 
533      * @param ms
534      *            the timeout in milliseconds
535      */
536     public void setConnectTimeout(long ms)
537     {
538         if (ms < 0)
539         {
540             throw new IllegalStateException("Connect Timeout cannot be negative");
541         }
542         this.connectTimeout = ms;
543     }
544 
545     public void setCookieStore(CookieStore cookieStore)
546     {
547         this.cookieStore = cookieStore;
548     }
549     
550     public void setDaemon(boolean daemon)
551     {
552         this.daemon = daemon;
553     }
554 
555     public void setDispatchIO(boolean dispatchIO)
556     {
557         this.dispatchIO = dispatchIO;
558     }
559 
560     public void setEventDriverFactory(EventDriverFactory factory)
561     {
562         this.eventDriverFactory = factory;
563     }
564 
565     public void setExecutor(Executor executor)
566     {
567         updateBean(this.executor,executor);
568         this.executor = executor;
569     }
570 
571     public void setMasker(Masker masker)
572     {
573         this.masker = masker;
574     }
575 
576     public void setMaxBinaryMessageBufferSize(int max)
577     {
578         this.policy.setMaxBinaryMessageBufferSize(max);
579     }
580     
581     /**
582      * Set the max idle timeout for new connections.
583      * <p>
584      * Existing connections will not have their max idle timeout adjusted.
585      * 
586      * @param ms
587      *            the timeout in milliseconds
588      */
589     public void setMaxIdleTimeout(long ms)
590     {
591         this.policy.setIdleTimeout(ms);
592     }
593 
594     public void setMaxTextMessageBufferSize(int max)
595     {
596         this.policy.setMaxTextMessageBufferSize(max);
597     }
598 
599     public void setSessionFactory(SessionFactory sessionFactory)
600     {
601         this.sessionFactory = sessionFactory;
602     }
603 
604     @Override
605     public void dump(Appendable out, String indent) throws IOException
606     {
607         dumpThis(out);
608         dump(out, indent, getOpenSessions());
609     }
610 }