1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.client.io;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.net.URI;
25 import java.nio.channels.SocketChannel;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Locale;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31
32 import org.eclipse.jetty.util.component.ContainerLifeCycle;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.websocket.api.StatusCode;
36 import org.eclipse.jetty.websocket.api.WebSocketException;
37 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
38 import org.eclipse.jetty.websocket.client.WebSocketClient;
39 import org.eclipse.jetty.websocket.common.WebSocketSession;
40 import org.eclipse.jetty.websocket.common.events.EventDriver;
41
42
43
44
45
46 public class ConnectionManager extends ContainerLifeCycle
47 {
48 private class PhysicalConnect extends ConnectPromise
49 {
50 private SocketAddress bindAddress;
51
52 public PhysicalConnect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
53 {
54 super(client,driver,request);
55 this.bindAddress = client.getBindAddress();
56 }
57
58 @Override
59 public void run()
60 {
61 SocketChannel channel = null;
62 try
63 {
64 channel = SocketChannel.open();
65 if (bindAddress != null)
66 {
67 channel.bind(bindAddress);
68 }
69
70 URI wsUri = getRequest().getRequestURI();
71
72 channel.socket().setTcpNoDelay(true);
73 channel.configureBlocking(false);
74
75 InetSocketAddress address = toSocketAddress(wsUri);
76
77 if (channel.connect(address))
78 {
79 getSelector().accept(channel, this);
80 }
81 else
82 {
83 getSelector().connect(channel, this);
84 }
85 }
86 catch (Throwable t)
87 {
88
89 if (channel != null)
90 {
91 try
92 {
93 channel.close();
94 }
95 catch (IOException ignore)
96 {
97 LOG.ignore(ignore);
98 }
99 }
100
101
102 failed(t);
103 }
104 }
105 }
106
107 private class VirtualConnect extends ConnectPromise
108 {
109 public VirtualConnect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
110 {
111 super(client,driver,request);
112 }
113
114 @Override
115 public void run()
116 {
117 failed(new WebSocketException("MUX Not yet supported"));
118 }
119 }
120
121 private static final Logger LOG = Log.getLogger(ConnectionManager.class);
122
123 public static InetSocketAddress toSocketAddress(URI uri)
124 {
125 if (!uri.isAbsolute())
126 {
127 throw new IllegalArgumentException("Cannot get InetSocketAddress of non-absolute URIs");
128 }
129
130 int port = uri.getPort();
131 String scheme = uri.getScheme().toLowerCase(Locale.ENGLISH);
132 if ("ws".equals(scheme))
133 {
134 if (port == (-1))
135 {
136 port = 80;
137 }
138 }
139 else if ("wss".equals(scheme))
140 {
141 if (port == (-1))
142 {
143 port = 443;
144 }
145 }
146 else
147 {
148 throw new IllegalArgumentException("Only support ws:// and wss:// URIs");
149 }
150
151 return new InetSocketAddress(uri.getHost(),port);
152 }
153
154 private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
155 private final WebSocketClient client;
156 private WebSocketClientSelectorManager selector;
157
158 public ConnectionManager(WebSocketClient client)
159 {
160 this.client = client;
161 }
162
163 public void addSession(WebSocketSession session)
164 {
165 sessions.add(session);
166 }
167
168 private void shutdownAllConnections()
169 {
170 for (WebSocketSession session : sessions)
171 {
172 if (session.getConnection() != null)
173 {
174 try
175 {
176 session.getConnection().close(
177 StatusCode.SHUTDOWN,
178 "Shutdown");
179 }
180 catch (Throwable t)
181 {
182 LOG.debug("During Shutdown All Connections",t);
183 }
184 }
185 }
186 }
187
188 public ConnectPromise connect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
189 {
190 URI toUri = request.getRequestURI();
191 String hostname = toUri.getHost();
192
193 if (isVirtualConnectionPossibleTo(hostname))
194 {
195 return new VirtualConnect(client,driver,request);
196 }
197
198 return new PhysicalConnect(client,driver,request);
199 }
200
201 @Override
202 protected void doStart() throws Exception
203 {
204 selector = newWebSocketClientSelectorManager(client);
205 selector.setSslContextFactory(client.getSslContextFactory());
206 selector.setConnectTimeout(client.getConnectTimeout());
207 addBean(selector);
208
209 super.doStart();
210 }
211
212 @Override
213 protected void doStop() throws Exception
214 {
215 shutdownAllConnections();
216 sessions.clear();
217 super.doStop();
218 removeBean(selector);
219 }
220
221 public WebSocketClientSelectorManager getSelector()
222 {
223 return selector;
224 }
225
226 public Collection<WebSocketSession> getSessions()
227 {
228 return Collections.unmodifiableCollection(sessions);
229 }
230
231 public boolean isVirtualConnectionPossibleTo(String hostname)
232 {
233
234 return false;
235 }
236
237
238
239
240
241
242
243
244 protected WebSocketClientSelectorManager newWebSocketClientSelectorManager(WebSocketClient client)
245 {
246 return new WebSocketClientSelectorManager(client);
247 }
248
249 public void removeSession(WebSocketSession session)
250 {
251 sessions.remove(session);
252 }
253 }