View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client;
20  
21  import java.io.IOException;
22  import java.net.CookieStore;
23  import java.net.SocketAddress;
24  import java.net.URI;
25  import java.util.ArrayList;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Locale;
29  import java.util.Set;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.Future;
32  
33  import org.eclipse.jetty.io.ByteBufferPool;
34  import org.eclipse.jetty.io.MappedByteBufferPool;
35  import org.eclipse.jetty.io.SelectorManager;
36  import org.eclipse.jetty.util.HttpCookieStore;
37  import org.eclipse.jetty.util.StringUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  import org.eclipse.jetty.util.ssl.SslContextFactory;
42  import org.eclipse.jetty.util.thread.QueuedThreadPool;
43  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44  import org.eclipse.jetty.util.thread.Scheduler;
45  import org.eclipse.jetty.util.thread.ShutdownThread;
46  import org.eclipse.jetty.websocket.api.Session;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.extensions.Extension;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
51  import org.eclipse.jetty.websocket.client.io.ConnectPromise;
52  import org.eclipse.jetty.websocket.client.io.ConnectionManager;
53  import org.eclipse.jetty.websocket.client.io.UpgradeListener;
54  import org.eclipse.jetty.websocket.client.masks.Masker;
55  import org.eclipse.jetty.websocket.client.masks.RandomMasker;
56  import org.eclipse.jetty.websocket.common.SessionFactory;
57  import org.eclipse.jetty.websocket.common.SessionListener;
58  import org.eclipse.jetty.websocket.common.WebSocketSession;
59  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
60  import org.eclipse.jetty.websocket.common.events.EventDriver;
61  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
62  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
63  
64  /**
65   * WebSocketClient provides a means of establishing connections to remote websocket endpoints.
66   */
67  public class WebSocketClient extends ContainerLifeCycle implements SessionListener
68  {
69      private static final Logger LOG = Log.getLogger(WebSocketClient.class);
70  
71      private final WebSocketPolicy policy;
72      private final SslContextFactory sslContextFactory;
73      private final WebSocketExtensionFactory extensionRegistry;
74      private boolean daemon = false;
75      private EventDriverFactory eventDriverFactory;
76      private SessionFactory sessionFactory;
77      private ByteBufferPool bufferPool;
78      private Executor executor;
79      private Scheduler scheduler;
80      private CookieStore cookieStore;
81      private ConnectionManager connectionManager;
82      private Masker masker;
83      private SocketAddress bindAddress;
84      private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
85  
86      public WebSocketClient()
87      {
88          this(null,null);
89      }
90  
91      public WebSocketClient(Executor executor)
92      {
93          this(null,executor);
94      }
95      
96      public WebSocketClient(ByteBufferPool bufferPool)
97      {
98          this(null,null,bufferPool);
99      }
100 
101     public WebSocketClient(SslContextFactory sslContextFactory)
102     {
103         this(sslContextFactory,null);
104     }
105 
106     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
107     {
108         this(sslContextFactory,executor,new MappedByteBufferPool());
109     }
110     
111     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
112     {
113         this.executor = executor;
114         this.sslContextFactory = sslContextFactory;
115         this.policy = WebSocketPolicy.newClientPolicy();
116         this.bufferPool = bufferPool;
117         this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
118         
119         // Bug #431459 - unregistering compression extensions till they are more stable
120         this.extensionRegistry.unregister("deflate-frame");
121         this.extensionRegistry.unregister("permessage-deflate");
122         this.extensionRegistry.unregister("x-webkit-deflate-frame");
123         
124         this.masker = new RandomMasker();
125         this.eventDriverFactory = new EventDriverFactory(policy);
126         this.sessionFactory = new WebSocketSessionFactory(this);
127         
128         addBean(this.executor);
129         addBean(this.sslContextFactory);
130         addBean(this.bufferPool);
131     }
132 
133     public Future<Session> connect(Object websocket, URI toUri) throws IOException
134     {
135         ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
136         request.setRequestURI(toUri);
137         request.setCookiesFrom(this.cookieStore);
138 
139         return connect(websocket,toUri,request);
140     }
141 
142     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
143     {
144         return connect(websocket,toUri,request,null);
145     }
146 
147     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
148     {
149         if (!isStarted())
150         {
151             throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
152         }
153 
154         // Validate websocket URI
155         if (!toUri.isAbsolute())
156         {
157             throw new IllegalArgumentException("WebSocket URI must be absolute");
158         }
159 
160         if (StringUtil.isBlank(toUri.getScheme()))
161         {
162             throw new IllegalArgumentException("WebSocket URI must include a scheme");
163         }
164 
165         String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
166         if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
167         {
168             throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
169         }
170 
171         request.setRequestURI(toUri);
172         request.setCookiesFrom(this.cookieStore);
173 
174         // Validate Requested Extensions
175         for (ExtensionConfig reqExt : request.getExtensions())
176         {
177             if (!extensionRegistry.isAvailable(reqExt.getName()))
178             {
179                 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
180             }
181         }
182 
183         if (LOG.isDebugEnabled())
184             LOG.debug("connect websocket {} to {}",websocket,toUri);
185 
186         // Grab Connection Manager
187         initialiseClient();
188         ConnectionManager manager = getConnectionManager();
189 
190         // Setup Driver for user provided websocket
191         EventDriver driver = null;
192         if (websocket instanceof EventDriver)
193         {
194             // Use the EventDriver as-is
195             driver = (EventDriver)websocket;
196         }
197         else
198         {
199             // Wrap websocket with appropriate EventDriver
200             driver = eventDriverFactory.wrap(websocket);
201         }
202 
203         if (driver == null)
204         {
205             throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
206         }
207 
208         // Create the appropriate (physical vs virtual) connection task
209         ConnectPromise promise = manager.connect(this,driver,request);
210 
211         if (upgradeListener != null)
212         {
213             promise.setUpgradeListener(upgradeListener);
214         }
215 
216         if (LOG.isDebugEnabled())
217             LOG.debug("Connect Promise: {}",promise);
218 
219         // Execute the connection on the executor thread
220         executor.execute(promise);
221 
222         // Return the future
223         return promise;
224     }
225 
226     @Override
227     protected void doStart() throws Exception
228     {
229         if (LOG.isDebugEnabled())
230             LOG.debug("Starting {}",this);
231 
232         if (sslContextFactory != null)
233         {
234             addBean(sslContextFactory);
235         }
236 
237         String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
238 
239         if (bufferPool == null)
240         {
241             bufferPool = new MappedByteBufferPool();
242         }
243         addBean(bufferPool);
244 
245         if (scheduler == null)
246         {
247             scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
248         }
249         addBean(scheduler);
250 
251         if (cookieStore == null)
252         {
253             cookieStore = new HttpCookieStore.Empty();
254         }
255 
256         super.doStart();
257 
258         if (LOG.isDebugEnabled())
259             LOG.debug("Started {}",this);
260     }
261 
262     @Override
263     protected void doStop() throws Exception
264     {
265         if (LOG.isDebugEnabled())
266             LOG.debug("Stopping {}",this);
267         
268         if (ShutdownThread.isRegistered(this))
269         {
270             ShutdownThread.deregister(this);
271         }
272 
273         if (cookieStore != null)
274         {
275             cookieStore.removeAll();
276             cookieStore = null;
277         }
278 
279         super.doStop();
280 
281         if (LOG.isDebugEnabled())
282             LOG.debug("Stopped {}",this);
283     }
284 
285     /**
286      * Return the number of milliseconds for a timeout of an attempted write operation.
287      * 
288      * @return number of milliseconds for timeout of an attempted write operation
289      */
290     public long getAsyncWriteTimeout()
291     {
292         return this.policy.getAsyncWriteTimeout();
293     }
294 
295     public SocketAddress getBindAddress()
296     {
297         return bindAddress;
298     }
299 
300     public ByteBufferPool getBufferPool()
301     {
302         return bufferPool;
303     }
304 
305     public ConnectionManager getConnectionManager()
306     {
307         return connectionManager;
308     }
309 
310     public long getConnectTimeout()
311     {
312         return connectTimeout;
313     }
314 
315     public CookieStore getCookieStore()
316     {
317         return cookieStore;
318     }
319     
320     public EventDriverFactory getEventDriverFactory()
321     {
322         return eventDriverFactory;
323     }
324 
325     public Executor getExecutor()
326     {
327         return executor;
328     }
329 
330     public ExtensionFactory getExtensionFactory()
331     {
332         return extensionRegistry;
333     }
334 
335     public Masker getMasker()
336     {
337         return masker;
338     }
339 
340     /**
341      * Get the maximum size for buffering of a binary message.
342      * 
343      * @return the maximum size of a binary message buffer.
344      */
345     public int getMaxBinaryMessageBufferSize()
346     {
347         return this.policy.getMaxBinaryMessageBufferSize();
348     }
349 
350     /**
351      * Get the maximum size for a binary message.
352      * 
353      * @return the maximum size of a binary message.
354      */
355     public long getMaxBinaryMessageSize()
356     {
357         return this.policy.getMaxBinaryMessageSize();
358     }
359 
360     /**
361      * Get the max idle timeout for new connections.
362      * 
363      * @return the max idle timeout in milliseconds for new connections.
364      */
365     public long getMaxIdleTimeout()
366     {
367         return this.policy.getIdleTimeout();
368     }
369 
370     /**
371      * Get the maximum size for buffering of a text message.
372      * 
373      * @return the maximum size of a text message buffer.
374      */
375     public int getMaxTextMessageBufferSize()
376     {
377         return this.policy.getMaxTextMessageBufferSize();
378     }
379 
380     /**
381      * Get the maximum size for a text message.
382      * 
383      * @return the maximum size of a text message.
384      */
385     public long getMaxTextMessageSize()
386     {
387         return this.policy.getMaxTextMessageSize();
388     }
389 
390     public Set<WebSocketSession> getOpenSessions()
391     {
392         return new HashSet<>(getBeans(WebSocketSession.class));
393     }
394 
395     public WebSocketPolicy getPolicy()
396     {
397         return this.policy;
398     }
399 
400     public Scheduler getScheduler()
401     {
402         return scheduler;
403     }
404 
405     public SessionFactory getSessionFactory()
406     {
407         return sessionFactory;
408     }
409 
410     /**
411      * @return the {@link SslContextFactory} that manages TLS encryption
412      * @see #WebSocketClient(SslContextFactory)
413      */
414     public SslContextFactory getSslContextFactory()
415     {
416         return sslContextFactory;
417     }
418 
419     public List<Extension> initExtensions(List<ExtensionConfig> requested)
420     {
421         List<Extension> extensions = new ArrayList<Extension>();
422 
423         for (ExtensionConfig cfg : requested)
424         {
425             Extension extension = extensionRegistry.newInstance(cfg);
426 
427             if (extension == null)
428             {
429                 continue;
430             }
431 
432             if (LOG.isDebugEnabled())
433                 LOG.debug("added {}",extension);
434             extensions.add(extension);
435         }
436         if (LOG.isDebugEnabled())
437             LOG.debug("extensions={}",extensions);
438         return extensions;
439     }
440 
441     private synchronized void initialiseClient() throws IOException
442     {
443         if (!ShutdownThread.isRegistered(this))
444         {
445             ShutdownThread.register(this);
446         }
447 
448         if (executor == null)
449         {
450             QueuedThreadPool threadPool = new QueuedThreadPool();
451             String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
452             threadPool.setName(name);
453             threadPool.setDaemon(daemon);
454             executor = threadPool;
455             addManaged(threadPool);
456         }
457         else
458         {
459             addBean(executor,false);
460         }
461 
462         if (connectionManager == null)
463         {
464             connectionManager = newConnectionManager();
465             addManaged(connectionManager);
466         }
467     }
468 
469     /**
470      * Factory method for new ConnectionManager (used by other projects like cometd)
471      * 
472      * @return the ConnectionManager instance to use
473      */
474     protected ConnectionManager newConnectionManager()
475     {
476         return new ConnectionManager(this);
477     }
478 
479     @Override
480     public void onSessionClosed(WebSocketSession session)
481     {
482         if (LOG.isDebugEnabled())
483             LOG.debug("Session Closed: {}",session);
484         removeBean(session);
485     }
486 
487     @Override
488     public void onSessionOpened(WebSocketSession session)
489     {
490         if (LOG.isDebugEnabled())
491             LOG.debug("Session Opened: {}",session);
492     }
493 
494     public void setAsyncWriteTimeout(long ms)
495     {
496         this.policy.setAsyncWriteTimeout(ms);
497     }
498 
499     public void setBindAdddress(SocketAddress bindAddress)
500     {
501         this.bindAddress = bindAddress;
502     }
503 
504     public void setBufferPool(ByteBufferPool bufferPool)
505     {
506         this.bufferPool = bufferPool;
507     }
508 
509     /**
510      * Set the timeout for connecting to the remote server.
511      * 
512      * @param ms
513      *            the timeout in milliseconds
514      */
515     public void setConnectTimeout(long ms)
516     {
517         if (ms < 0)
518         {
519             throw new IllegalStateException("Connect Timeout cannot be negative");
520         }
521         this.connectTimeout = ms;
522     }
523 
524     public void setCookieStore(CookieStore cookieStore)
525     {
526         this.cookieStore = cookieStore;
527     }
528     
529     public void setDaemon(boolean daemon)
530     {
531         this.daemon = daemon;
532     }
533 
534     public void setEventDriverFactory(EventDriverFactory factory)
535     {
536         this.eventDriverFactory = factory;
537     }
538 
539     public void setExecutor(Executor executor)
540     {
541         updateBean(this.executor,executor);
542         this.executor = executor;
543     }
544 
545     public void setMasker(Masker masker)
546     {
547         this.masker = masker;
548     }
549 
550     public void setMaxBinaryMessageBufferSize(int max)
551     {
552         this.policy.setMaxBinaryMessageBufferSize(max);
553     }
554     
555     /**
556      * Set the max idle timeout for new connections.
557      * <p>
558      * Existing connections will not have their max idle timeout adjusted.
559      * 
560      * @param ms
561      *            the timeout in milliseconds
562      */
563     public void setMaxIdleTimeout(long ms)
564     {
565         this.policy.setIdleTimeout(ms);
566     }
567 
568     public void setMaxTextMessageBufferSize(int max)
569     {
570         this.policy.setMaxTextMessageBufferSize(max);
571     }
572 
573     public void setSessionFactory(SessionFactory sessionFactory)
574     {
575         this.sessionFactory = sessionFactory;
576     }
577 
578     @Override
579     public void dump(Appendable out, String indent) throws IOException
580     {
581         dumpThis(out);
582         dump(out, indent, getOpenSessions());
583     }
584 }