1 package org.eclipse.jetty.server.handler;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.channels.ClosedChannelException;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.SocketChannel;
8 import java.util.Arrays;
9 import java.util.concurrent.ConcurrentHashMap;
10 import java.util.concurrent.ConcurrentMap;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.TimeUnit;
13 import javax.servlet.ServletException;
14 import javax.servlet.http.HttpServletRequest;
15 import javax.servlet.http.HttpServletResponse;
16
17 import org.eclipse.jetty.http.HttpMethods;
18 import org.eclipse.jetty.http.HttpParser;
19 import org.eclipse.jetty.io.Buffer;
20 import org.eclipse.jetty.io.ConnectedEndPoint;
21 import org.eclipse.jetty.io.Connection;
22 import org.eclipse.jetty.io.EndPoint;
23 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
24 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
25 import org.eclipse.jetty.io.nio.SelectorManager;
26 import org.eclipse.jetty.server.Handler;
27 import org.eclipse.jetty.server.HttpConnection;
28 import org.eclipse.jetty.server.Request;
29 import org.eclipse.jetty.server.Server;
30 import org.eclipse.jetty.util.HostMap;
31 import org.eclipse.jetty.util.TypeUtil;
32 import org.eclipse.jetty.util.component.LifeCycle;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.util.thread.ThreadPool;
36
37
38
39
40
41
42 public class ConnectHandler extends HandlerWrapper
43 {
44 private final Logger _logger = Log.getLogger(getClass().getName());
45 private final SelectorManager _selectorManager = new Manager();
46 private volatile int _connectTimeout = 5000;
47 private volatile int _writeTimeout = 30000;
48 private volatile ThreadPool _threadPool;
49 private volatile boolean _privateThreadPool;
50 private HostMap<String> _white = new HostMap<String>();
51 private HostMap<String> _black = new HostMap<String>();
52
/**
 * Creates a handler with no wrapped handler and empty white/black lists.
 */
public ConnectHandler()
{
    this((Handler)null);
}
57
/**
 * Creates a handler with no wrapped handler and the given host lists.
 *
 * @param white entries for the white list, may be null
 * @param black entries for the black list, may be null
 */
public ConnectHandler(String[] white, String[] black)
{
    this((Handler)null, white, black);
}
62
/**
 * Creates a handler that delegates non-CONNECT requests to the given handler.
 *
 * @param handler the handler to wrap, passed straight to {@code setHandler}
 */
public ConnectHandler(Handler handler)
{
    setHandler(handler);
}
67
/**
 * Creates a handler wrapping the given handler and pre-populates both host lists.
 *
 * @param handler the handler to wrap, passed straight to {@code setHandler}
 * @param white entries for the white list, may be null
 * @param black entries for the black list, may be null
 */
public ConnectHandler(Handler handler, String[] white, String[] black)
{
    setHandler(handler);
    // White list first, then black list, each loaded into its own map.
    set(white, _white);
    set(black, _black);
}
74
75
76
77
78 public int getConnectTimeout()
79 {
80 return _connectTimeout;
81 }
82
83
84
85
86 public void setConnectTimeout(int connectTimeout)
87 {
88 _connectTimeout = connectTimeout;
89 }
90
91
92
93
94 public int getWriteTimeout()
95 {
96 return _writeTimeout;
97 }
98
99
100
101
102 public void setWriteTimeout(int writeTimeout)
103 {
104 _writeTimeout = writeTimeout;
105 }
106
107 @Override
108 public void setServer(Server server)
109 {
110 super.setServer(server);
111
112 server.getContainer().update(this, null, _selectorManager, "selectManager");
113
114 if (_privateThreadPool)
115 server.getContainer().update(this, null, _privateThreadPool, "threadpool", true);
116 else
117 _threadPool = server.getThreadPool();
118 }
119
120
121
122
123 public ThreadPool getThreadPool()
124 {
125 return _threadPool;
126 }
127
128
129
130
131 public void setThreadPool(ThreadPool threadPool)
132 {
133 if (getServer() != null)
134 getServer().getContainer().update(this, _privateThreadPool ? _threadPool : null, threadPool, "threadpool", true);
135 _privateThreadPool = threadPool != null;
136 _threadPool = threadPool;
137 }
138
139 @Override
140 protected void doStart() throws Exception
141 {
142 super.doStart();
143
144 if (_threadPool == null)
145 {
146 _threadPool = getServer().getThreadPool();
147 _privateThreadPool = false;
148 }
149 if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
150 ((LifeCycle)_threadPool).start();
151
152 _selectorManager.start();
153 _threadPool.dispatch(new Runnable()
154 {
155 public void run()
156 {
157 while (isRunning())
158 {
159 try
160 {
161 _selectorManager.doSelect(0);
162 }
163 catch (IOException x)
164 {
165 _logger.warn("Unexpected exception", x);
166 }
167 }
168 }
169 });
170 }
171
172 @Override
173 protected void doStop() throws Exception
174 {
175 _selectorManager.stop();
176
177 ThreadPool threadPool = _threadPool;
178 if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle)
179 ((LifeCycle)threadPool).stop();
180
181 super.doStop();
182 }
183
184 @Override
185 public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
186 {
187 if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
188 {
189 _logger.debug("CONNECT request for {}", request.getRequestURI());
190 try
191 {
192 handleConnect(baseRequest, request, response, request.getRequestURI());
193 }
194 catch(Exception e)
195 {
196 _logger.warn("ConnectHandler "+baseRequest.getUri()+" "+ e);
197 _logger.debug(e);
198 }
199 }
200 else
201 {
202 super.handle(target, baseRequest, request, response);
203 }
204 }
205
206
207
208
209
210
211
212
213
214
215
216
217
218 protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException
219 {
220 boolean proceed = handleAuthentication(request, response, serverAddress);
221 if (!proceed)
222 return;
223
224 String host = serverAddress;
225 int port = 80;
226 int colon = serverAddress.indexOf(':');
227 if (colon > 0)
228 {
229 host = serverAddress.substring(0, colon);
230 port = Integer.parseInt(serverAddress.substring(colon + 1));
231 }
232
233 if (!validateDestination(host))
234 {
235 Log.info("ProxyHandler: Forbidden destination " + host);
236 response.setStatus(HttpServletResponse.SC_FORBIDDEN);
237 baseRequest.setHandled(true);
238 return;
239 }
240
241 SocketChannel channel = connectToServer(request, host, port);
242
243
244
245
246
247
248 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
249 Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer();
250 Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer();
251 int length = headerBuffer == null ? 0 : headerBuffer.length();
252 length += bodyBuffer == null ? 0 : bodyBuffer.length();
253 IndirectNIOBuffer buffer = null;
254 if (length > 0)
255 {
256 buffer = new IndirectNIOBuffer(length);
257 if (headerBuffer != null)
258 {
259 buffer.put(headerBuffer);
260 headerBuffer.clear();
261 }
262 if (bodyBuffer != null)
263 {
264 buffer.put(bodyBuffer);
265 bodyBuffer.clear();
266 }
267 }
268
269 ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
270 prepareContext(request, context);
271
272 ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
273
274
275 response.setStatus(HttpServletResponse.SC_OK);
276
277
278 baseRequest.getConnection().getGenerator().setPersistent(true);
279
280
281 response.getOutputStream().close();
282
283 upgradeConnection(request, response, clientToProxy);
284 }
285
286 private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
287 {
288 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
289 ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
290 ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
291 clientToProxy.setConnection(proxyToServer);
292 proxyToServer.setConnection(clientToProxy);
293 return clientToProxy;
294 }
295
296
297
298
299
300
301
302
303
304
305
306
307 protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
308 {
309 return true;
310 }
311
312 protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
313 {
314 return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
315 }
316
317 protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
318 {
319 return new ProxyToServerConnection(context, buffer);
320 }
321
322 private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
323 {
324 SocketChannel channel = connect(request, host, port);
325 channel.configureBlocking(false);
326 return channel;
327 }
328
329
330
331
332
333
334
335
336
337
338 protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
339 {
340 SocketChannel channel = SocketChannel.open();
341 try
342 {
343
344 _logger.debug("Establishing connection to {}:{}", host, port);
345 channel.socket().setTcpNoDelay(true);
346 channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
347 _logger.debug("Established connection to {}:{}", host, port);
348 return channel;
349 }
350 catch (IOException x)
351 {
352 _logger.debug("Failed to establish connection to " + host + ":" + port, x);
353 try
354 {
355 channel.close();
356 }
357 catch (IOException xx)
358 {
359 Log.ignore(xx);
360 }
361 throw x;
362 }
363 }
364
365 protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
366 {
367 }
368
369 private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
370 {
371
372
373 request.setAttribute("org.eclipse.jetty.io.Connection", connection);
374 response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
375 _logger.debug("Upgraded connection to {}", connection);
376 }
377
378 private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
379 {
380 _selectorManager.register(channel, proxyToServer);
381 proxyToServer.waitReady(_connectTimeout);
382 }
383
384
385
386
387
388
389
390
391
392
393
394 protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
395 {
396 return endPoint.fill(buffer);
397 }
398
399
400
401
402
403
404
405
406
407
408 protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
409 {
410 if (buffer == null)
411 return 0;
412
413 int length = buffer.length();
414 StringBuilder builder = new StringBuilder();
415 int written = endPoint.flush(buffer);
416 builder.append(written);
417 buffer.compact();
418 if (!endPoint.isBlocking())
419 {
420 while (buffer.space() == 0)
421 {
422 boolean ready = endPoint.blockWritable(getWriteTimeout());
423 if (!ready)
424 throw new IOException("Write timeout");
425
426 written = endPoint.flush(buffer);
427 builder.append("+").append(written);
428 buffer.compact();
429 }
430 }
431 _logger.debug("Written {}/{} bytes {}", builder, length, endPoint);
432 return length;
433 }
434
435 private class Manager extends SelectorManager
436 {
437 @Override
438 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
439 {
440 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey);
441 endp.setMaxIdleTime(_writeTimeout);
442 return endp;
443 }
444
445 @Override
446 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
447 {
448 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
449 proxyToServer.setTimeStamp(System.currentTimeMillis());
450 proxyToServer.setEndPoint(endpoint);
451 return proxyToServer;
452 }
453
454 @Override
455 protected void endPointOpened(SelectChannelEndPoint endpoint)
456 {
457 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
458 proxyToServer.ready();
459 }
460
461 @Override
462 public boolean dispatch(Runnable task)
463 {
464 return _threadPool.dispatch(task);
465 }
466
467 @Override
468 protected void endPointClosed(SelectChannelEndPoint endpoint)
469 {
470 }
471
472 @Override
473 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
474 {
475 }
476 }
477
478 public class ProxyToServerConnection implements Connection
479 {
480 private final CountDownLatch _ready = new CountDownLatch(1);
481 private final Buffer _buffer = new IndirectNIOBuffer(1024);
482 private final ConcurrentMap<String, Object> _context;
483 private volatile Buffer _data;
484 private volatile ClientToProxyConnection _toClient;
485 private volatile long _timestamp;
486 private volatile SelectChannelEndPoint _endPoint;
487
488 public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
489 {
490 _context = context;
491 _data = data;
492 }
493
494 @Override
495 public String toString()
496 {
497 StringBuilder builder = new StringBuilder("ProxyToServer");
498 builder.append("(:").append(_endPoint.getLocalPort());
499 builder.append("<=>:").append(_endPoint.getRemotePort());
500 return builder.append(")").toString();
501 }
502
503 public Connection handle() throws IOException
504 {
505 _logger.debug("{}: begin reading from server", this);
506 try
507 {
508 writeData();
509
510 while (true)
511 {
512 int read = read(_endPoint, _buffer, _context);
513
514 if (read == -1)
515 {
516 _logger.debug("{}: server closed connection {}", this, _endPoint);
517
518 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
519 closeClient();
520 else
521 _toClient.shutdownOutput();
522
523 break;
524 }
525
526 if (read == 0)
527 break;
528
529 _logger.debug("{}: read from server {} bytes {}", this, read, _endPoint);
530 int written = write(_toClient._endPoint, _buffer, _context);
531 _logger.debug("{}: written to {} {} bytes", this, _toClient, written);
532 }
533 return this;
534 }
535 catch (ClosedChannelException x)
536 {
537 _logger.debug(x);
538 throw x;
539 }
540 catch (IOException x)
541 {
542 _logger.warn(this + ": unexpected exception", x);
543 close();
544 throw x;
545 }
546 catch (RuntimeException x)
547 {
548 _logger.warn(this + ": unexpected exception", x);
549 close();
550 throw x;
551 }
552 finally
553 {
554 _logger.debug("{}: end reading from server", this);
555 }
556 }
557
558 private void writeData() throws IOException
559 {
560
561
562
563 synchronized (this)
564 {
565 if (_data != null)
566 {
567 try
568 {
569 int written = write(_endPoint, _data, _context);
570 _logger.debug("{}: written to server {} bytes", this, written);
571 }
572 finally
573 {
574
575
576
577 _data = null;
578 }
579 }
580 }
581 }
582
583 public void setConnection(ClientToProxyConnection connection)
584 {
585 _toClient = connection;
586 }
587
588 public long getTimeStamp()
589 {
590 return _timestamp;
591 }
592
593 public void setTimeStamp(long timestamp)
594 {
595 _timestamp = timestamp;
596 }
597
598 public void setEndPoint(SelectChannelEndPoint endpoint)
599 {
600 _endPoint = endpoint;
601 }
602
603 public boolean isIdle()
604 {
605 return false;
606 }
607
608 public boolean isSuspended()
609 {
610 return false;
611 }
612
613 public void closed()
614 {
615 }
616
617 public void ready()
618 {
619 _ready.countDown();
620 }
621
622 public void waitReady(long timeout) throws IOException
623 {
624 try
625 {
626 _ready.await(timeout, TimeUnit.MILLISECONDS);
627 }
628 catch (final InterruptedException x)
629 {
630 throw new IOException()
631 {{
632 initCause(x);
633 }};
634 }
635 }
636
637 public void closeClient() throws IOException
638 {
639 _toClient.closeClient();
640 }
641
642 public void closeServer() throws IOException
643 {
644 _endPoint.close();
645 }
646
647 public void close()
648 {
649 try
650 {
651 closeClient();
652 }
653 catch (IOException x)
654 {
655 _logger.debug(this + ": unexpected exception closing the client", x);
656 }
657
658 try
659 {
660 closeServer();
661 }
662 catch (IOException x)
663 {
664 _logger.debug(this + ": unexpected exception closing the server", x);
665 }
666 }
667
668 public void shutdownOutput() throws IOException
669 {
670 writeData();
671 _endPoint.shutdownOutput();
672 }
673
674 public void idleExpired()
675 {
676 try
677 {
678 shutdownOutput();
679 }
680 catch(Exception e)
681 {
682 Log.debug(e);
683 close();
684 }
685 }
686 }
687
688 public class ClientToProxyConnection implements Connection
689 {
690 private final Buffer _buffer = new IndirectNIOBuffer(1024);
691 private final ConcurrentMap<String, Object> _context;
692 private final SocketChannel _channel;
693 private final EndPoint _endPoint;
694 private final long _timestamp;
695 private volatile ProxyToServerConnection _toServer;
696 private boolean _firstTime = true;
697
698 public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
699 {
700 _context = context;
701 _channel = channel;
702 _endPoint = endPoint;
703 _timestamp = timestamp;
704 }
705
706 @Override
707 public String toString()
708 {
709 StringBuilder builder = new StringBuilder("ClientToProxy");
710 builder.append("(:").append(_endPoint.getLocalPort());
711 builder.append("<=>:").append(_endPoint.getRemotePort());
712 return builder.append(")").toString();
713 }
714
715 public Connection handle() throws IOException
716 {
717 _logger.debug("{}: begin reading from client", this);
718 try
719 {
720 if (_firstTime)
721 {
722 _firstTime = false;
723 register(_channel, _toServer);
724 _logger.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
725 }
726
727 while (true)
728 {
729 int read = read(_endPoint, _buffer, _context);
730
731 if (read == -1)
732 {
733 _logger.debug("{}: client closed connection {}", this, _endPoint);
734
735 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
736 closeServer();
737 else
738 _toServer.shutdownOutput();
739
740 break;
741 }
742
743 if (read == 0)
744 break;
745
746 _logger.debug("{}: read from client {} bytes {}", this, read, _endPoint);
747 int written = write(_toServer._endPoint, _buffer, _context);
748 _logger.debug("{}: written to {} {} bytes", this, _toServer, written);
749 }
750 return this;
751 }
752 catch (ClosedChannelException x)
753 {
754 _logger.debug(x);
755 closeServer();
756 throw x;
757 }
758 catch (IOException x)
759 {
760 _logger.warn(this + ": unexpected exception", x);
761 close();
762 throw x;
763 }
764 catch (RuntimeException x)
765 {
766 _logger.warn(this + ": unexpected exception", x);
767 close();
768 throw x;
769 }
770 finally
771 {
772 _logger.debug("{}: end reading from client", this);
773 }
774 }
775
776 public long getTimeStamp()
777 {
778 return _timestamp;
779 }
780
781 public boolean isIdle()
782 {
783 return false;
784 }
785
786 public boolean isSuspended()
787 {
788 return false;
789 }
790
791 public void closed()
792 {
793 }
794
795 public void setConnection(ProxyToServerConnection connection)
796 {
797 _toServer = connection;
798 }
799
800 public void closeClient() throws IOException
801 {
802 _endPoint.close();
803 }
804
805 public void closeServer() throws IOException
806 {
807 _toServer.closeServer();
808 }
809
810 public void close()
811 {
812 try
813 {
814 closeClient();
815 }
816 catch (IOException x)
817 {
818 _logger.debug(this + ": unexpected exception closing the client", x);
819 }
820
821 try
822 {
823 closeServer();
824 }
825 catch (IOException x)
826 {
827 _logger.debug(this + ": unexpected exception closing the server", x);
828 }
829 }
830
831 public void shutdownOutput() throws IOException
832 {
833 _endPoint.shutdownOutput();
834 }
835
836 public void idleExpired()
837 {
838 try
839 {
840 shutdownOutput();
841 }
842 catch(Exception e)
843 {
844 Log.debug(e);
845 close();
846 }
847 }
848 }
849
850
851
852
853
854
855 public void addWhite(String entry)
856 {
857 add(entry, _white);
858 }
859
860
861
862
863
864
865 public void addBlack(String entry)
866 {
867 add(entry, _black);
868 }
869
870
871
872
873
874
875 public void setWhite(String[] entries)
876 {
877 set(entries, _white);
878 }
879
880
881
882
883
884
885 public void setBlack(String[] entries)
886 {
887 set(entries, _black);
888 }
889
890
891
892
893
894
895
896
897 protected void set(String[] entries, HostMap<String> hostMap)
898 {
899 hostMap.clear();
900
901 if (entries != null && entries.length > 0)
902 {
903 for (String addrPath : entries)
904 {
905 add(addrPath, hostMap);
906 }
907 }
908 }
909
910
911
912
913
914
915
916
917 private void add(String entry, HostMap<String> hostMap)
918 {
919 if (entry != null && entry.length() > 0)
920 {
921 entry = entry.trim();
922 if (hostMap.get(entry) == null)
923 {
924 hostMap.put(entry, entry);
925 }
926 }
927 }
928
929
930
931
932
933
934
935 public boolean validateDestination(String host)
936 {
937 if (_white.size() > 0)
938 {
939 Object whiteObj = _white.getLazyMatches(host);
940 if (whiteObj == null)
941 {
942 return false;
943 }
944 }
945
946 if (_black.size() > 0)
947 {
948 Object blackObj = _black.getLazyMatches(host);
949 if (blackObj != null)
950 {
951 return false;
952 }
953 }
954
955 return true;
956 }
957
958 @Override
959 public void dump(Appendable out, String indent) throws IOException
960 {
961 dumpThis(out);
962 if (_privateThreadPool)
963 dump(out, indent, Arrays.asList(_threadPool, _selectorManager), TypeUtil.asList(getHandlers()), getBeans());
964 else
965 dump(out, indent, Arrays.asList(_selectorManager), TypeUtil.asList(getHandlers()), getBeans());
966 }
967 }