View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.client.io;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.net.URI;
25  import java.nio.channels.SocketChannel;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Locale;
29  import java.util.Queue;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  
32  import org.eclipse.jetty.util.component.ContainerLifeCycle;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  import org.eclipse.jetty.websocket.api.WebSocketException;
36  import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
37  import org.eclipse.jetty.websocket.client.WebSocketClient;
38  import org.eclipse.jetty.websocket.common.WebSocketSession;
39  import org.eclipse.jetty.websocket.common.events.EventDriver;
40  
41  /**
42   * Internal Connection/Client Manager used to track active clients, their physical vs virtual connection information, and provide some means to create new
43   * physical or virtual connections.
44   */
45  public class ConnectionManager extends ContainerLifeCycle
46  {
47      private class PhysicalConnect extends ConnectPromise
48      {
49          private SocketAddress bindAddress;
50  
51          public PhysicalConnect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
52          {
53              super(client,driver,request);
54              this.bindAddress = client.getBindAddress();
55          }
56  
57          @Override
58          public void run()
59          {
60              SocketChannel channel = null;
61              try
62              {
63                  channel = SocketChannel.open();
64                  if (bindAddress != null)
65                  {
66                      channel.bind(bindAddress);
67                  }
68  
69                  URI wsUri = getRequest().getRequestURI();
70  
71                  channel.socket().setTcpNoDelay(true); // disable nagle
72                  channel.configureBlocking(false); // async always
73  
74                  InetSocketAddress address = toSocketAddress(wsUri);
75  
76                  channel.connect(address);
77                  getSelector().connect(channel,this);
78              }
79              catch (Throwable t)
80              {
81                  // close the socket channel
82                  if (channel != null)
83                  {
84                      try
85                      {
86                          channel.close();
87                      }
88                      catch (IOException ignore)
89                      {
90                          LOG.ignore(ignore);
91                      }
92                  }
93                  
94                  // notify the future
95                  failed(t);
96              }
97          }
98      }
99  
100     private class VirtualConnect extends ConnectPromise
101     {
102         public VirtualConnect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
103         {
104             super(client,driver,request);
105         }
106 
107         @Override
108         public void run()
109         {
110             failed(new WebSocketException("MUX Not yet supported"));
111         }
112     }
113 
114     private static final Logger LOG = Log.getLogger(ConnectionManager.class);
115 
116     public static InetSocketAddress toSocketAddress(URI uri)
117     {
118         if (!uri.isAbsolute())
119         {
120             throw new IllegalArgumentException("Cannot get InetSocketAddress of non-absolute URIs");
121         }
122 
123         int port = uri.getPort();
124         String scheme = uri.getScheme().toLowerCase(Locale.ENGLISH);
125         if ("ws".equals(scheme))
126         {
127             if (port == (-1))
128             {
129                 port = 80;
130             }
131         }
132         else if ("wss".equals(scheme))
133         {
134             if (port == (-1))
135             {
136                 port = 443;
137             }
138         }
139         else
140         {
141             throw new IllegalArgumentException("Only support ws:// and wss:// URIs");
142         }
143 
144         return new InetSocketAddress(uri.getHost(),port);
145     }
146 
147     private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
148     private final WebSocketClient client;
149     private WebSocketClientSelectorManager selector;
150 
151     public ConnectionManager(WebSocketClient client)
152     {
153         this.client = client;
154     }
155 
156     public void addSession(WebSocketSession session)
157     {
158         sessions.add(session);
159     }
160 
161     private void closeAllConnections()
162     {
163         for (WebSocketSession session : sessions)
164         {
165             if (session.getConnection() != null)
166             {
167                 try
168                 {
169                     session.getConnection().close();
170                 }
171                 catch (Throwable t)
172                 {
173                     LOG.debug("During Close All Connections",t);
174                 }
175             }
176         }
177     }
178 
179     public ConnectPromise connect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
180     {
181         URI toUri = request.getRequestURI();
182         String hostname = toUri.getHost();
183 
184         if (isVirtualConnectionPossibleTo(hostname))
185         {
186             return new VirtualConnect(client,driver,request);
187         }
188 
189         return new PhysicalConnect(client,driver,request);
190     }
191 
192     @Override
193     protected void doStart() throws Exception
194     {
195         selector = newWebSocketClientSelectorManager(client);
196         selector.setSslContextFactory(client.getSslContextFactory());
197         selector.setConnectTimeout(client.getConnectTimeout());
198         addBean(selector);
199 
200         super.doStart();
201     }
202 
203     @Override
204     protected void doStop() throws Exception
205     {
206         closeAllConnections();
207         sessions.clear();
208         super.doStop();
209         removeBean(selector);
210     }
211 
212     public WebSocketClientSelectorManager getSelector()
213     {
214         return selector;
215     }
216 
217     public Collection<WebSocketSession> getSessions()
218     {
219         return Collections.unmodifiableCollection(sessions);
220     }
221 
222     public boolean isVirtualConnectionPossibleTo(String hostname)
223     {
224         // TODO Auto-generated method stub
225         return false;
226     }
227 
228     /**
229      * Factory method for new WebSocketClientSelectorManager (used by other projects like cometd)
230      * 
231      * @param client
232      *            the client used to create the WebSocketClientSelectorManager
233      * @return the new WebSocketClientSelectorManager
234      */
235     protected WebSocketClientSelectorManager newWebSocketClientSelectorManager(WebSocketClient client)
236     {
237         return new WebSocketClientSelectorManager(client);
238     }
239 
240     public void removeSession(WebSocketSession session)
241     {
242         sessions.remove(session);
243     }
244 }