View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client;
20  
21  import java.io.IOException;
22  import java.net.CookieStore;
23  import java.net.SocketAddress;
24  import java.net.URI;
25  import java.util.ArrayList;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Locale;
29  import java.util.Set;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.Future;
32  
33  import org.eclipse.jetty.io.ByteBufferPool;
34  import org.eclipse.jetty.io.MappedByteBufferPool;
35  import org.eclipse.jetty.io.SelectorManager;
36  import org.eclipse.jetty.util.HttpCookieStore;
37  import org.eclipse.jetty.util.StringUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  import org.eclipse.jetty.util.ssl.SslContextFactory;
42  import org.eclipse.jetty.util.thread.QueuedThreadPool;
43  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44  import org.eclipse.jetty.util.thread.Scheduler;
45  import org.eclipse.jetty.util.thread.ShutdownThread;
46  import org.eclipse.jetty.websocket.api.Session;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.extensions.Extension;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
51  import org.eclipse.jetty.websocket.client.io.ConnectPromise;
52  import org.eclipse.jetty.websocket.client.io.ConnectionManager;
53  import org.eclipse.jetty.websocket.client.io.UpgradeListener;
54  import org.eclipse.jetty.websocket.client.masks.Masker;
55  import org.eclipse.jetty.websocket.client.masks.RandomMasker;
56  import org.eclipse.jetty.websocket.common.SessionFactory;
57  import org.eclipse.jetty.websocket.common.SessionListener;
58  import org.eclipse.jetty.websocket.common.WebSocketSession;
59  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
60  import org.eclipse.jetty.websocket.common.events.EventDriver;
61  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
62  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
63  
64  /**
65   * WebSocketClient provides a means of establishing connections to remote websocket endpoints.
66   */
67  public class WebSocketClient extends ContainerLifeCycle implements SessionListener
68  {
69      private static final Logger LOG = Log.getLogger(WebSocketClient.class);
70  
71      private final WebSocketPolicy policy;
72      private final SslContextFactory sslContextFactory;
73      private final WebSocketExtensionFactory extensionRegistry;
74      private boolean daemon = false;
75      private EventDriverFactory eventDriverFactory;
76      private SessionFactory sessionFactory;
77      private ByteBufferPool bufferPool;
78      private Executor executor;
79      private Scheduler scheduler;
80      private CookieStore cookieStore;
81      private ConnectionManager connectionManager;
82      private Masker masker;
83      private SocketAddress bindAddress;
84      private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
85  
86      public WebSocketClient()
87      {
88          this(null,null);
89      }
90  
91      public WebSocketClient(Executor executor)
92      {
93          this(null,executor);
94      }
95      
96      public WebSocketClient(ByteBufferPool bufferPool)
97      {
98          this(null,null,bufferPool);
99      }
100 
101     public WebSocketClient(SslContextFactory sslContextFactory)
102     {
103         this(sslContextFactory,null);
104     }
105 
106     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
107     {
108         this(sslContextFactory,executor,new MappedByteBufferPool());
109     }
110     
111     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
112     {
113         this.executor = executor;
114         this.sslContextFactory = sslContextFactory;
115         this.policy = WebSocketPolicy.newClientPolicy();
116         this.bufferPool = bufferPool;
117         this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
118         
119         // Bug #431459 - unregistering compression extensions till they are more stable
120         this.extensionRegistry.unregister("deflate-frame");
121         this.extensionRegistry.unregister("permessage-deflate");
122         this.extensionRegistry.unregister("x-webkit-deflate-frame");
123         
124         this.masker = new RandomMasker();
125         this.eventDriverFactory = new EventDriverFactory(policy);
126         this.sessionFactory = new WebSocketSessionFactory(this);
127         
128         addBean(this.executor);
129         addBean(this.sslContextFactory);
130         addBean(this.bufferPool);
131     }
132 
133     public Future<Session> connect(Object websocket, URI toUri) throws IOException
134     {
135         ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
136         request.setRequestURI(toUri);
137         request.setCookiesFrom(this.cookieStore);
138 
139         return connect(websocket,toUri,request);
140     }
141 
142     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
143     {
144         return connect(websocket,toUri,request,null);
145     }
146 
147     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
148     {
149         if (!isStarted())
150         {
151             throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
152         }
153 
154         // Validate websocket URI
155         if (!toUri.isAbsolute())
156         {
157             throw new IllegalArgumentException("WebSocket URI must be absolute");
158         }
159 
160         if (StringUtil.isBlank(toUri.getScheme()))
161         {
162             throw new IllegalArgumentException("WebSocket URI must include a scheme");
163         }
164 
165         String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
166         if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
167         {
168             throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
169         }
170 
171         request.setRequestURI(toUri);
172         request.setCookiesFrom(this.cookieStore);
173 
174         // Validate Requested Extensions
175         for (ExtensionConfig reqExt : request.getExtensions())
176         {
177             if (!extensionRegistry.isAvailable(reqExt.getName()))
178             {
179                 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
180             }
181         }
182 
183         // Validate websocket URI
184         LOG.debug("connect websocket {} to {}",websocket,toUri);
185 
186         // Grab Connection Manager
187         initialiseClient();
188         ConnectionManager manager = getConnectionManager();
189 
190         // Setup Driver for user provided websocket
191         EventDriver driver = null;
192         if (websocket instanceof EventDriver)
193         {
194             // Use the EventDriver as-is
195             driver = (EventDriver)websocket;
196         }
197         else
198         {
199             // Wrap websocket with appropriate EventDriver
200             driver = eventDriverFactory.wrap(websocket);
201         }
202 
203         if (driver == null)
204         {
205             throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
206         }
207 
208         // Create the appropriate (physical vs virtual) connection task
209         ConnectPromise promise = manager.connect(this,driver,request);
210 
211         if (upgradeListener != null)
212         {
213             promise.setUpgradeListener(upgradeListener);
214         }
215 
216         LOG.debug("Connect Promise: {}",promise);
217 
218         // Execute the connection on the executor thread
219         executor.execute(promise);
220 
221         // Return the future
222         return promise;
223     }
224 
225     @Override
226     protected void doStart() throws Exception
227     {
228         LOG.debug("Starting {}",this);
229 
230         if (sslContextFactory != null)
231         {
232             addBean(sslContextFactory);
233         }
234 
235         String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
236 
237         if (bufferPool == null)
238         {
239             bufferPool = new MappedByteBufferPool();
240         }
241         addBean(bufferPool);
242 
243         if (scheduler == null)
244         {
245             scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
246         }
247         addBean(scheduler);
248 
249         if (cookieStore == null)
250         {
251             cookieStore = new HttpCookieStore.Empty();
252         }
253 
254         super.doStart();
255 
256         LOG.debug("Started {}",this);
257     }
258 
259     @Override
260     protected void doStop() throws Exception
261     {
262         LOG.debug("Stopping {}",this);
263 
264         if (cookieStore != null)
265         {
266             cookieStore.removeAll();
267             cookieStore = null;
268         }
269 
270         super.doStop();
271         LOG.debug("Stopped {}",this);
272     }
273 
274     /**
275      * Return the number of milliseconds for a timeout of an attempted write operation.
276      * 
277      * @return number of milliseconds for timeout of an attempted write operation
278      */
279     public long getAsyncWriteTimeout()
280     {
281         return this.policy.getAsyncWriteTimeout();
282     }
283 
284     public SocketAddress getBindAddress()
285     {
286         return bindAddress;
287     }
288 
289     public ByteBufferPool getBufferPool()
290     {
291         return bufferPool;
292     }
293 
294     public ConnectionManager getConnectionManager()
295     {
296         return connectionManager;
297     }
298 
299     public long getConnectTimeout()
300     {
301         return connectTimeout;
302     }
303 
304     public CookieStore getCookieStore()
305     {
306         return cookieStore;
307     }
308     
309     public EventDriverFactory getEventDriverFactory()
310     {
311         return eventDriverFactory;
312     }
313 
314     public Executor getExecutor()
315     {
316         return executor;
317     }
318 
319     public ExtensionFactory getExtensionFactory()
320     {
321         return extensionRegistry;
322     }
323 
324     public Masker getMasker()
325     {
326         return masker;
327     }
328 
329     /**
330      * Get the maximum size for buffering of a binary message.
331      * 
332      * @return the maximum size of a binary message buffer.
333      */
334     public int getMaxBinaryMessageBufferSize()
335     {
336         return this.policy.getMaxBinaryMessageBufferSize();
337     }
338 
339     /**
340      * Get the maximum size for a binary message.
341      * 
342      * @return the maximum size of a binary message.
343      */
344     public long getMaxBinaryMessageSize()
345     {
346         return this.policy.getMaxBinaryMessageSize();
347     }
348 
349     /**
350      * Get the max idle timeout for new connections.
351      * 
352      * @return the max idle timeout in milliseconds for new connections.
353      */
354     public long getMaxIdleTimeout()
355     {
356         return this.policy.getIdleTimeout();
357     }
358 
359     /**
360      * Get the maximum size for buffering of a text message.
361      * 
362      * @return the maximum size of a text message buffer.
363      */
364     public int getMaxTextMessageBufferSize()
365     {
366         return this.policy.getMaxTextMessageBufferSize();
367     }
368 
369     /**
370      * Get the maximum size for a text message.
371      * 
372      * @return the maximum size of a text message.
373      */
374     public long getMaxTextMessageSize()
375     {
376         return this.policy.getMaxTextMessageSize();
377     }
378 
379     public Set<WebSocketSession> getOpenSessions()
380     {
381         return new HashSet<>(getBeans(WebSocketSession.class));
382     }
383 
384     public WebSocketPolicy getPolicy()
385     {
386         return this.policy;
387     }
388 
389     public Scheduler getScheduler()
390     {
391         return scheduler;
392     }
393 
394     public SessionFactory getSessionFactory()
395     {
396         return sessionFactory;
397     }
398 
399     /**
400      * @return the {@link SslContextFactory} that manages TLS encryption
401      * @see #WebSocketClient(SslContextFactory)
402      */
403     public SslContextFactory getSslContextFactory()
404     {
405         return sslContextFactory;
406     }
407 
408     public List<Extension> initExtensions(List<ExtensionConfig> requested)
409     {
410         List<Extension> extensions = new ArrayList<Extension>();
411 
412         for (ExtensionConfig cfg : requested)
413         {
414             Extension extension = extensionRegistry.newInstance(cfg);
415 
416             if (extension == null)
417             {
418                 continue;
419             }
420 
421             LOG.debug("added {}",extension);
422             extensions.add(extension);
423         }
424         LOG.debug("extensions={}",extensions);
425         return extensions;
426     }
427 
428     private synchronized void initialiseClient() throws IOException
429     {
430         ShutdownThread.register(this);
431 
432         if (executor == null)
433         {
434             QueuedThreadPool threadPool = new QueuedThreadPool();
435             String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
436             threadPool.setName(name);
437             threadPool.setDaemon(daemon);
438             executor = threadPool;
439             addBean(executor,true);
440         }
441         else
442         {
443             addBean(executor,false);
444         }
445 
446         if (connectionManager != null)
447         {
448             return;
449         }
450         try
451         {
452             connectionManager = newConnectionManager();
453             addBean(connectionManager);
454             connectionManager.start();
455         }
456         catch (IOException e)
457         {
458             throw e;
459         }
460         catch (Exception e)
461         {
462             throw new IOException(e);
463         }
464     }
465 
466     /**
467      * Factory method for new ConnectionManager (used by other projects like cometd)
468      * 
469      * @return the ConnectionManager instance to use
470      */
471     protected ConnectionManager newConnectionManager()
472     {
473         return new ConnectionManager(this);
474     }
475 
476     @Override
477     public void onSessionClosed(WebSocketSession session)
478     {
479         LOG.debug("Session Closed: {}",session);
480         removeBean(session);
481     }
482 
483     @Override
484     public void onSessionOpened(WebSocketSession session)
485     {
486         LOG.debug("Session Opened: {}",session);
487     }
488 
489     public void setAsyncWriteTimeout(long ms)
490     {
491         this.policy.setAsyncWriteTimeout(ms);
492     }
493 
494     public void setBindAdddress(SocketAddress bindAddress)
495     {
496         this.bindAddress = bindAddress;
497     }
498 
499     public void setBufferPool(ByteBufferPool bufferPool)
500     {
501         this.bufferPool = bufferPool;
502     }
503 
504     /**
505      * Set the timeout for connecting to the remote server.
506      * 
507      * @param ms
508      *            the timeout in milliseconds
509      */
510     public void setConnectTimeout(long ms)
511     {
512         if (ms < 0)
513         {
514             throw new IllegalStateException("Connect Timeout cannot be negative");
515         }
516         this.connectTimeout = ms;
517     }
518 
519     public void setCookieStore(CookieStore cookieStore)
520     {
521         this.cookieStore = cookieStore;
522     }
523     
524     public void setDaemon(boolean daemon)
525     {
526         this.daemon = daemon;
527     }
528 
529     public void setEventDriverFactory(EventDriverFactory factory)
530     {
531         this.eventDriverFactory = factory;
532     }
533 
534     public void setExecutor(Executor executor)
535     {
536         updateBean(this.executor,executor);
537         this.executor = executor;
538     }
539 
540     public void setMasker(Masker masker)
541     {
542         this.masker = masker;
543     }
544 
545     public void setMaxBinaryMessageBufferSize(int max)
546     {
547         this.policy.setMaxBinaryMessageBufferSize(max);
548     }
549     
550     /**
551      * Set the max idle timeout for new connections.
552      * <p>
553      * Existing connections will not have their max idle timeout adjusted.
554      * 
555      * @param ms
556      *            the timeout in milliseconds
557      */
558     public void setMaxIdleTimeout(long ms)
559     {
560         this.policy.setIdleTimeout(ms);
561     }
562 
563     public void setMaxTextMessageBufferSize(int max)
564     {
565         this.policy.setMaxTextMessageBufferSize(max);
566     }
567 
568     public void setSessionFactory(SessionFactory sessionFactory)
569     {
570         this.sessionFactory = sessionFactory;
571     }
572 
573     @Override
574     public void dump(Appendable out, String indent) throws IOException
575     {
576         dumpThis(out);
577         dump(out, indent, getOpenSessions());
578     }
579 }