1 package org.eclipse.jetty.server.handler;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.channels.ClosedChannelException;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.SocketChannel;
8 import java.util.concurrent.ConcurrentHashMap;
9 import java.util.concurrent.ConcurrentMap;
10 import java.util.concurrent.CountDownLatch;
11 import java.util.concurrent.TimeUnit;
12 import javax.servlet.ServletException;
13 import javax.servlet.http.HttpServletRequest;
14 import javax.servlet.http.HttpServletResponse;
15
16 import org.eclipse.jetty.http.HttpMethods;
17 import org.eclipse.jetty.http.HttpParser;
18 import org.eclipse.jetty.io.Buffer;
19 import org.eclipse.jetty.io.ConnectedEndPoint;
20 import org.eclipse.jetty.io.Connection;
21 import org.eclipse.jetty.io.EndPoint;
22 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
23 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
24 import org.eclipse.jetty.io.nio.SelectorManager;
25 import org.eclipse.jetty.server.Handler;
26 import org.eclipse.jetty.server.HttpConnection;
27 import org.eclipse.jetty.server.Request;
28 import org.eclipse.jetty.server.Server;
29 import org.eclipse.jetty.util.component.LifeCycle;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.ThreadPool;
33
34
35
36
37
38
39
40
41
42
43 public class ProxyHandler extends HandlerWrapper
44 {
/** Logger obtained from {@link Log} under the runtime class name of this instance. */
private final Logger _logger = Log.getLogger(getClass().getName());
/** Selector manager with which the proxy-to-server channels are registered. */
private final SelectorManager _selectorManager = new Manager();
/** Timeout in milliseconds for the socket connect and for the wait on the server endpoint. */
private volatile int _connectTimeout = 5000;
/** Timeout passed to {@code EndPoint.blockWritable} while writing (presumably milliseconds - TODO confirm). */
private volatile int _writeTimeout = 30000;
/** Thread pool used to run the select loop and the tasks dispatched by the selector manager. */
private volatile ThreadPool _threadPool;
/** True when the pool was explicitly provided through {@link #setThreadPool(ThreadPool)} rather than taken from the server. */
private volatile boolean _privateThreadPool;
51
/**
 * Creates a proxy handler that does not wrap any handler.
 */
public ProxyHandler()
{
    this((Handler)null);
}
56
/**
 * Creates a proxy handler that wraps the given handler.
 *
 * @param handler the handler handed to {@code setHandler}; may be {@code null}
 *                (the no-arg constructor passes {@code null})
 */
public ProxyHandler(Handler handler)
{
    super();
    setHandler(handler);
}
61
62
63
64
65 public int getConnectTimeout()
66 {
67 return _connectTimeout;
68 }
69
70
71
72
73 public void setConnectTimeout(int connectTimeout)
74 {
75 _connectTimeout = connectTimeout;
76 }
77
78
79
80
81 public int getWriteTimeout()
82 {
83 return _writeTimeout;
84 }
85
86
87
88
89 public void setWriteTimeout(int writeTimeout)
90 {
91 _writeTimeout = writeTimeout;
92 }
93
94 @Override
95 public void setServer(Server server)
96 {
97 super.setServer(server);
98
99 server.getContainer().update(this,null,_selectorManager,"selectManager");
100
101 if (_privateThreadPool)
102 server.getContainer().update(this,null,_privateThreadPool,"threadpool",true);
103 else
104 _threadPool=server.getThreadPool();
105 }
106
107
108
109
110 public ThreadPool getThreadPool()
111 {
112 return _threadPool;
113 }
114
115
116
117
118 public void setThreadPool(ThreadPool threadPool)
119 {
120 if (getServer()!=null)
121 getServer().getContainer().update(this,_privateThreadPool?_threadPool:null,threadPool,"threadpool",true);
122 _privateThreadPool=threadPool!=null;
123 _threadPool=threadPool;
124 }
125
126 @Override
127 protected void doStart() throws Exception
128 {
129 super.doStart();
130
131 if (_threadPool==null)
132 {
133 _threadPool=getServer().getThreadPool();
134 _privateThreadPool=false;
135 }
136 if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
137 ((LifeCycle)_threadPool).start();
138
139 _selectorManager.start();
140 _threadPool.dispatch(new Runnable()
141 {
142 public void run()
143 {
144 while (isRunning())
145 {
146 try
147 {
148 _selectorManager.doSelect(0);
149 }
150 catch (IOException x)
151 {
152 _logger.warn("Unexpected exception", x);
153 }
154 }
155 }
156 });
157 }
158
159 @Override
160 protected void doStop() throws Exception
161 {
162 _selectorManager.stop();
163
164 ThreadPool threadPool = _threadPool;
165 if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle)
166 ((LifeCycle)threadPool).stop();
167
168 super.doStop();
169 }
170
171 @Override
172 public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
173 {
174 if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
175 {
176 _logger.debug("CONNECT request for {}", request.getRequestURI());
177 handleConnect(baseRequest, request, response, request.getRequestURI());
178 }
179 else
180 {
181 super.handle(target, baseRequest, request, response);
182 }
183 }
184
185
186
187
188
189
190
191
192
193
194
195
196
197 protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException
198 {
199 boolean proceed = handleAuthentication(request, response, serverAddress);
200 if (!proceed)
201 return;
202
203 String host = serverAddress;
204 int port = 80;
205 int colon = serverAddress.indexOf(':');
206 if (colon > 0)
207 {
208 host = serverAddress.substring(0, colon);
209 port = Integer.parseInt(serverAddress.substring(colon + 1));
210 }
211
212 SocketChannel channel = connectToServer(request, host, port);
213
214
215
216
217
218
219 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
220 Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer();
221 Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer();
222 int length = headerBuffer == null ? 0 : headerBuffer.length();
223 length += bodyBuffer == null ? 0 : bodyBuffer.length();
224 IndirectNIOBuffer buffer = null;
225 if (length > 0)
226 {
227 buffer = new IndirectNIOBuffer(length);
228 if (headerBuffer != null)
229 {
230 buffer.put(headerBuffer);
231 headerBuffer.clear();
232 }
233 if (bodyBuffer != null)
234 {
235 buffer.put(bodyBuffer);
236 bodyBuffer.clear();
237 }
238 }
239
240 ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
241 prepareContext(request, context);
242
243 ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
244
245
246 response.setStatus(HttpServletResponse.SC_OK);
247
248
249 baseRequest.getConnection().getGenerator().setPersistent(true);
250
251
252 response.getOutputStream().close();
253
254 upgradeConnection(request, response, clientToProxy);
255 }
256
257 private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
258 {
259 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
260 ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
261 ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
262 clientToProxy.setConnection(proxyToServer);
263 proxyToServer.setConnection(clientToProxy);
264 return clientToProxy;
265 }
266
267
268
269
270
271
272
273
274
275
276
277
278 protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
279 {
280 return true;
281 }
282
283 protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
284 {
285 return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
286 }
287
288 protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
289 {
290 return new ProxyToServerConnection(context, buffer);
291 }
292
293 private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
294 {
295 SocketChannel channel = connect(request, host, port);
296 channel.configureBlocking(false);
297 return channel;
298 }
299
300
301
302
303
304
305
306
307
308 protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
309 {
310 _logger.debug("Establishing connection to {}:{}", host, port);
311
312 SocketChannel channel = SocketChannel.open();
313 channel.socket().setTcpNoDelay(true);
314 channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
315 _logger.debug("Established connection to {}:{}", host, port);
316 return channel;
317 }
318
319 protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
320 {
321 }
322
323 private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
324 {
325
326
327 request.setAttribute("org.eclipse.jetty.io.Connection", connection);
328 response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
329 _logger.debug("Upgraded connection to {}", connection);
330 }
331
332 private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
333 {
334 _selectorManager.register(channel, proxyToServer);
335 proxyToServer.waitReady(_connectTimeout);
336 }
337
338
339
340
341
342
343
344
345
346
347 protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
348 {
349 return endPoint.fill(buffer);
350 }
351
352
353
354
355
356
357
358
359
360
361 protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
362 {
363 if (buffer == null)
364 return 0;
365
366 int length = buffer.length();
367 StringBuilder builder = new StringBuilder();
368 int written = endPoint.flush(buffer);
369 builder.append(written);
370 buffer.compact();
371 if (!endPoint.isBlocking())
372 {
373 while (buffer.space() == 0)
374 {
375 boolean ready = endPoint.blockWritable(getWriteTimeout());
376 if (!ready)
377 throw new IOException("Write timeout");
378
379 written = endPoint.flush(buffer);
380 builder.append("+").append(written);
381 buffer.compact();
382 }
383 }
384 _logger.debug("Written {}/{} bytes {}", builder, length, endPoint);
385 return length;
386 }
387
388 private class Manager extends SelectorManager
389 {
390 @Override
391 protected SocketChannel acceptChannel(SelectionKey key) throws IOException
392 {
393
394 throw new IllegalStateException();
395 }
396
397 @Override
398 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
399 {
400 return new SelectChannelEndPoint(channel, selectSet, selectionKey);
401 }
402
403 @Override
404 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
405 {
406 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
407 proxyToServer.setTimeStamp(System.currentTimeMillis());
408 proxyToServer.setEndPoint(endpoint);
409 return proxyToServer;
410 }
411
412 @Override
413 protected void endPointOpened(SelectChannelEndPoint endpoint)
414 {
415 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
416 proxyToServer.ready();
417 }
418
419 @Override
420 public boolean dispatch(Runnable task)
421 {
422 return _threadPool.dispatch(task);
423 }
424
425 @Override
426 protected void endPointClosed(SelectChannelEndPoint endpoint)
427 {
428 }
429
430 @Override
431 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
432 {
433 }
434 }
435
436 public class ProxyToServerConnection implements Connection
437 {
438 private final CountDownLatch _ready = new CountDownLatch(1);
439 private final Buffer _buffer = new IndirectNIOBuffer(1024);
440 private final ConcurrentMap<String, Object> _context;
441 private volatile Buffer _data;
442 private volatile ClientToProxyConnection _toClient;
443 private volatile long _timestamp;
444 private volatile SelectChannelEndPoint _endPoint;
445
446 public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
447 {
448 _context = context;
449 _data = data;
450 }
451
452 public Connection handle() throws IOException
453 {
454 _logger.debug("ProxyToServer: begin reading from server");
455 try
456 {
457 if (_data != null)
458 {
459 int written = write(_endPoint, _data, _context);
460 _logger.debug("ProxyToServer: written to server {} bytes", written);
461 _data = null;
462 }
463
464 while (true)
465 {
466 int read = read(_endPoint, _buffer, _context);
467
468 if (read == -1)
469 {
470 _logger.debug("ProxyToServer: server closed connection {}", _endPoint);
471 close();
472 break;
473 }
474
475 if (read == 0)
476 break;
477
478 _logger.debug("ProxyToServer: read from server {} bytes {}", read, _endPoint);
479 int written = write(_toClient._endPoint, _buffer, _context);
480 _logger.debug("ProxyToServer: written to client {} bytes", written);
481 }
482 return this;
483 }
484 catch (IOException x)
485 {
486 _logger.warn("ProxyToServer: Unexpected exception", x);
487 close();
488 throw x;
489 }
490 catch (RuntimeException x)
491 {
492 _logger.warn("ProxyToServer: Unexpected exception", x);
493 close();
494 throw x;
495 }
496 finally
497 {
498 _logger.debug("ProxyToServer: end reading from server");
499 }
500 }
501
502 public void setConnection(ClientToProxyConnection connection)
503 {
504 _toClient = connection;
505 }
506
507 public long getTimeStamp()
508 {
509 return _timestamp;
510 }
511
512 public void setTimeStamp(long timestamp)
513 {
514 _timestamp = timestamp;
515 }
516
517 public void setEndPoint(SelectChannelEndPoint endpoint)
518 {
519 _endPoint = endpoint;
520 _logger.debug("ProxyToServer: {}", _endPoint);
521 }
522
523 public boolean isIdle()
524 {
525 return false;
526 }
527
528 public boolean isSuspended()
529 {
530 return false;
531 }
532
533 public void ready()
534 {
535 _ready.countDown();
536 }
537
538 public void waitReady(long timeout) throws IOException
539 {
540 try
541 {
542 _ready.await(timeout, TimeUnit.MILLISECONDS);
543 }
544 catch (final InterruptedException x)
545 {
546 throw new IOException(){{initCause(x);}};
547 }
548 }
549
550 public void closeClient() throws IOException
551 {
552 _toClient.closeClient();
553 }
554
555 public void closeServer() throws IOException
556 {
557 _endPoint.close();
558 }
559
560 public void close()
561 {
562 try
563 {
564 closeClient();
565 }
566 catch (IOException x)
567 {
568 _logger.debug("ProxyToServer: Unexpected exception closing the client", x);
569 }
570
571 try
572 {
573 closeServer();
574 }
575 catch (IOException x)
576 {
577 _logger.debug("ProxyToServer: Unexpected exception closing the server", x);
578 }
579 }
580 }
581
582 public class ClientToProxyConnection implements Connection
583 {
584 private final Buffer _buffer = new IndirectNIOBuffer(1024);
585 private final ConcurrentMap<String, Object> _context;
586 private final SocketChannel _channel;
587 private final EndPoint _endPoint;
588 private final long _timestamp;
589 private volatile ProxyToServerConnection _toServer;
590 private boolean _firstTime = true;
591
592 public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
593 {
594 _context = context;
595 _channel = channel;
596 _endPoint = endPoint;
597 _timestamp = timestamp;
598 _logger.debug("ClientToProxy: {}", _endPoint);
599 }
600
601 public Connection handle() throws IOException
602 {
603 _logger.debug("ClientToProxy: begin reading from client");
604 try
605 {
606 if (_firstTime)
607 {
608 _firstTime = false;
609 register(_channel, _toServer);
610 _logger.debug("ClientToProxy: registered channel {} with connection {}", _channel, _toServer);
611 }
612
613 while (true)
614 {
615 int read = read(_endPoint, _buffer, _context);
616
617 if (read == -1)
618 {
619 _logger.debug("ClientToProxy: client closed connection {}", _endPoint);
620 close();
621 break;
622 }
623
624 if (read == 0)
625 break;
626
627 _logger.debug("ClientToProxy: read from client {} bytes {}", read, _endPoint);
628 int written = write(_toServer._endPoint, _buffer, _context);
629 _logger.debug("ClientToProxy: written to server {} bytes", written);
630 }
631 return this;
632 }
633 catch (ClosedChannelException x)
634 {
635 _logger.debug("ClientToProxy",x);
636 closeServer();
637 throw x;
638 }
639 catch (IOException x)
640 {
641 _logger.warn("ClientToProxy", x);
642 close();
643 throw x;
644 }
645 catch (RuntimeException x)
646 {
647 _logger.warn("ClientToProxy", x);
648 close();
649 throw x;
650 }
651 finally
652 {
653 _logger.debug("ClientToProxy: end reading from client");
654 }
655 }
656
657 public long getTimeStamp()
658 {
659 return _timestamp;
660 }
661
662 public boolean isIdle()
663 {
664 return false;
665 }
666
667 public boolean isSuspended()
668 {
669 return false;
670 }
671
672 public void setConnection(ProxyToServerConnection connection)
673 {
674 _toServer = connection;
675 }
676
677 public void closeClient() throws IOException
678 {
679 _endPoint.close();
680 }
681
682 public void closeServer() throws IOException
683 {
684 _toServer.closeServer();
685 }
686
687 public void close()
688 {
689 try
690 {
691 closeClient();
692 }
693 catch (IOException x)
694 {
695 _logger.debug("ClientToProxy: Unexpected exception closing the client", x);
696 }
697
698 try
699 {
700 closeServer();
701 }
702 catch (IOException x)
703 {
704 _logger.debug("ClientToProxy: Unexpected exception closing the server", x);
705 }
706 }
707 }
708 }