1 package org.eclipse.jetty.server.handler;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.channels.ClosedChannelException;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.SocketChannel;
8 import java.util.Arrays;
9 import java.util.concurrent.ConcurrentHashMap;
10 import java.util.concurrent.ConcurrentMap;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.TimeUnit;
13 import javax.servlet.ServletException;
14 import javax.servlet.http.HttpServletRequest;
15 import javax.servlet.http.HttpServletResponse;
16
17 import org.eclipse.jetty.http.HttpMethods;
18 import org.eclipse.jetty.http.HttpParser;
19 import org.eclipse.jetty.io.Buffer;
20 import org.eclipse.jetty.io.ConnectedEndPoint;
21 import org.eclipse.jetty.io.Connection;
22 import org.eclipse.jetty.io.EndPoint;
23 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
24 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
25 import org.eclipse.jetty.io.nio.SelectorManager;
26 import org.eclipse.jetty.server.Handler;
27 import org.eclipse.jetty.server.HttpConnection;
28 import org.eclipse.jetty.server.Request;
29 import org.eclipse.jetty.server.Server;
30 import org.eclipse.jetty.util.HostMap;
31 import org.eclipse.jetty.util.TypeUtil;
32 import org.eclipse.jetty.util.component.LifeCycle;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.util.thread.ThreadPool;
37
38
39
40
41
42
43 public class ConnectHandler extends HandlerWrapper
44 {
45 private static final Logger LOG = Log.getLogger(ConnectHandler.class);
46
47 private final Logger _logger = Log.getLogger(getClass().getName());
48 private final SelectorManager _selectorManager = new Manager();
49 private volatile int _connectTimeout = 5000;
50 private volatile int _writeTimeout = 30000;
51 private volatile ThreadPool _threadPool;
52 private volatile boolean _privateThreadPool;
53 private HostMap<String> _white = new HostMap<String>();
54 private HostMap<String> _black = new HostMap<String>();
55
/**
 * Creates a handler that wraps no other handler and has empty white and black lists.
 */
public ConnectHandler()
{
    this((Handler)null);
}
60
/**
 * Creates a handler that wraps no other handler and is restricted by the given host lists.
 *
 * @param white the hosts to place in the white list, may be null
 * @param black the hosts to place in the black list, may be null
 */
public ConnectHandler(String[] white, String[] black)
{
    this((Handler)null, white, black);
}
65
/**
 * Creates a handler that delegates non-CONNECT requests to the given handler.
 *
 * @param handler the handler to wrap, may be null
 */
public ConnectHandler(Handler handler)
{
    super();
    setHandler(handler);
}
70
/**
 * Creates a handler that delegates non-CONNECT requests to the given handler
 * and is restricted by the given host lists.
 *
 * @param handler the handler to wrap, may be null
 * @param white the hosts to place in the white list, may be null
 * @param black the hosts to place in the black list, may be null
 */
public ConnectHandler(Handler handler, String[] white, String[] black)
{
    super();
    setHandler(handler);
    // Populate both host maps; the white list is filled first, then the black list.
    set(white, _white);
    set(black, _black);
}
77
78
79
80
81 public int getConnectTimeout()
82 {
83 return _connectTimeout;
84 }
85
86
87
88
89 public void setConnectTimeout(int connectTimeout)
90 {
91 _connectTimeout = connectTimeout;
92 }
93
94
95
96
97 public int getWriteTimeout()
98 {
99 return _writeTimeout;
100 }
101
102
103
104
105 public void setWriteTimeout(int writeTimeout)
106 {
107 _writeTimeout = writeTimeout;
108 }
109
110 @Override
111 public void setServer(Server server)
112 {
113 super.setServer(server);
114
115 server.getContainer().update(this, null, _selectorManager, "selectManager");
116
117 if (_privateThreadPool)
118 server.getContainer().update(this, null, _privateThreadPool, "threadpool", true);
119 else
120 _threadPool = server.getThreadPool();
121 }
122
123
124
125
126 public ThreadPool getThreadPool()
127 {
128 return _threadPool;
129 }
130
131
132
133
134 public void setThreadPool(ThreadPool threadPool)
135 {
136 if (getServer() != null)
137 getServer().getContainer().update(this, _privateThreadPool ? _threadPool : null, threadPool, "threadpool", true);
138 _privateThreadPool = threadPool != null;
139 _threadPool = threadPool;
140 }
141
142 @Override
143 protected void doStart() throws Exception
144 {
145 super.doStart();
146
147 if (_threadPool == null)
148 {
149 _threadPool = getServer().getThreadPool();
150 _privateThreadPool = false;
151 }
152 if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
153 ((LifeCycle)_threadPool).start();
154
155 _selectorManager.start();
156 }
157
158 @Override
159 protected void doStop() throws Exception
160 {
161 _selectorManager.stop();
162
163 ThreadPool threadPool = _threadPool;
164 if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle)
165 ((LifeCycle)threadPool).stop();
166
167 super.doStop();
168 }
169
170 @Override
171 public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
172 {
173 if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
174 {
175 _logger.debug("CONNECT request for {}", request.getRequestURI());
176 try
177 {
178 handleConnect(baseRequest, request, response, request.getRequestURI());
179 }
180 catch(Exception e)
181 {
182 _logger.warn("ConnectHandler "+baseRequest.getUri()+" "+ e);
183 _logger.debug(e);
184 }
185 }
186 else
187 {
188 super.handle(target, baseRequest, request, response);
189 }
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204 protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException
205 {
206 boolean proceed = handleAuthentication(request, response, serverAddress);
207 if (!proceed)
208 return;
209
210 String host = serverAddress;
211 int port = 80;
212 int colon = serverAddress.indexOf(':');
213 if (colon > 0)
214 {
215 host = serverAddress.substring(0, colon);
216 port = Integer.parseInt(serverAddress.substring(colon + 1));
217 }
218
219 if (!validateDestination(host))
220 {
221 LOG.info("ProxyHandler: Forbidden destination " + host);
222 response.setStatus(HttpServletResponse.SC_FORBIDDEN);
223 baseRequest.setHandled(true);
224 return;
225 }
226
227 SocketChannel channel = connectToServer(request, host, port);
228
229
230
231
232
233
234 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
235 Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer();
236 Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer();
237 int length = headerBuffer == null ? 0 : headerBuffer.length();
238 length += bodyBuffer == null ? 0 : bodyBuffer.length();
239 IndirectNIOBuffer buffer = null;
240 if (length > 0)
241 {
242 buffer = new IndirectNIOBuffer(length);
243 if (headerBuffer != null)
244 {
245 buffer.put(headerBuffer);
246 headerBuffer.clear();
247 }
248 if (bodyBuffer != null)
249 {
250 buffer.put(bodyBuffer);
251 bodyBuffer.clear();
252 }
253 }
254
255 ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
256 prepareContext(request, context);
257
258 ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
259
260
261 response.setStatus(HttpServletResponse.SC_OK);
262
263
264 baseRequest.getConnection().getGenerator().setPersistent(true);
265
266
267 response.getOutputStream().close();
268
269 upgradeConnection(request, response, clientToProxy);
270 }
271
272 private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
273 {
274 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
275 ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
276 ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
277 clientToProxy.setConnection(proxyToServer);
278 proxyToServer.setConnection(clientToProxy);
279 return clientToProxy;
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293 protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
294 {
295 return true;
296 }
297
298 protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
299 {
300 return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
301 }
302
303 protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
304 {
305 return new ProxyToServerConnection(context, buffer);
306 }
307
308 private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
309 {
310 SocketChannel channel = connect(request, host, port);
311 channel.configureBlocking(false);
312 return channel;
313 }
314
315
316
317
318
319
320
321
322
323
324 protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
325 {
326 SocketChannel channel = SocketChannel.open();
327 try
328 {
329
330 _logger.debug("Establishing connection to {}:{}", host, port);
331 channel.socket().setTcpNoDelay(true);
332 channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
333 _logger.debug("Established connection to {}:{}", host, port);
334 return channel;
335 }
336 catch (IOException x)
337 {
338 _logger.debug("Failed to establish connection to " + host + ":" + port, x);
339 try
340 {
341 channel.close();
342 }
343 catch (IOException xx)
344 {
345 LOG.ignore(xx);
346 }
347 throw x;
348 }
349 }
350
351 protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
352 {
353 }
354
355 private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
356 {
357
358
359 request.setAttribute("org.eclipse.jetty.io.Connection", connection);
360 response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
361 _logger.debug("Upgraded connection to {}", connection);
362 }
363
364 private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
365 {
366 _selectorManager.register(channel, proxyToServer);
367 proxyToServer.waitReady(_connectTimeout);
368 }
369
370
371
372
373
374
375
376
377
378
379
380 protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
381 {
382 return endPoint.fill(buffer);
383 }
384
385
386
387
388
389
390
391
392
393
394 protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
395 {
396 if (buffer == null)
397 return 0;
398
399 int length = buffer.length();
400 StringBuilder builder = new StringBuilder();
401 int written = endPoint.flush(buffer);
402 builder.append(written);
403 buffer.compact();
404 if (!endPoint.isBlocking())
405 {
406 while (buffer.space() == 0)
407 {
408 boolean ready = endPoint.blockWritable(getWriteTimeout());
409 if (!ready)
410 throw new IOException("Write timeout");
411
412 written = endPoint.flush(buffer);
413 builder.append("+").append(written);
414 buffer.compact();
415 }
416 }
417 _logger.debug("Written {}/{} bytes {}", builder, length, endPoint);
418 return length;
419 }
420
421 private class Manager extends SelectorManager
422 {
423 @Override
424 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
425 {
426 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey);
427 endp.setMaxIdleTime(_writeTimeout);
428 return endp;
429 }
430
431 @Override
432 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
433 {
434 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
435 proxyToServer.setTimeStamp(System.currentTimeMillis());
436 proxyToServer.setEndPoint(endpoint);
437 return proxyToServer;
438 }
439
440 @Override
441 protected void endPointOpened(SelectChannelEndPoint endpoint)
442 {
443 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
444 proxyToServer.ready();
445 }
446
447 @Override
448 public boolean dispatch(Runnable task)
449 {
450 return _threadPool.dispatch(task);
451 }
452
453 @Override
454 protected void endPointClosed(SelectChannelEndPoint endpoint)
455 {
456 }
457
458 @Override
459 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
460 {
461 }
462 }
463
464 public class ProxyToServerConnection implements Connection
465 {
466 private final CountDownLatch _ready = new CountDownLatch(1);
467 private final Buffer _buffer = new IndirectNIOBuffer(1024);
468 private final ConcurrentMap<String, Object> _context;
469 private volatile Buffer _data;
470 private volatile ClientToProxyConnection _toClient;
471 private volatile long _timestamp;
472 private volatile SelectChannelEndPoint _endPoint;
473
474 public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
475 {
476 _context = context;
477 _data = data;
478 }
479
480 @Override
481 public String toString()
482 {
483 StringBuilder builder = new StringBuilder("ProxyToServer");
484 builder.append("(:").append(_endPoint.getLocalPort());
485 builder.append("<=>:").append(_endPoint.getRemotePort());
486 return builder.append(")").toString();
487 }
488
489 public Connection handle() throws IOException
490 {
491 _logger.debug("{}: begin reading from server", this);
492 try
493 {
494 writeData();
495
496 while (true)
497 {
498 int read = read(_endPoint, _buffer, _context);
499
500 if (read == -1)
501 {
502 _logger.debug("{}: server closed connection {}", this, _endPoint);
503
504 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
505 closeClient();
506 else
507 _toClient.shutdownOutput();
508
509 break;
510 }
511
512 if (read == 0)
513 break;
514
515 _logger.debug("{}: read from server {} bytes {}", this, read, _endPoint);
516 int written = write(_toClient._endPoint, _buffer, _context);
517 _logger.debug("{}: written to {} {} bytes", this, _toClient, written);
518 }
519 return this;
520 }
521 catch (ClosedChannelException x)
522 {
523 _logger.debug(x);
524 throw x;
525 }
526 catch (IOException x)
527 {
528 _logger.warn(this + ": unexpected exception", x);
529 close();
530 throw x;
531 }
532 catch (RuntimeException x)
533 {
534 _logger.warn(this + ": unexpected exception", x);
535 close();
536 throw x;
537 }
538 finally
539 {
540 _logger.debug("{}: end reading from server", this);
541 }
542 }
543
544 private void writeData() throws IOException
545 {
546
547
548
549 synchronized (this)
550 {
551 if (_data != null)
552 {
553 try
554 {
555 int written = write(_endPoint, _data, _context);
556 _logger.debug("{}: written to server {} bytes", this, written);
557 }
558 finally
559 {
560
561
562
563 _data = null;
564 }
565 }
566 }
567 }
568
569 public void setConnection(ClientToProxyConnection connection)
570 {
571 _toClient = connection;
572 }
573
574 public long getTimeStamp()
575 {
576 return _timestamp;
577 }
578
579 public void setTimeStamp(long timestamp)
580 {
581 _timestamp = timestamp;
582 }
583
584 public void setEndPoint(SelectChannelEndPoint endpoint)
585 {
586 _endPoint = endpoint;
587 }
588
589 public boolean isIdle()
590 {
591 return false;
592 }
593
594 public boolean isSuspended()
595 {
596 return false;
597 }
598
599 public void closed()
600 {
601 }
602
603 public void ready()
604 {
605 _ready.countDown();
606 }
607
608 public void waitReady(long timeout) throws IOException
609 {
610 try
611 {
612 _ready.await(timeout, TimeUnit.MILLISECONDS);
613 }
614 catch (final InterruptedException x)
615 {
616 throw new IOException()
617 {{
618 initCause(x);
619 }};
620 }
621 }
622
623 public void closeClient() throws IOException
624 {
625 _toClient.closeClient();
626 }
627
628 public void closeServer() throws IOException
629 {
630 _endPoint.close();
631 }
632
633 public void close()
634 {
635 try
636 {
637 closeClient();
638 }
639 catch (IOException x)
640 {
641 _logger.debug(this + ": unexpected exception closing the client", x);
642 }
643
644 try
645 {
646 closeServer();
647 }
648 catch (IOException x)
649 {
650 _logger.debug(this + ": unexpected exception closing the server", x);
651 }
652 }
653
654 public void shutdownOutput() throws IOException
655 {
656 writeData();
657 _endPoint.shutdownOutput();
658 }
659
660 public void idleExpired()
661 {
662 try
663 {
664 shutdownOutput();
665 }
666 catch(Exception e)
667 {
668 LOG.debug(e);
669 close();
670 }
671 }
672 }
673
674 public class ClientToProxyConnection implements Connection
675 {
676 private final Buffer _buffer = new IndirectNIOBuffer(1024);
677 private final ConcurrentMap<String, Object> _context;
678 private final SocketChannel _channel;
679 private final EndPoint _endPoint;
680 private final long _timestamp;
681 private volatile ProxyToServerConnection _toServer;
682 private boolean _firstTime = true;
683
684 public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
685 {
686 _context = context;
687 _channel = channel;
688 _endPoint = endPoint;
689 _timestamp = timestamp;
690 }
691
692 @Override
693 public String toString()
694 {
695 StringBuilder builder = new StringBuilder("ClientToProxy");
696 builder.append("(:").append(_endPoint.getLocalPort());
697 builder.append("<=>:").append(_endPoint.getRemotePort());
698 return builder.append(")").toString();
699 }
700
701 public Connection handle() throws IOException
702 {
703 _logger.debug("{}: begin reading from client", this);
704 try
705 {
706 if (_firstTime)
707 {
708 _firstTime = false;
709 register(_channel, _toServer);
710 _logger.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
711 }
712
713 while (true)
714 {
715 int read = read(_endPoint, _buffer, _context);
716
717 if (read == -1)
718 {
719 _logger.debug("{}: client closed connection {}", this, _endPoint);
720
721 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
722 closeServer();
723 else
724 _toServer.shutdownOutput();
725
726 break;
727 }
728
729 if (read == 0)
730 break;
731
732 _logger.debug("{}: read from client {} bytes {}", this, read, _endPoint);
733 int written = write(_toServer._endPoint, _buffer, _context);
734 _logger.debug("{}: written to {} {} bytes", this, _toServer, written);
735 }
736 return this;
737 }
738 catch (ClosedChannelException x)
739 {
740 _logger.debug(x);
741 closeServer();
742 throw x;
743 }
744 catch (IOException x)
745 {
746 _logger.warn(this + ": unexpected exception", x);
747 close();
748 throw x;
749 }
750 catch (RuntimeException x)
751 {
752 _logger.warn(this + ": unexpected exception", x);
753 close();
754 throw x;
755 }
756 finally
757 {
758 _logger.debug("{}: end reading from client", this);
759 }
760 }
761
762 public long getTimeStamp()
763 {
764 return _timestamp;
765 }
766
767 public boolean isIdle()
768 {
769 return false;
770 }
771
772 public boolean isSuspended()
773 {
774 return false;
775 }
776
777 public void closed()
778 {
779 }
780
781 public void setConnection(ProxyToServerConnection connection)
782 {
783 _toServer = connection;
784 }
785
786 public void closeClient() throws IOException
787 {
788 _endPoint.close();
789 }
790
791 public void closeServer() throws IOException
792 {
793 _toServer.closeServer();
794 }
795
796 public void close()
797 {
798 try
799 {
800 closeClient();
801 }
802 catch (IOException x)
803 {
804 _logger.debug(this + ": unexpected exception closing the client", x);
805 }
806
807 try
808 {
809 closeServer();
810 }
811 catch (IOException x)
812 {
813 _logger.debug(this + ": unexpected exception closing the server", x);
814 }
815 }
816
817 public void shutdownOutput() throws IOException
818 {
819 _endPoint.shutdownOutput();
820 }
821
822 public void idleExpired()
823 {
824 try
825 {
826 shutdownOutput();
827 }
828 catch(Exception e)
829 {
830 LOG.debug(e);
831 close();
832 }
833 }
834 }
835
836
837
838
839
840
841 public void addWhite(String entry)
842 {
843 add(entry, _white);
844 }
845
846
847
848
849
850
851 public void addBlack(String entry)
852 {
853 add(entry, _black);
854 }
855
856
857
858
859
860
861 public void setWhite(String[] entries)
862 {
863 set(entries, _white);
864 }
865
866
867
868
869
870
871 public void setBlack(String[] entries)
872 {
873 set(entries, _black);
874 }
875
876
877
878
879
880
881
882
883 protected void set(String[] entries, HostMap<String> hostMap)
884 {
885 hostMap.clear();
886
887 if (entries != null && entries.length > 0)
888 {
889 for (String addrPath : entries)
890 {
891 add(addrPath, hostMap);
892 }
893 }
894 }
895
896
897
898
899
900
901
902
903 private void add(String entry, HostMap<String> hostMap)
904 {
905 if (entry != null && entry.length() > 0)
906 {
907 entry = entry.trim();
908 if (hostMap.get(entry) == null)
909 {
910 hostMap.put(entry, entry);
911 }
912 }
913 }
914
915
916
917
918
919
920
921 public boolean validateDestination(String host)
922 {
923 if (_white.size() > 0)
924 {
925 Object whiteObj = _white.getLazyMatches(host);
926 if (whiteObj == null)
927 {
928 return false;
929 }
930 }
931
932 if (_black.size() > 0)
933 {
934 Object blackObj = _black.getLazyMatches(host);
935 if (blackObj != null)
936 {
937 return false;
938 }
939 }
940
941 return true;
942 }
943
944 @Override
945 public void dump(Appendable out, String indent) throws IOException
946 {
947 dumpThis(out);
948 if (_privateThreadPool)
949 dump(out, indent, Arrays.asList(_threadPool, _selectorManager), TypeUtil.asList(getHandlers()), getBeans());
950 else
951 dump(out, indent, Arrays.asList(_selectorManager), TypeUtil.asList(getHandlers()), getBeans());
952 }
953 }