1 package org.eclipse.jetty.server.handler;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.channels.ClosedChannelException;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.SocketChannel;
8 import java.util.Arrays;
9 import java.util.concurrent.ConcurrentHashMap;
10 import java.util.concurrent.ConcurrentMap;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.TimeUnit;
13
14 import javax.servlet.ServletException;
15 import javax.servlet.http.HttpServletRequest;
16 import javax.servlet.http.HttpServletResponse;
17
18 import org.eclipse.jetty.http.HttpMethods;
19 import org.eclipse.jetty.http.HttpParser;
20 import org.eclipse.jetty.io.AsyncEndPoint;
21 import org.eclipse.jetty.io.Buffer;
22 import org.eclipse.jetty.io.ConnectedEndPoint;
23 import org.eclipse.jetty.io.Connection;
24 import org.eclipse.jetty.io.EndPoint;
25 import org.eclipse.jetty.io.nio.AsyncConnection;
26 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
27 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
28 import org.eclipse.jetty.io.nio.SelectorManager;
29 import org.eclipse.jetty.server.AbstractHttpConnection;
30 import org.eclipse.jetty.server.Handler;
31 import org.eclipse.jetty.server.Request;
32 import org.eclipse.jetty.server.Server;
33 import org.eclipse.jetty.util.HostMap;
34 import org.eclipse.jetty.util.TypeUtil;
35 import org.eclipse.jetty.util.component.LifeCycle;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.util.thread.ThreadPool;
39
40
41
42
43
44
45 public class ConnectHandler extends HandlerWrapper
46 {
47 private static final Logger LOG = Log.getLogger(ConnectHandler.class);
48
49 private final Logger _logger = Log.getLogger(getClass().getName());
50 private final SelectorManager _selectorManager = new Manager();
51 private volatile int _connectTimeout = 5000;
52 private volatile int _writeTimeout = 30000;
53 private volatile ThreadPool _threadPool;
54 private volatile boolean _privateThreadPool;
55 private HostMap<String> _white = new HostMap<String>();
56 private HostMap<String> _black = new HostMap<String>();
57
/**
 * Creates a handler that wraps no delegate handler and starts with empty
 * white and black lists.
 */
public ConnectHandler()
{
    this((Handler)null);
}
62
/**
 * Creates a handler that wraps no delegate handler and is configured with
 * the given host lists.
 *
 * @param white entries for the white list, may be {@code null}
 * @param black entries for the black list, may be {@code null}
 */
public ConnectHandler(String[] white, String[] black)
{
    this((Handler)null, white, black);
}
67
/**
 * Creates a handler that delegates every non-CONNECT request to the given handler.
 *
 * @param handler the wrapped handler, passed to {@code setHandler}
 */
public ConnectHandler(Handler handler)
{
    this.setHandler(handler);
}
72
/**
 * Creates a handler that delegates non-CONNECT requests to the given handler
 * and is configured with the given host lists.
 *
 * @param handler the wrapped handler, passed to {@code setHandler}
 * @param white entries for the white list, may be {@code null}
 * @param black entries for the black list, may be {@code null}
 */
public ConnectHandler(Handler handler, String[] white, String[] black)
{
    this.setHandler(handler);
    this.set(white, _white);
    this.set(black, _black);
}
79
80
81
82
83 public int getConnectTimeout()
84 {
85 return _connectTimeout;
86 }
87
88
89
90
91 public void setConnectTimeout(int connectTimeout)
92 {
93 _connectTimeout = connectTimeout;
94 }
95
96
97
98
99 public int getWriteTimeout()
100 {
101 return _writeTimeout;
102 }
103
104
105
106
107 public void setWriteTimeout(int writeTimeout)
108 {
109 _writeTimeout = writeTimeout;
110 }
111
112 @Override
113 public void setServer(Server server)
114 {
115 super.setServer(server);
116
117 server.getContainer().update(this, null, _selectorManager, "selectManager");
118
119 if (_privateThreadPool)
120 server.getContainer().update(this, null, _privateThreadPool, "threadpool", true);
121 else
122 _threadPool = server.getThreadPool();
123 }
124
125
126
127
128 public ThreadPool getThreadPool()
129 {
130 return _threadPool;
131 }
132
133
134
135
136 public void setThreadPool(ThreadPool threadPool)
137 {
138 if (getServer() != null)
139 getServer().getContainer().update(this, _privateThreadPool ? _threadPool : null, threadPool, "threadpool", true);
140 _privateThreadPool = threadPool != null;
141 _threadPool = threadPool;
142 }
143
144 @Override
145 protected void doStart() throws Exception
146 {
147 super.doStart();
148
149 if (_threadPool == null)
150 {
151 _threadPool = getServer().getThreadPool();
152 _privateThreadPool = false;
153 }
154 if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
155 ((LifeCycle)_threadPool).start();
156
157 _selectorManager.start();
158 }
159
160 @Override
161 protected void doStop() throws Exception
162 {
163 _selectorManager.stop();
164
165 ThreadPool threadPool = _threadPool;
166 if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle)
167 ((LifeCycle)threadPool).stop();
168
169 super.doStop();
170 }
171
172 @Override
173 public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
174 {
175 if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
176 {
177 _logger.debug("CONNECT request for {}", request.getRequestURI());
178 try
179 {
180 handleConnect(baseRequest, request, response, request.getRequestURI());
181 }
182 catch(Exception e)
183 {
184 _logger.warn("ConnectHandler "+baseRequest.getUri()+" "+ e);
185 _logger.debug(e);
186 }
187 }
188 else
189 {
190 super.handle(target, baseRequest, request, response);
191 }
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205
206 protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException
207 {
208 boolean proceed = handleAuthentication(request, response, serverAddress);
209 if (!proceed)
210 return;
211
212 String host = serverAddress;
213 int port = 80;
214 int colon = serverAddress.indexOf(':');
215 if (colon > 0)
216 {
217 host = serverAddress.substring(0, colon);
218 port = Integer.parseInt(serverAddress.substring(colon + 1));
219 }
220
221 if (!validateDestination(host))
222 {
223 LOG.info("ProxyHandler: Forbidden destination " + host);
224 response.setStatus(HttpServletResponse.SC_FORBIDDEN);
225 baseRequest.setHandled(true);
226 return;
227 }
228
229 SocketChannel channel = connectToServer(request, host, port);
230
231
232
233
234
235
236 AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection();
237 Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer();
238 Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer();
239 int length = headerBuffer == null ? 0 : headerBuffer.length();
240 length += bodyBuffer == null ? 0 : bodyBuffer.length();
241 IndirectNIOBuffer buffer = null;
242 if (length > 0)
243 {
244 buffer = new IndirectNIOBuffer(length);
245 if (headerBuffer != null)
246 {
247 buffer.put(headerBuffer);
248 headerBuffer.clear();
249 }
250 if (bodyBuffer != null)
251 {
252 buffer.put(bodyBuffer);
253 bodyBuffer.clear();
254 }
255 }
256
257 ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
258 prepareContext(request, context);
259
260 ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
261
262
263 response.setStatus(HttpServletResponse.SC_OK);
264
265
266 baseRequest.getConnection().getGenerator().setPersistent(true);
267
268
269 response.getOutputStream().close();
270
271 upgradeConnection(request, response, clientToProxy);
272 }
273
274 private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
275 {
276 AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection();
277 ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
278 ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
279 clientToProxy.setConnection(proxyToServer);
280 proxyToServer.setConnection(clientToProxy);
281 return clientToProxy;
282 }
283
284
285
286
287
288
289
290
291
292
293
294
295 protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
296 {
297 return true;
298 }
299
300 protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
301 {
302 return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
303 }
304
305 protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
306 {
307 return new ProxyToServerConnection(context, buffer);
308 }
309
310 private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
311 {
312 SocketChannel channel = connect(request, host, port);
313 channel.configureBlocking(false);
314 return channel;
315 }
316
317
318
319
320
321
322
323
324
325
326 protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
327 {
328 SocketChannel channel = SocketChannel.open();
329 try
330 {
331
332 _logger.debug("Establishing connection to {}:{}", host, port);
333 channel.socket().setTcpNoDelay(true);
334 channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
335 _logger.debug("Established connection to {}:{}", host, port);
336 return channel;
337 }
338 catch (IOException x)
339 {
340 _logger.debug("Failed to establish connection to " + host + ":" + port, x);
341 try
342 {
343 channel.close();
344 }
345 catch (IOException xx)
346 {
347 LOG.ignore(xx);
348 }
349 throw x;
350 }
351 }
352
353 protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
354 {
355 }
356
357 private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
358 {
359
360
361 request.setAttribute("org.eclipse.jetty.io.Connection", connection);
362 response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
363 _logger.debug("Upgraded connection to {}", connection);
364 }
365
366 private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
367 {
368 _selectorManager.register(channel, proxyToServer);
369 proxyToServer.waitReady(_connectTimeout);
370 }
371
372
373
374
375
376
377
378
379
380
381
382 protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
383 {
384 return endPoint.fill(buffer);
385 }
386
387
388
389
390
391
392
393
394
395
396 protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
397 {
398 if (buffer == null)
399 return 0;
400
401 int length = buffer.length();
402 StringBuilder builder = new StringBuilder();
403 int written = endPoint.flush(buffer);
404 builder.append(written);
405 buffer.compact();
406 if (!endPoint.isBlocking())
407 {
408 while (buffer.space() == 0)
409 {
410 boolean ready = endPoint.blockWritable(getWriteTimeout());
411 if (!ready)
412 throw new IOException("Write timeout");
413
414 written = endPoint.flush(buffer);
415 builder.append("+").append(written);
416 buffer.compact();
417 }
418 }
419 _logger.debug("Written {}/{} bytes {}", builder, length, endPoint);
420 return length;
421 }
422
423 private class Manager extends SelectorManager
424 {
425 @Override
426 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
427 {
428 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, key, channel.socket().getSoTimeout());
429 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
430 endp.setMaxIdleTime(_writeTimeout);
431 return endp;
432 }
433
434 @Override
435 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
436 {
437 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment;
438 proxyToServer.setTimeStamp(System.currentTimeMillis());
439 proxyToServer.setEndPoint(endpoint);
440 return proxyToServer;
441 }
442
443 @Override
444 protected void endPointOpened(SelectChannelEndPoint endpoint)
445 {
446 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
447 proxyToServer.ready();
448 }
449
450 @Override
451 public boolean dispatch(Runnable task)
452 {
453 return _threadPool.dispatch(task);
454 }
455
456 @Override
457 protected void endPointClosed(SelectChannelEndPoint endpoint)
458 {
459 }
460
461 @Override
462 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
463 {
464 }
465 }
466
467
468
469 public class ProxyToServerConnection implements AsyncConnection
470 {
471 private final CountDownLatch _ready = new CountDownLatch(1);
472 private final Buffer _buffer = new IndirectNIOBuffer(1024);
473 private final ConcurrentMap<String, Object> _context;
474 private volatile Buffer _data;
475 private volatile ClientToProxyConnection _toClient;
476 private volatile long _timestamp;
477 private volatile AsyncEndPoint _endPoint;
478
479 public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
480 {
481 _context = context;
482 _data = data;
483 }
484
485 @Override
486 public String toString()
487 {
488 StringBuilder builder = new StringBuilder("ProxyToServer");
489 builder.append("(:").append(_endPoint.getLocalPort());
490 builder.append("<=>:").append(_endPoint.getRemotePort());
491 return builder.append(")").toString();
492 }
493
494 public Connection handle() throws IOException
495 {
496 _logger.debug("{}: begin reading from server", this);
497 try
498 {
499 writeData();
500
501 while (true)
502 {
503 int read = read(_endPoint, _buffer, _context);
504
505 if (read == -1)
506 {
507 _logger.debug("{}: server closed connection {}", this, _endPoint);
508
509 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
510 closeClient();
511 else
512 _toClient.shutdownOutput();
513
514 break;
515 }
516
517 if (read == 0)
518 break;
519
520 _logger.debug("{}: read from server {} bytes {}", this, read, _endPoint);
521 int written = write(_toClient._endPoint, _buffer, _context);
522 _logger.debug("{}: written to {} {} bytes", this, _toClient, written);
523 }
524 return this;
525 }
526 catch (ClosedChannelException x)
527 {
528 _logger.debug(x);
529 throw x;
530 }
531 catch (IOException x)
532 {
533 _logger.warn(this + ": unexpected exception", x);
534 close();
535 throw x;
536 }
537 catch (RuntimeException x)
538 {
539 _logger.warn(this + ": unexpected exception", x);
540 close();
541 throw x;
542 }
543 finally
544 {
545 _logger.debug("{}: end reading from server", this);
546 }
547 }
548
549 public void onInputShutdown() throws IOException
550 {
551
552 }
553
554 private void writeData() throws IOException
555 {
556
557
558
559 synchronized (this)
560 {
561 if (_data != null)
562 {
563 try
564 {
565 int written = write(_endPoint, _data, _context);
566 _logger.debug("{}: written to server {} bytes", this, written);
567 }
568 finally
569 {
570
571
572
573 _data = null;
574 }
575 }
576 }
577 }
578
579 public void setConnection(ClientToProxyConnection connection)
580 {
581 _toClient = connection;
582 }
583
584 public long getTimeStamp()
585 {
586 return _timestamp;
587 }
588
589 public void setTimeStamp(long timestamp)
590 {
591 _timestamp = timestamp;
592 }
593
594 public void setEndPoint(AsyncEndPoint endpoint)
595 {
596 _endPoint = endpoint;
597 }
598
599 public boolean isIdle()
600 {
601 return false;
602 }
603
604 public boolean isSuspended()
605 {
606 return false;
607 }
608
609 public void onClose()
610 {
611 }
612
613 public void ready()
614 {
615 _ready.countDown();
616 }
617
618 public void waitReady(long timeout) throws IOException
619 {
620 try
621 {
622 _ready.await(timeout, TimeUnit.MILLISECONDS);
623 }
624 catch (final InterruptedException x)
625 {
626 throw new IOException()
627 {{
628 initCause(x);
629 }};
630 }
631 }
632
633 public void closeClient() throws IOException
634 {
635 _toClient.closeClient();
636 }
637
638 public void closeServer() throws IOException
639 {
640 _endPoint.close();
641 }
642
643 public void close()
644 {
645 try
646 {
647 closeClient();
648 }
649 catch (IOException x)
650 {
651 _logger.debug(this + ": unexpected exception closing the client", x);
652 }
653
654 try
655 {
656 closeServer();
657 }
658 catch (IOException x)
659 {
660 _logger.debug(this + ": unexpected exception closing the server", x);
661 }
662 }
663
664 public void shutdownOutput() throws IOException
665 {
666 writeData();
667 _endPoint.shutdownOutput();
668 }
669
670 public void onIdleExpired()
671 {
672 try
673 {
674 shutdownOutput();
675 }
676 catch(Exception e)
677 {
678 LOG.debug(e);
679 close();
680 }
681 }
682 }
683
684 public class ClientToProxyConnection implements AsyncConnection
685 {
686 private final Buffer _buffer = new IndirectNIOBuffer(1024);
687 private final ConcurrentMap<String, Object> _context;
688 private final SocketChannel _channel;
689 private final EndPoint _endPoint;
690 private final long _timestamp;
691 private volatile ProxyToServerConnection _toServer;
692 private boolean _firstTime = true;
693
694 public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
695 {
696 _context = context;
697 _channel = channel;
698 _endPoint = endPoint;
699 _timestamp = timestamp;
700 }
701
702 @Override
703 public String toString()
704 {
705 StringBuilder builder = new StringBuilder("ClientToProxy");
706 builder.append("(:").append(_endPoint.getLocalPort());
707 builder.append("<=>:").append(_endPoint.getRemotePort());
708 return builder.append(")").toString();
709 }
710
711 public Connection handle() throws IOException
712 {
713 _logger.debug("{}: begin reading from client", this);
714 try
715 {
716 if (_firstTime)
717 {
718 _firstTime = false;
719 register(_channel, _toServer);
720 _logger.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
721 }
722
723 while (true)
724 {
725 int read = read(_endPoint, _buffer, _context);
726
727 if (read == -1)
728 {
729 _logger.debug("{}: client closed connection {}", this, _endPoint);
730
731 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
732 closeServer();
733 else
734 _toServer.shutdownOutput();
735
736 break;
737 }
738
739 if (read == 0)
740 break;
741
742 _logger.debug("{}: read from client {} bytes {}", this, read, _endPoint);
743 int written = write(_toServer._endPoint, _buffer, _context);
744 _logger.debug("{}: written to {} {} bytes", this, _toServer, written);
745 }
746 return this;
747 }
748 catch (ClosedChannelException x)
749 {
750 _logger.debug(x);
751 closeServer();
752 throw x;
753 }
754 catch (IOException x)
755 {
756 _logger.warn(this + ": unexpected exception", x);
757 close();
758 throw x;
759 }
760 catch (RuntimeException x)
761 {
762 _logger.warn(this + ": unexpected exception", x);
763 close();
764 throw x;
765 }
766 finally
767 {
768 _logger.debug("{}: end reading from client", this);
769 }
770 }
771
772 public void onInputShutdown() throws IOException
773 {
774
775 }
776
777 public long getTimeStamp()
778 {
779 return _timestamp;
780 }
781
782 public boolean isIdle()
783 {
784 return false;
785 }
786
787 public boolean isSuspended()
788 {
789 return false;
790 }
791
792 public void onClose()
793 {
794 }
795
796 public void setConnection(ProxyToServerConnection connection)
797 {
798 _toServer = connection;
799 }
800
801 public void closeClient() throws IOException
802 {
803 _endPoint.close();
804 }
805
806 public void closeServer() throws IOException
807 {
808 _toServer.closeServer();
809 }
810
811 public void close()
812 {
813 try
814 {
815 closeClient();
816 }
817 catch (IOException x)
818 {
819 _logger.debug(this + ": unexpected exception closing the client", x);
820 }
821
822 try
823 {
824 closeServer();
825 }
826 catch (IOException x)
827 {
828 _logger.debug(this + ": unexpected exception closing the server", x);
829 }
830 }
831
832 public void shutdownOutput() throws IOException
833 {
834 _endPoint.shutdownOutput();
835 }
836
837 public void onIdleExpired()
838 {
839 try
840 {
841 shutdownOutput();
842 }
843 catch(Exception e)
844 {
845 LOG.debug(e);
846 close();
847 }
848 }
849 }
850
851
852
853
854
855
856 public void addWhite(String entry)
857 {
858 add(entry, _white);
859 }
860
861
862
863
864
865
866 public void addBlack(String entry)
867 {
868 add(entry, _black);
869 }
870
871
872
873
874
875
876 public void setWhite(String[] entries)
877 {
878 set(entries, _white);
879 }
880
881
882
883
884
885
886 public void setBlack(String[] entries)
887 {
888 set(entries, _black);
889 }
890
891
892
893
894
895
896
897
898 protected void set(String[] entries, HostMap<String> hostMap)
899 {
900 hostMap.clear();
901
902 if (entries != null && entries.length > 0)
903 {
904 for (String addrPath : entries)
905 {
906 add(addrPath, hostMap);
907 }
908 }
909 }
910
911
912
913
914
915
916
917
918 private void add(String entry, HostMap<String> hostMap)
919 {
920 if (entry != null && entry.length() > 0)
921 {
922 entry = entry.trim();
923 if (hostMap.get(entry) == null)
924 {
925 hostMap.put(entry, entry);
926 }
927 }
928 }
929
930
931
932
933
934
935
936 public boolean validateDestination(String host)
937 {
938 if (_white.size() > 0)
939 {
940 Object whiteObj = _white.getLazyMatches(host);
941 if (whiteObj == null)
942 {
943 return false;
944 }
945 }
946
947 if (_black.size() > 0)
948 {
949 Object blackObj = _black.getLazyMatches(host);
950 if (blackObj != null)
951 {
952 return false;
953 }
954 }
955
956 return true;
957 }
958
959 @Override
960 public void dump(Appendable out, String indent) throws IOException
961 {
962 dumpThis(out);
963 if (_privateThreadPool)
964 dump(out, indent, Arrays.asList(_threadPool, _selectorManager), TypeUtil.asList(getHandlers()), getBeans());
965 else
966 dump(out, indent, Arrays.asList(_selectorManager), TypeUtil.asList(getHandlers()), getBeans());
967 }
968 }