View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client;
20  
21  import java.io.IOException;
22  import java.net.CookieStore;
23  import java.net.SocketAddress;
24  import java.net.URI;
25  import java.util.ArrayList;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Locale;
29  import java.util.Set;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.Future;
32  
33  import org.eclipse.jetty.io.ByteBufferPool;
34  import org.eclipse.jetty.io.MappedByteBufferPool;
35  import org.eclipse.jetty.io.SelectorManager;
36  import org.eclipse.jetty.util.HttpCookieStore;
37  import org.eclipse.jetty.util.StringUtil;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.component.LifeCycle;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.ssl.SslContextFactory;
43  import org.eclipse.jetty.util.thread.QueuedThreadPool;
44  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
45  import org.eclipse.jetty.util.thread.Scheduler;
46  import org.eclipse.jetty.util.thread.ShutdownThread;
47  import org.eclipse.jetty.websocket.api.Session;
48  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49  import org.eclipse.jetty.websocket.api.extensions.Extension;
50  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
51  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
52  import org.eclipse.jetty.websocket.client.io.ConnectPromise;
53  import org.eclipse.jetty.websocket.client.io.ConnectionManager;
54  import org.eclipse.jetty.websocket.client.io.UpgradeListener;
55  import org.eclipse.jetty.websocket.client.masks.Masker;
56  import org.eclipse.jetty.websocket.client.masks.RandomMasker;
57  import org.eclipse.jetty.websocket.common.SessionFactory;
58  import org.eclipse.jetty.websocket.common.SessionListener;
59  import org.eclipse.jetty.websocket.common.WebSocketSession;
60  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
61  import org.eclipse.jetty.websocket.common.events.EventDriver;
62  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
63  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
64  
65  /**
66   * WebSocketClient provides a means of establishing connections to remote websocket endpoints.
67   */
68  public class WebSocketClient extends ContainerLifeCycle implements SessionListener
69  {
70      private static final Logger LOG = Log.getLogger(WebSocketClient.class);
71  
72      private final WebSocketPolicy policy;
73      private final SslContextFactory sslContextFactory;
74      private final WebSocketExtensionFactory extensionRegistry;
75      private boolean daemon = false;
76      private EventDriverFactory eventDriverFactory;
77      private SessionFactory sessionFactory;
78      private ByteBufferPool bufferPool;
79      private Executor executor;
80      private Scheduler scheduler;
81      private CookieStore cookieStore;
82      private ConnectionManager connectionManager;
83      private Masker masker;
84      private SocketAddress bindAddress;
85      private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
86  
87      public WebSocketClient()
88      {
89          this(null,null);
90      }
91  
92      public WebSocketClient(Executor executor)
93      {
94          this(null,executor);
95      }
96      
97      public WebSocketClient(ByteBufferPool bufferPool)
98      {
99          this(null,null,bufferPool);
100     }
101 
102     public WebSocketClient(SslContextFactory sslContextFactory)
103     {
104         this(sslContextFactory,null);
105     }
106 
107     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
108     {
109         this(sslContextFactory,executor,new MappedByteBufferPool());
110     }
111     
112     public WebSocketClient(SslContextFactory sslContextFactory, Executor executor, ByteBufferPool bufferPool)
113     {
114         this.executor = executor;
115         this.sslContextFactory = sslContextFactory;
116         this.policy = WebSocketPolicy.newClientPolicy();
117         this.bufferPool = bufferPool;
118         this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
119         
120         // Bug #431459 - unregistering compression extensions till they are more stable
121         this.extensionRegistry.unregister("deflate-frame");
122         this.extensionRegistry.unregister("permessage-deflate");
123         this.extensionRegistry.unregister("x-webkit-deflate-frame");
124         
125         this.masker = new RandomMasker();
126         this.eventDriverFactory = new EventDriverFactory(policy);
127         this.sessionFactory = new WebSocketSessionFactory(this);
128         
129         addBean(this.executor);
130         addBean(this.sslContextFactory);
131         addBean(this.bufferPool);
132     }
133 
134     public Future<Session> connect(Object websocket, URI toUri) throws IOException
135     {
136         ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
137         request.setRequestURI(toUri);
138         request.setCookiesFrom(this.cookieStore);
139 
140         return connect(websocket,toUri,request);
141     }
142 
143     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
144     {
145         return connect(websocket,toUri,request,null);
146     }
147 
148     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
149     {
150         if (!isStarted())
151         {
152             throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
153         }
154 
155         // Validate websocket URI
156         if (!toUri.isAbsolute())
157         {
158             throw new IllegalArgumentException("WebSocket URI must be absolute");
159         }
160 
161         if (StringUtil.isBlank(toUri.getScheme()))
162         {
163             throw new IllegalArgumentException("WebSocket URI must include a scheme");
164         }
165 
166         String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
167         if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
168         {
169             throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
170         }
171 
172         request.setRequestURI(toUri);
173         request.setCookiesFrom(this.cookieStore);
174 
175         // Validate Requested Extensions
176         for (ExtensionConfig reqExt : request.getExtensions())
177         {
178             if (!extensionRegistry.isAvailable(reqExt.getName()))
179             {
180                 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
181             }
182         }
183 
184         // Validate websocket URI
185         LOG.debug("connect websocket {} to {}",websocket,toUri);
186 
187         // Grab Connection Manager
188         initialiseClient();
189         ConnectionManager manager = getConnectionManager();
190 
191         // Setup Driver for user provided websocket
192         EventDriver driver = null;
193         if (websocket instanceof EventDriver)
194         {
195             // Use the EventDriver as-is
196             driver = (EventDriver)websocket;
197         }
198         else
199         {
200             // Wrap websocket with appropriate EventDriver
201             driver = eventDriverFactory.wrap(websocket);
202         }
203 
204         if (driver == null)
205         {
206             throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
207         }
208 
209         // Create the appropriate (physical vs virtual) connection task
210         ConnectPromise promise = manager.connect(this,driver,request);
211 
212         if (upgradeListener != null)
213         {
214             promise.setUpgradeListener(upgradeListener);
215         }
216 
217         LOG.debug("Connect Promise: {}",promise);
218 
219         // Execute the connection on the executor thread
220         executor.execute(promise);
221 
222         // Return the future
223         return promise;
224     }
225 
226     @Override
227     protected void doStart() throws Exception
228     {
229         LOG.debug("Starting {}",this);
230 
231         if (sslContextFactory != null)
232         {
233             addBean(sslContextFactory);
234         }
235 
236         String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
237 
238         if (bufferPool == null)
239         {
240             bufferPool = new MappedByteBufferPool();
241         }
242         addBean(bufferPool);
243 
244         if (scheduler == null)
245         {
246             scheduler = new ScheduledExecutorScheduler(name + "-scheduler",daemon);
247         }
248         addBean(scheduler);
249 
250         if (cookieStore == null)
251         {
252             cookieStore = new HttpCookieStore.Empty();
253         }
254 
255         super.doStart();
256 
257         LOG.debug("Started {}",this);
258     }
259 
260     @Override
261     protected void doStop() throws Exception
262     {
263         LOG.debug("Stopping {}",this);
264 
265         if (cookieStore != null)
266         {
267             cookieStore.removeAll();
268             cookieStore = null;
269         }
270 
271         super.doStop();
272         LOG.debug("Stopped {}",this);
273     }
274 
275     /**
276      * Return the number of milliseconds for a timeout of an attempted write operation.
277      * 
278      * @return number of milliseconds for timeout of an attempted write operation
279      */
280     public long getAsyncWriteTimeout()
281     {
282         return this.policy.getAsyncWriteTimeout();
283     }
284 
285     public SocketAddress getBindAddress()
286     {
287         return bindAddress;
288     }
289 
290     public ByteBufferPool getBufferPool()
291     {
292         return bufferPool;
293     }
294 
295     public ConnectionManager getConnectionManager()
296     {
297         return connectionManager;
298     }
299 
300     public long getConnectTimeout()
301     {
302         return connectTimeout;
303     }
304 
305     public CookieStore getCookieStore()
306     {
307         return cookieStore;
308     }
309     
310     public EventDriverFactory getEventDriverFactory()
311     {
312         return eventDriverFactory;
313     }
314 
315     public Executor getExecutor()
316     {
317         return executor;
318     }
319 
320     public ExtensionFactory getExtensionFactory()
321     {
322         return extensionRegistry;
323     }
324 
325     public Masker getMasker()
326     {
327         return masker;
328     }
329 
330     /**
331      * Get the maximum size for buffering of a binary message.
332      * 
333      * @return the maximum size of a binary message buffer.
334      */
335     public int getMaxBinaryMessageBufferSize()
336     {
337         return this.policy.getMaxBinaryMessageBufferSize();
338     }
339 
340     /**
341      * Get the maximum size for a binary message.
342      * 
343      * @return the maximum size of a binary message.
344      */
345     public long getMaxBinaryMessageSize()
346     {
347         return this.policy.getMaxBinaryMessageSize();
348     }
349 
350     /**
351      * Get the max idle timeout for new connections.
352      * 
353      * @return the max idle timeout in milliseconds for new connections.
354      */
355     public long getMaxIdleTimeout()
356     {
357         return this.policy.getIdleTimeout();
358     }
359 
360     /**
361      * Get the maximum size for buffering of a text message.
362      * 
363      * @return the maximum size of a text message buffer.
364      */
365     public int getMaxTextMessageBufferSize()
366     {
367         return this.policy.getMaxTextMessageBufferSize();
368     }
369 
370     /**
371      * Get the maximum size for a text message.
372      * 
373      * @return the maximum size of a text message.
374      */
375     public long getMaxTextMessageSize()
376     {
377         return this.policy.getMaxTextMessageSize();
378     }
379 
380     public Set<WebSocketSession> getOpenSessions()
381     {
382         return new HashSet<>(getBeans(WebSocketSession.class));
383     }
384 
385     public WebSocketPolicy getPolicy()
386     {
387         return this.policy;
388     }
389 
390     public Scheduler getScheduler()
391     {
392         return scheduler;
393     }
394 
395     public SessionFactory getSessionFactory()
396     {
397         return sessionFactory;
398     }
399 
400     /**
401      * @return the {@link SslContextFactory} that manages TLS encryption
402      * @see #WebSocketClient(SslContextFactory)
403      */
404     public SslContextFactory getSslContextFactory()
405     {
406         return sslContextFactory;
407     }
408 
409     public List<Extension> initExtensions(List<ExtensionConfig> requested)
410     {
411         List<Extension> extensions = new ArrayList<Extension>();
412 
413         for (ExtensionConfig cfg : requested)
414         {
415             Extension extension = extensionRegistry.newInstance(cfg);
416 
417             if (extension == null)
418             {
419                 continue;
420             }
421 
422             LOG.debug("added {}",extension);
423             extensions.add(extension);
424         }
425         LOG.debug("extensions={}",extensions);
426         return extensions;
427     }
428 
429     private synchronized void initialiseClient() throws IOException
430     {
431         ShutdownThread.register(this);
432 
433         if (executor == null)
434         {
435             QueuedThreadPool threadPool = new QueuedThreadPool();
436             String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
437             threadPool.setName(name);
438             threadPool.setDaemon(daemon);
439             executor = threadPool;
440             addManaged(threadPool);
441         }
442         else
443         {
444             addBean(executor,false);
445         }
446 
447         if (connectionManager == null)
448         {
449             connectionManager = newConnectionManager();
450             addManaged(connectionManager);
451         }
452     }
453 
454     /**
455      * Factory method for new ConnectionManager (used by other projects like cometd)
456      * 
457      * @return the ConnectionManager instance to use
458      */
459     protected ConnectionManager newConnectionManager()
460     {
461         return new ConnectionManager(this);
462     }
463 
464     @Override
465     public void onSessionClosed(WebSocketSession session)
466     {
467         LOG.debug("Session Closed: {}",session);
468         removeBean(session);
469     }
470 
471     @Override
472     public void onSessionOpened(WebSocketSession session)
473     {
474         LOG.debug("Session Opened: {}",session);
475     }
476 
477     public void setAsyncWriteTimeout(long ms)
478     {
479         this.policy.setAsyncWriteTimeout(ms);
480     }
481 
482     public void setBindAdddress(SocketAddress bindAddress)
483     {
484         this.bindAddress = bindAddress;
485     }
486 
487     public void setBufferPool(ByteBufferPool bufferPool)
488     {
489         this.bufferPool = bufferPool;
490     }
491 
492     /**
493      * Set the timeout for connecting to the remote server.
494      * 
495      * @param ms
496      *            the timeout in milliseconds
497      */
498     public void setConnectTimeout(long ms)
499     {
500         if (ms < 0)
501         {
502             throw new IllegalStateException("Connect Timeout cannot be negative");
503         }
504         this.connectTimeout = ms;
505     }
506 
507     public void setCookieStore(CookieStore cookieStore)
508     {
509         this.cookieStore = cookieStore;
510     }
511     
512     public void setDaemon(boolean daemon)
513     {
514         this.daemon = daemon;
515     }
516 
517     public void setEventDriverFactory(EventDriverFactory factory)
518     {
519         this.eventDriverFactory = factory;
520     }
521 
522     public void setExecutor(Executor executor)
523     {
524         updateBean(this.executor,executor);
525         this.executor = executor;
526     }
527 
528     public void setMasker(Masker masker)
529     {
530         this.masker = masker;
531     }
532 
533     public void setMaxBinaryMessageBufferSize(int max)
534     {
535         this.policy.setMaxBinaryMessageBufferSize(max);
536     }
537     
538     /**
539      * Set the max idle timeout for new connections.
540      * <p>
541      * Existing connections will not have their max idle timeout adjusted.
542      * 
543      * @param ms
544      *            the timeout in milliseconds
545      */
546     public void setMaxIdleTimeout(long ms)
547     {
548         this.policy.setIdleTimeout(ms);
549     }
550 
551     public void setMaxTextMessageBufferSize(int max)
552     {
553         this.policy.setMaxTextMessageBufferSize(max);
554     }
555 
556     public void setSessionFactory(SessionFactory sessionFactory)
557     {
558         this.sessionFactory = sessionFactory;
559     }
560 
561     @Override
562     public void dump(Appendable out, String indent) throws IOException
563     {
564         dumpThis(out);
565         dump(out, indent, getOpenSessions());
566     }
567 }