1 package org.eclipse.jetty.websocket;
2
3 import java.io.EOFException;
4 import java.io.IOException;
5 import java.net.ProtocolException;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.SocketChannel;
8 import java.util.Random;
9
10 import org.eclipse.jetty.http.HttpFields;
11 import org.eclipse.jetty.http.HttpParser;
12 import org.eclipse.jetty.io.AbstractConnection;
13 import org.eclipse.jetty.io.Buffer;
14 import org.eclipse.jetty.io.Buffers;
15 import org.eclipse.jetty.io.ByteArrayBuffer;
16 import org.eclipse.jetty.io.ConnectedEndPoint;
17 import org.eclipse.jetty.io.Connection;
18 import org.eclipse.jetty.io.SimpleBuffers;
19 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
20 import org.eclipse.jetty.io.nio.SelectorManager;
21 import org.eclipse.jetty.util.B64Code;
22 import org.eclipse.jetty.util.QuotedStringTokenizer;
23 import org.eclipse.jetty.util.component.AggregateLifeCycle;
24 import org.eclipse.jetty.util.component.LifeCycle;
25 import org.eclipse.jetty.util.log.Logger;
26 import org.eclipse.jetty.util.thread.QueuedThreadPool;
27 import org.eclipse.jetty.util.thread.ThreadPool;
28
29
30
31
32
33
34
35
36
37
38
39 public class WebSocketClientFactory extends AggregateLifeCycle
40 {
41 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
42 private final static Random __random = new Random();
43 private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
44
45 private final ThreadPool _threadPool;
46 private final WebSocketClientSelector _selector;
47 private MaskGen _maskGen;
48 private WebSocketBuffers _buffers;
49
50
51
52
53
/**
 * Creates a factory with its own {@link QueuedThreadPool}, a buffer size of 8*1024
 * and a {@link RandomMaskGen}. Each collaborator is registered as a bean of this
 * aggregate in the order it is created.
 */
public WebSocketClientFactory()
{
    this._threadPool = new QueuedThreadPool();
    addBean(this._threadPool);

    this._buffers = new WebSocketBuffers(8 * 1024);
    addBean(this._buffers);

    this._maskGen = new RandomMaskGen();
    addBean(this._maskGen);

    this._selector = new WebSocketClientSelector();
    addBean(this._selector);
}
65
66
67
68
69
70
/**
 * Creates a factory that dispatches onto the supplied thread pool, using a buffer
 * size of 8*1024 and a {@link RandomMaskGen}. The pool and every created
 * collaborator are registered as beans of this aggregate.
 *
 * @param threadPool the pool that selector tasks are dispatched to
 */
public WebSocketClientFactory(ThreadPool threadPool)
{
    this._threadPool = threadPool;
    addBean(threadPool);

    this._buffers = new WebSocketBuffers(8 * 1024);
    addBean(this._buffers);

    this._maskGen = new RandomMaskGen();
    addBean(this._maskGen);

    this._selector = new WebSocketClientSelector();
    addBean(this._selector);
}
82
83
84
85
86
87
88
89
/**
 * Creates a factory with a caller-supplied thread pool, mask generator and buffer size.
 * <p>
 * All collaborators, including the mask generator, are registered as beans of this
 * aggregate, matching what the other constructors do. Without that registration,
 * {@link #setMaskGen(MaskGen)} would find nothing to remove and would therefore never
 * register a replacement generator either.
 *
 * @param threadPool the pool that selector tasks are dispatched to
 * @param maskGen the mask generator handed out by {@link #getMaskGen()}
 * @param bufferSize the size passed to the {@link WebSocketBuffers} created for this factory
 */
public WebSocketClientFactory(ThreadPool threadPool,MaskGen maskGen,int bufferSize)
{
    _threadPool=threadPool;
    addBean(threadPool);
    _buffers=new WebSocketBuffers(bufferSize);
    addBean(_buffers);
    _maskGen=maskGen;
    // Only register a real generator; a null argument is stored as-is, as before.
    if (maskGen!=null)
        addBean(maskGen);
    _selector=new WebSocketClientSelector();
    addBean(_selector);
}
100
101
102
103
104
105
106 public SelectorManager getSelectorManager()
107 {
108 return _selector;
109 }
110
111
112
113
114
115
116 public ThreadPool getThreadPool()
117 {
118 return _threadPool;
119 }
120
121
122
123
124
125
126 public MaskGen getMaskGen()
127 {
128 return _maskGen;
129 }
130
131
132
133
134
135
136 public void setMaskGen(MaskGen maskGen)
137 {
138 if (isRunning())
139 throw new IllegalStateException(getState());
140 if (removeBean(_maskGen))
141 addBean(maskGen);
142 _maskGen=maskGen;
143 }
144
145
146
147
148
149
150 public void setBufferSize(int bufferSize)
151 {
152 if (isRunning())
153 throw new IllegalStateException(getState());
154 removeBean(_buffers);
155 _buffers=new WebSocketBuffers(bufferSize);
156 addBean(_buffers);
157 }
158
159
160
161
162
163 public int getBufferSize()
164 {
165 return _buffers.getBufferSize();
166 }
167
168
169
170
171
172
173
174
175 public WebSocketClient newWebSocketClient()
176 {
177 return new WebSocketClient(this);
178 }
179
180
181 @Override
182 protected void doStart() throws Exception
183 {
184 super.doStart();
185 if (getThreadPool() instanceof LifeCycle && !((LifeCycle)getThreadPool()).isStarted())
186 ((LifeCycle)getThreadPool()).start();
187 }
188
189
190 @Override
191 protected void doStop() throws Exception
192 {
193 super.doStop();
194 }
195
196
197
198
199 class WebSocketClientSelector extends SelectorManager
200 {
201 @Override
202 public boolean dispatch(Runnable task)
203 {
204 return _threadPool.dispatch(task);
205 }
206
207 @Override
208 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
209 {
210 return new SelectChannelEndPoint(channel,selectSet,sKey);
211 }
212
213 @Override
214 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
215 {
216 WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) endpoint.getSelectionKey().attachment();
217 return new HandshakeConnection(endpoint,holder);
218 }
219
220 @Override
221 protected void endPointOpened(SelectChannelEndPoint endpoint)
222 {
223
224 }
225
226 @Override
227 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
228 {
229 throw new IllegalStateException();
230 }
231
232 @Override
233 protected void endPointClosed(SelectChannelEndPoint endpoint)
234 {
235 endpoint.getConnection().closed();
236 }
237
238 @Override
239 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
240 {
241 if (!(attachment instanceof WebSocketClient.WebSocketFuture))
242 super.connectionFailed(channel,ex,attachment);
243 else
244 {
245 __log.debug(ex);
246 WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
247
248 future.handshakeFailed(ex);
249 }
250 }
251 }
252
253
254
255
256
257
258 class HandshakeConnection extends AbstractConnection
259 {
260 private final SelectChannelEndPoint _endp;
261 private final WebSocketClient.WebSocketFuture _future;
262 private final String _key;
263 private final HttpParser _parser;
264 private String _accept;
265 private String _error;
266
267 public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketClient.WebSocketFuture future)
268 {
269 super(endpoint,System.currentTimeMillis());
270 _endp=endpoint;
271 _future=future;
272
273 byte[] bytes=new byte[16];
274 __random.nextBytes(bytes);
275 _key=new String(B64Code.encode(bytes));
276
277
278 Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
279 _parser=new HttpParser(buffers,_endp,
280
281 new HttpParser.EventHandler()
282 {
283 @Override
284 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
285 {
286 if (status!=101)
287 {
288 _error="Bad response status "+status+" "+reason;
289 _endp.close();
290 }
291 }
292
293 @Override
294 public void parsedHeader(Buffer name, Buffer value) throws IOException
295 {
296 if (__ACCEPT.equals(name))
297 _accept=value.toString();
298 }
299
300 @Override
301 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
302 {
303 if (_error==null)
304 _error="Bad response: "+method+" "+url+" "+version;
305 _endp.close();
306 }
307
308 @Override
309 public void content(Buffer ref) throws IOException
310 {
311 if (_error==null)
312 _error="Bad response. "+ref.length()+"B of content?";
313 _endp.close();
314 }
315 });
316
317 String path=_future.getURI().getPath();
318 if (path==null || path.length()==0)
319 path="/";
320
321 String origin = future.getOrigin();
322
323 StringBuilder request = new StringBuilder(512);
324 request
325 .append("GET ").append(path).append(" HTTP/1.1\r\n")
326 .append("Host: ").append(future.getURI().getHost()).append(":").append(_future.getURI().getPort()).append("\r\n")
327 .append("Upgrade: websocket\r\n")
328 .append("Connection: Upgrade\r\n")
329 .append("Sec-WebSocket-Key: ")
330 .append(_key).append("\r\n");
331
332 if(origin!=null)
333 request.append("Origin: ").append(origin).append("\r\n");
334
335 request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionD13.VERSION).append("\r\n");
336
337 if (future.getProtocol()!=null)
338 request.append("Sec-WebSocket-Protocol: ").append(future.getProtocol()).append("\r\n");
339
340 if (future.getCookies()!=null && future.getCookies().size()>0)
341 {
342 for (String cookie : future.getCookies().keySet())
343 request
344 .append("Cookie: ")
345 .append(QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM))
346 .append("=")
347 .append(QuotedStringTokenizer.quoteIfNeeded(future.getCookies().get(cookie),HttpFields.__COOKIE_DELIM))
348 .append("\r\n");
349 }
350
351 request.append("\r\n");
352
353
354
355 try
356 {
357 Buffer handshake = new ByteArrayBuffer(request.toString(),false);
358 int len=handshake.length();
359 if (len!=_endp.flush(handshake))
360 throw new IOException("incomplete");
361 }
362 catch(IOException e)
363 {
364 future.handshakeFailed(e);
365 }
366
367 }
368
369 public Connection handle() throws IOException
370 {
371 while (_endp.isOpen() && !_parser.isComplete())
372 {
373 switch (_parser.parseAvailable())
374 {
375 case -1:
376 _future.handshakeFailed(new IOException("Incomplete handshake response"));
377 return this;
378 case 0:
379 return this;
380 default:
381 break;
382 }
383 }
384 if (_error==null)
385 {
386 if (_accept==null)
387 _error="No Sec-WebSocket-Accept";
388 else if (!WebSocketConnectionD13.hashKey(_key).equals(_accept))
389 _error="Bad Sec-WebSocket-Accept";
390 else
391 {
392 Buffer header=_parser.getHeaderBuffer();
393 MaskGen maskGen=_future.getMaskGen();
394 WebSocketConnectionD13 connection = new WebSocketConnectionD13(_future.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_future.getMaxIdleTime(),_future.getProtocol(),null,10,maskGen);
395
396 if (header.hasContent())
397 connection.fillBuffersFrom(header);
398 _buffers.returnBuffer(header);
399
400 _future.onConnection(connection);
401
402 return connection;
403 }
404 }
405
406 _endp.close();
407 return this;
408 }
409
410 public boolean isIdle()
411 {
412 return false;
413 }
414
415 public boolean isSuspended()
416 {
417 return false;
418 }
419
420 public void closed()
421 {
422 if (_error!=null)
423 _future.handshakeFailed(new ProtocolException(_error));
424 else
425 _future.handshakeFailed(new EOFException());
426 }
427 }
428 }