View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client;
20  
21  import java.io.IOException;
22  import java.net.CookieStore;
23  import java.net.SocketAddress;
24  import java.net.URI;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Locale;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Future;
30  
31  import org.eclipse.jetty.io.ByteBufferPool;
32  import org.eclipse.jetty.io.MappedByteBufferPool;
33  import org.eclipse.jetty.io.SelectorManager;
34  import org.eclipse.jetty.util.HttpCookieStore;
35  import org.eclipse.jetty.util.StringUtil;
36  import org.eclipse.jetty.util.component.ContainerLifeCycle;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.log.Logger;
39  import org.eclipse.jetty.util.ssl.SslContextFactory;
40  import org.eclipse.jetty.util.thread.QueuedThreadPool;
41  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
42  import org.eclipse.jetty.util.thread.Scheduler;
43  import org.eclipse.jetty.websocket.api.Session;
44  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
45  import org.eclipse.jetty.websocket.api.extensions.Extension;
46  import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
47  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
48  import org.eclipse.jetty.websocket.client.io.ConnectPromise;
49  import org.eclipse.jetty.websocket.client.io.ConnectionManager;
50  import org.eclipse.jetty.websocket.client.io.UpgradeListener;
51  import org.eclipse.jetty.websocket.client.masks.Masker;
52  import org.eclipse.jetty.websocket.client.masks.RandomMasker;
53  import org.eclipse.jetty.websocket.common.SessionFactory;
54  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
55  import org.eclipse.jetty.websocket.common.events.EventDriver;
56  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
57  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
58  
59  /**
60   * WebSocketClient provides a means of establishing connections to remote websocket endpoints.
61   */
62  public class WebSocketClient extends ContainerLifeCycle
63  {
64      private static final Logger LOG = Log.getLogger(WebSocketClient.class);
65  
66      private final WebSocketPolicy policy;
67      private final SslContextFactory sslContextFactory;
68      private final WebSocketExtensionFactory extensionRegistry;
69      private EventDriverFactory eventDriverFactory;
70      private SessionFactory sessionFactory;
71      private ByteBufferPool bufferPool;
72      private Executor executor;
73      private Scheduler scheduler;
74      private CookieStore cookieStore;
75      private ConnectionManager connectionManager;
76      private Masker masker;
77      private SocketAddress bindAddress;
78      private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
79  
80      public WebSocketClient()
81      {
82          this(null,null);
83      }
84  
85      public WebSocketClient(Executor executor)
86      {
87          this(null,executor);
88      }
89  
90      public WebSocketClient(SslContextFactory sslContextFactory)
91      {
92          this(sslContextFactory,null);
93      }
94  
95      public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
96      {
97          this.executor = executor;
98          this.sslContextFactory = sslContextFactory;
99          this.policy = WebSocketPolicy.newClientPolicy();
100         this.bufferPool = new MappedByteBufferPool();
101         this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
102         this.masker = new RandomMasker();
103         this.eventDriverFactory = new EventDriverFactory(policy);
104         this.sessionFactory = new WebSocketSessionFactory();
105     }
106 
107     public Future<Session> connect(Object websocket, URI toUri) throws IOException
108     {
109         ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
110         request.setRequestURI(toUri);
111         request.setCookiesFrom(this.cookieStore);
112 
113         return connect(websocket,toUri,request);
114     }
115 
116     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
117     {
118         return connect(websocket,toUri,request,null);
119     }
120 
121     public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, UpgradeListener upgradeListener) throws IOException
122     {
123         if (!isStarted())
124         {
125             throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
126         }
127 
128         // Validate websocket URI
129         if (!toUri.isAbsolute())
130         {
131             throw new IllegalArgumentException("WebSocket URI must be absolute");
132         }
133 
134         if (StringUtil.isBlank(toUri.getScheme()))
135         {
136             throw new IllegalArgumentException("WebSocket URI must include a scheme");
137         }
138 
139         String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
140         if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
141         {
142             throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
143         }
144 
145         request.setRequestURI(toUri);
146         request.setCookiesFrom(this.cookieStore);
147 
148         // Validate Requested Extensions
149         for (ExtensionConfig reqExt : request.getExtensions())
150         {
151             if (!extensionRegistry.isAvailable(reqExt.getName()))
152             {
153                 throw new IllegalArgumentException("Requested extension [" + reqExt.getName() + "] is not installed");
154             }
155         }
156 
157         // Validate websocket URI
158         LOG.debug("connect websocket {} to {}",websocket,toUri);
159 
160         // Grab Connection Manager
161         initialiseClient();
162         ConnectionManager manager = getConnectionManager();
163 
164         // Setup Driver for user provided websocket
165         EventDriver driver = null;
166         if (websocket instanceof EventDriver)
167         {
168             // Use the EventDriver as-is
169             driver = (EventDriver)websocket;
170         }
171         else
172         {
173             // Wrap websocket with appropriate EventDriver
174             driver = eventDriverFactory.wrap(websocket);
175         }
176 
177         if (driver == null)
178         {
179             throw new IllegalStateException("Unable to identify as websocket object: " + websocket.getClass().getName());
180         }
181 
182         // Create the appropriate (physical vs virtual) connection task
183         ConnectPromise promise = manager.connect(this,driver,request);
184 
185         if (upgradeListener != null)
186         {
187             promise.setUpgradeListener(upgradeListener);
188         }
189 
190         LOG.debug("Connect Promise: {}",promise);
191 
192         // Execute the connection on the executor thread
193         executor.execute(promise);
194 
195         // Return the future
196         return promise;
197     }
198 
199     private synchronized void initialiseClient() throws IOException
200     {
201         if (executor == null)
202         {
203             QueuedThreadPool threadPool = new QueuedThreadPool();
204             String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
205             threadPool.setName(name);
206             executor = threadPool;
207             addBean(executor,true);
208         }
209         else
210         {
211             addBean(executor,false);
212         }
213 
214         if (connectionManager != null)
215         {
216             return;
217         }
218         try
219         {
220             connectionManager = newConnectionManager();
221             addBean(connectionManager);
222             connectionManager.start();
223         }
224         catch (IOException e)
225         {
226             throw e;
227         }
228         catch (Exception e)
229         {
230             throw new IOException(e);
231         }
232     }
233 
234     @Override
235     protected void doStart() throws Exception
236     {
237         LOG.debug("Starting {}",this);
238 
239         if (sslContextFactory != null)
240         {
241             addBean(sslContextFactory);
242         }
243 
244         String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
245 
246         if (bufferPool == null)
247         {
248             bufferPool = new MappedByteBufferPool();
249         }
250         addBean(bufferPool);
251 
252         if (scheduler == null)
253         {
254             scheduler = new ScheduledExecutorScheduler(name + "-scheduler",false);
255         }
256         addBean(scheduler);
257 
258         if (cookieStore == null)
259         {
260             cookieStore = new HttpCookieStore.Empty();
261         }
262 
263         super.doStart();
264 
265         LOG.debug("Started {}",this);
266     }
267 
268     @Override
269     protected void doStop() throws Exception
270     {
271         LOG.debug("Stopping {}",this);
272 
273         if (cookieStore != null)
274         {
275             cookieStore.removeAll();
276             cookieStore = null;
277         }
278 
279         super.doStop();
280         LOG.info("Stopped {}",this);
281     }
282 
283     /**
284      * Return the number of milliseconds for a timeout of an attempted write operation.
285      * 
286      * @return number of milliseconds for timeout of an attempted write operation
287      */
288     public long getAsyncWriteTimeout()
289     {
290         return this.policy.getAsyncWriteTimeout();
291     }
292 
293     public SocketAddress getBindAddress()
294     {
295         return bindAddress;
296     }
297 
298     public ByteBufferPool getBufferPool()
299     {
300         return bufferPool;
301     }
302 
303     public ConnectionManager getConnectionManager()
304     {
305         return connectionManager;
306     }
307 
308     public long getConnectTimeout()
309     {
310         return connectTimeout;
311     }
312 
313     public CookieStore getCookieStore()
314     {
315         return cookieStore;
316     }
317 
318     public EventDriverFactory getEventDriverFactory()
319     {
320         return eventDriverFactory;
321     }
322 
323     public Executor getExecutor()
324     {
325         return executor;
326     }
327 
328     public ExtensionFactory getExtensionFactory()
329     {
330         return extensionRegistry;
331     }
332 
333     public Masker getMasker()
334     {
335         return masker;
336     }
337 
338     /**
339      * Get the maximum size for buffering of a binary message.
340      * 
341      * @return the maximum size of a binary message buffer.
342      */
343     public int getMaxBinaryMessageBufferSize()
344     {
345         return this.policy.getMaxBinaryMessageBufferSize();
346     }
347 
348     /**
349      * Get the maximum size for a binary message.
350      * 
351      * @return the maximum size of a binary message.
352      */
353     public long getMaxBinaryMessageSize()
354     {
355         return this.policy.getMaxBinaryMessageSize();
356     }
357 
358     /**
359      * Get the max idle timeout for new connections.
360      * 
361      * @return the max idle timeout in milliseconds for new connections.
362      */
363     public long getMaxIdleTimeout()
364     {
365         return this.policy.getIdleTimeout();
366     }
367 
368     /**
369      * Get the maximum size for buffering of a text message.
370      * 
371      * @return the maximum size of a text message buffer.
372      */
373     public int getMaxTextMessageBufferSize()
374     {
375         return this.policy.getMaxTextMessageBufferSize();
376     }
377 
378     /**
379      * Get the maximum size for a text message.
380      * 
381      * @return the maximum size of a text message.
382      */
383     public long getMaxTextMessageSize()
384     {
385         return this.policy.getMaxTextMessageSize();
386     }
387 
388     public WebSocketPolicy getPolicy()
389     {
390         return this.policy;
391     }
392 
393     public Scheduler getScheduler()
394     {
395         return scheduler;
396     }
397 
398     public SessionFactory getSessionFactory()
399     {
400         return sessionFactory;
401     }
402 
403     /**
404      * @return the {@link SslContextFactory} that manages TLS encryption
405      * @see #WebSocketClient(SslContextFactory)
406      */
407     public SslContextFactory getSslContextFactory()
408     {
409         return sslContextFactory;
410     }
411 
412     public List<Extension> initExtensions(List<ExtensionConfig> requested)
413     {
414         List<Extension> extensions = new ArrayList<Extension>();
415 
416         for (ExtensionConfig cfg : requested)
417         {
418             Extension extension = extensionRegistry.newInstance(cfg);
419 
420             if (extension == null)
421             {
422                 continue;
423             }
424 
425             LOG.debug("added {}",extension);
426             extensions.add(extension);
427         }
428         LOG.debug("extensions={}",extensions);
429         return extensions;
430     }
431 
432     /**
433      * Factory method for new ConnectionManager (used by other projects like cometd)
434      * 
435      * @return the ConnectionManager instance to use
436      */
437     protected ConnectionManager newConnectionManager()
438     {
439         return new ConnectionManager(this);
440     }
441 
442     public void setAsyncWriteTimeout(long ms)
443     {
444         this.policy.setAsyncWriteTimeout(ms);
445     }
446 
447     public void setBindAdddress(SocketAddress bindAddress)
448     {
449         this.bindAddress = bindAddress;
450     }
451 
452     public void setBufferPool(ByteBufferPool bufferPool)
453     {
454         this.bufferPool = bufferPool;
455     }
456 
457     /**
458      * Set the timeout for connecting to the remote server.
459      * 
460      * @param ms
461      *            the timeout in milliseconds
462      */
463     public void setConnectTimeout(long ms)
464     {
465         if (ms < 0)
466         {
467             throw new IllegalStateException("Connect Timeout cannot be negative");
468         }
469         this.connectTimeout = ms;
470     }
471 
472     public void setCookieStore(CookieStore cookieStore)
473     {
474         this.cookieStore = cookieStore;
475     }
476 
477     public void setEventDriverFactory(EventDriverFactory factory)
478     {
479         this.eventDriverFactory = factory;
480     }
481 
482     public void setExecutor(Executor executor)
483     {
484         updateBean(this.executor,executor);
485         this.executor = executor;
486     }
487 
488     public void setMasker(Masker masker)
489     {
490         this.masker = masker;
491     }
492 
493     public void setMaxBinaryMessageBufferSize(int max)
494     {
495         this.policy.setMaxBinaryMessageBufferSize(max);
496     }
497 
498     /**
499      * Set the max idle timeout for new connections.
500      * <p>
501      * Existing connections will not have their max idle timeout adjusted.
502      * 
503      * @param ms
504      *            the timeout in milliseconds
505      */
506     public void setMaxIdleTimeout(long ms)
507     {
508         this.policy.setIdleTimeout(ms);
509     }
510 
511     public void setMaxTextMessageBufferSize(int max)
512     {
513         this.policy.setMaxTextMessageBufferSize(max);
514     }
515 
516     public void setSessionFactory(SessionFactory sessionFactory)
517     {
518         this.sessionFactory = sessionFactory;
519     }
520 }