1 package org.eclipse.jetty.server.handler;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.channels.ClosedChannelException;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.SocketChannel;
8 import java.util.Arrays;
9 import java.util.Collections;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.CountDownLatch;
13 import java.util.concurrent.TimeUnit;
14 import javax.servlet.ServletException;
15 import javax.servlet.http.HttpServletRequest;
16 import javax.servlet.http.HttpServletResponse;
17
18 import org.eclipse.jetty.http.HttpMethods;
19 import org.eclipse.jetty.http.HttpParser;
20 import org.eclipse.jetty.io.Buffer;
21 import org.eclipse.jetty.io.ConnectedEndPoint;
22 import org.eclipse.jetty.io.Connection;
23 import org.eclipse.jetty.io.EndPoint;
24 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
25 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
26 import org.eclipse.jetty.io.nio.SelectorManager;
27 import org.eclipse.jetty.server.Handler;
28 import org.eclipse.jetty.server.HttpConnection;
29 import org.eclipse.jetty.server.Request;
30 import org.eclipse.jetty.server.Server;
31 import org.eclipse.jetty.util.HostMap;
32 import org.eclipse.jetty.util.TypeUtil;
33 import org.eclipse.jetty.util.component.LifeCycle;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.util.thread.ThreadPool;
37
38
39
40
41
42
43 public class ConnectHandler extends HandlerWrapper
44 {
/** Logger named after the runtime class of this handler. */
private final Logger _logger = Log.getLogger(getClass().getName());
/** Selector manager with which the proxy-to-server channels are registered. */
private final SelectorManager _selectorManager = new Manager();
/** Timeout, in milliseconds, used when connecting to the remote server; defaults to 5000. */
private volatile int _connectTimeout = 5000;
/** Timeout handed to EndPoint.blockWritable() while writing; defaults to 30000 (presumably milliseconds - confirm against the EndPoint API). */
private volatile int _writeTimeout = 30000;
/** Thread pool that runs the select loop and the tasks dispatched by the selector manager. */
private volatile ThreadPool _threadPool;
/** True when the thread pool was supplied through setThreadPool() rather than taken from the server. */
private volatile boolean _privateThreadPool;
/** White list consulted by validateDestination(); when non-empty a host must match one of its entries. */
private HostMap<String> _white = new HostMap<String>();
/** Black list consulted by validateDestination(); a host matching one of its entries is rejected. */
private HostMap<String> _black = new HostMap<String>();
53
/**
 * Creates a handler with no wrapped handler and empty white and black lists.
 */
public ConnectHandler()
{
    this(null);
}

/**
 * Creates a handler with no wrapped handler and the given host lists.
 *
 * @param white entries for the white list, may be null
 * @param black entries for the black list, may be null
 */
public ConnectHandler(String[] white, String[] black)
{
    this(null, white, black);
}

/**
 * Creates a handler that wraps the given handler; the host lists stay empty.
 *
 * @param handler the handler passed to setHandler()
 */
public ConnectHandler(Handler handler)
{
    setHandler(handler);
}

/**
 * Creates a handler that wraps the given handler and uses the given host lists.
 *
 * @param handler the handler passed to setHandler()
 * @param white entries for the white list, may be null
 * @param black entries for the black list, may be null
 */
public ConnectHandler(Handler handler, String[] white, String[] black)
{
    setHandler(handler);
    // set() clears the target map before adding the entries.
    set(white, _white);
    set(black, _black);
}
75
76
77
78
/**
 * @return the timeout, in milliseconds, used when connecting to the remote server
 */
public int getConnectTimeout()
{
    return _connectTimeout;
}
83
84
85
86
/**
 * @param connectTimeout the timeout, in milliseconds, used when connecting to the remote server
 */
public void setConnectTimeout(int connectTimeout)
{
    _connectTimeout = connectTimeout;
}
91
92
93
94
/**
 * @return the timeout handed to EndPoint.blockWritable() by write()
 *         (presumably milliseconds - confirm against the EndPoint API)
 */
public int getWriteTimeout()
{
    return _writeTimeout;
}
99
100
101
102
/**
 * @param writeTimeout the timeout handed to EndPoint.blockWritable() by write()
 *                     (presumably milliseconds - confirm against the EndPoint API)
 */
public void setWriteTimeout(int writeTimeout)
{
    _writeTimeout = writeTimeout;
}
107
108 @Override
109 public void setServer(Server server)
110 {
111 super.setServer(server);
112
113 server.getContainer().update(this,null,_selectorManager,"selectManager");
114
115 if (_privateThreadPool)
116 server.getContainer().update(this,null,_privateThreadPool,"threadpool",true);
117 else
118 _threadPool=server.getThreadPool();
119 }
120
121
122
123
/**
 * @return the thread pool currently used by this handler, or null if none has been set yet
 */
public ThreadPool getThreadPool()
{
    return _threadPool;
}
128
129
130
131
132 public void setThreadPool(ThreadPool threadPool)
133 {
134 if (getServer()!=null)
135 getServer().getContainer().update(this,_privateThreadPool?_threadPool:null,threadPool,"threadpool",true);
136 _privateThreadPool=threadPool!=null;
137 _threadPool=threadPool;
138 }
139
140 @Override
141 protected void doStart() throws Exception
142 {
143 super.doStart();
144
145 if (_threadPool==null)
146 {
147 _threadPool=getServer().getThreadPool();
148 _privateThreadPool=false;
149 }
150 if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
151 ((LifeCycle)_threadPool).start();
152
153 _selectorManager.start();
154 _threadPool.dispatch(new Runnable()
155 {
156 public void run()
157 {
158 while (isRunning())
159 {
160 try
161 {
162 _selectorManager.doSelect(0);
163 }
164 catch (IOException x)
165 {
166 _logger.warn("Unexpected exception", x);
167 }
168 }
169 }
170 });
171 }
172
173 @Override
174 protected void doStop() throws Exception
175 {
176 _selectorManager.stop();
177
178 ThreadPool threadPool = _threadPool;
179 if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle)
180 ((LifeCycle)threadPool).stop();
181
182 super.doStop();
183 }
184
185 @Override
186 public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
187 {
188 if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
189 {
190 _logger.debug("CONNECT request for {}", request.getRequestURI());
191 handleConnect(baseRequest, request, response, request.getRequestURI());
192 }
193 else
194 {
195 super.handle(target, baseRequest, request, response);
196 }
197 }
198
199
200
201
202
203
204
205
206
207
208
209
210
211 protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException
212 {
213 boolean proceed = handleAuthentication(request, response, serverAddress);
214 if (!proceed)
215 return;
216
217 String host = serverAddress;
218 int port = 80;
219 int colon = serverAddress.indexOf(':');
220 if (colon > 0)
221 {
222 host = serverAddress.substring(0, colon);
223 port = Integer.parseInt(serverAddress.substring(colon + 1));
224 }
225
226 if (!validateDestination(host))
227 {
228 Log.info("ProxyHandler: Forbidden destination "+host);
229 response.setStatus(HttpServletResponse.SC_FORBIDDEN);
230 baseRequest.setHandled(true);
231 return;
232 }
233
234 SocketChannel channel = connectToServer(request, host, port);
235
236
237
238
239
240
241 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
242 Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer();
243 Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer();
244 int length = headerBuffer == null ? 0 : headerBuffer.length();
245 length += bodyBuffer == null ? 0 : bodyBuffer.length();
246 IndirectNIOBuffer buffer = null;
247 if (length > 0)
248 {
249 buffer = new IndirectNIOBuffer(length);
250 if (headerBuffer != null)
251 {
252 buffer.put(headerBuffer);
253 headerBuffer.clear();
254 }
255 if (bodyBuffer != null)
256 {
257 buffer.put(bodyBuffer);
258 bodyBuffer.clear();
259 }
260 }
261
262 ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
263 prepareContext(request, context);
264
265 ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
266
267
268 response.setStatus(HttpServletResponse.SC_OK);
269
270
271 baseRequest.getConnection().getGenerator().setPersistent(true);
272
273
274 response.getOutputStream().close();
275
276 upgradeConnection(request, response, clientToProxy);
277 }
278
279 private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
280 {
281 HttpConnection httpConnection = HttpConnection.getCurrentConnection();
282 ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
283 ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
284 clientToProxy.setConnection(proxyToServer);
285 proxyToServer.setConnection(clientToProxy);
286 return clientToProxy;
287 }
288
289
290
291
292
293
294
295
296
297
298
299
/**
 * Extension point for authenticating a CONNECT request; this implementation
 * accepts every request.
 * <p>
 * handleConnect() returns without doing anything else when this method
 * returns false.
 *
 * @param request the servlet request
 * @param response the servlet response
 * @param address the destination address of the CONNECT request
 * @return true to proceed with the tunnel, false to stop
 * @throws ServletException declared for overriding implementations
 * @throws IOException declared for overriding implementations
 */
protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
{
    return true;
}
304
/**
 * Factory method for the client-side connection of a tunnel.
 *
 * @param context the context shared by both connections of the tunnel
 * @param channel the channel connected to the remote server
 * @param endPoint the end point of the client connection
 * @param timeStamp the time stamp reported by the new connection
 * @return a new client-side connection
 */
protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
{
    return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
}
309
/**
 * Factory method for the server-side connection of a tunnel.
 *
 * @param context the context shared by both connections of the tunnel
 * @param buffer bytes to be written to the server first, may be null
 * @return a new server-side connection
 */
protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
{
    return new ProxyToServerConnection(context, buffer);
}
314
315 private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
316 {
317 SocketChannel channel = connect(request, host, port);
318 channel.configureBlocking(false);
319 return channel;
320 }
321
322
323
324
325
326
327
328
329
330 protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
331 {
332 _logger.debug("Establishing connection to {}:{}", host, port);
333
334 SocketChannel channel = SocketChannel.open();
335 channel.socket().setTcpNoDelay(true);
336 channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
337 _logger.debug("Established connection to {}:{}", host, port);
338 return channel;
339 }
340
/**
 * Extension point called by handleConnect() with a freshly created context
 * map before the tunnel connections are created; this implementation does
 * nothing.
 *
 * @param request the CONNECT request
 * @param context the context that is later passed to read() and write()
 */
protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
{
}
344
/**
 * Publishes the tunnel connection on the request and marks the response as
 * a protocol switch.
 *
 * @param request the CONNECT request
 * @param response the response to the CONNECT request
 * @param connection the connection that takes over the client end point
 * @throws IOException declared but not thrown by the visible code
 */
private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
{
    // NOTE(review): presumably the connector looks at this attribute and at
    // the 101 status to replace the HTTP connection with the given one -
    // confirm against the connector implementation.
    request.setAttribute("org.eclipse.jetty.io.Connection", connection);
    response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
    _logger.debug("Upgraded connection to {}", connection);
}
353
/**
 * Registers the server channel with the selector manager and waits up to
 * the connect timeout for the server-side connection to signal that it is
 * ready.
 *
 * @param channel the channel connected to the remote server
 * @param proxyToServer the connection registered together with the channel
 * @throws IOException if waiting for the connection fails
 */
private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
{
    _selectorManager.register(channel, proxyToServer);
    proxyToServer.waitReady(_connectTimeout);
}
359
360
361
362
363
364
365
366
367
368
/**
 * Reads from the given end point into the given buffer by delegating to
 * {@code endPoint.fill(buffer)}.
 * <p>
 * The callers in this class treat -1 as "peer closed" and 0 as "nothing to
 * read right now".
 *
 * @param endPoint the end point to read from
 * @param buffer the buffer to fill
 * @param context the context of the tunnel; unused by this implementation
 * @return the value returned by {@code endPoint.fill(buffer)}
 * @throws IOException if the read fails
 */
protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
{
    return endPoint.fill(buffer);
}
373
374
375
376
377
378
379
380
381
382
383 protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
384 {
385 if (buffer == null)
386 return 0;
387
388 int length = buffer.length();
389 StringBuilder builder = new StringBuilder();
390 int written = endPoint.flush(buffer);
391 builder.append(written);
392 buffer.compact();
393 if (!endPoint.isBlocking())
394 {
395 while (buffer.space() == 0)
396 {
397 boolean ready = endPoint.blockWritable(getWriteTimeout());
398 if (!ready)
399 throw new IOException("Write timeout");
400
401 written = endPoint.flush(buffer);
402 builder.append("+").append(written);
403 buffer.compact();
404 }
405 }
406 _logger.debug("Written {}/{} bytes {}", builder, length, endPoint);
407 return length;
408 }
409
410 private class Manager extends SelectorManager
411 {
412 @Override
413 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
414 {
415 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey);
416
417 return endp;
418 }
419
420 @Override
421 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
422 {
423 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
424 proxyToServer.setTimeStamp(System.currentTimeMillis());
425 proxyToServer.setEndPoint(endpoint);
426 return proxyToServer;
427 }
428
429 @Override
430 protected void endPointOpened(SelectChannelEndPoint endpoint)
431 {
432 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
433 proxyToServer.ready();
434 }
435
436 @Override
437 public boolean dispatch(Runnable task)
438 {
439 return _threadPool.dispatch(task);
440 }
441
442 @Override
443 protected void endPointClosed(SelectChannelEndPoint endpoint)
444 {
445 }
446
447 @Override
448 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
449 {
450 }
451 }
452
453 public class ProxyToServerConnection implements Connection
454 {
455 private final CountDownLatch _ready = new CountDownLatch(1);
456 private final Buffer _buffer = new IndirectNIOBuffer(1024);
457 private final ConcurrentMap<String, Object> _context;
458 private volatile Buffer _data;
459 private volatile ClientToProxyConnection _toClient;
460 private volatile long _timestamp;
461 private volatile SelectChannelEndPoint _endPoint;
462
463 public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
464 {
465 _context = context;
466 _data = data;
467 }
468
469 @Override
470 public String toString()
471 {
472 StringBuilder builder = new StringBuilder("ProxyToServer");
473 builder.append("(:").append(_endPoint.getLocalPort());
474 builder.append("<=>:").append(_endPoint.getRemotePort());
475 return builder.append(")").toString();
476 }
477
478 public Connection handle() throws IOException
479 {
480 _logger.debug("{}: begin reading from server", this);
481 try
482 {
483 if (_data != null)
484 {
485 int written = write(_endPoint, _data, _context);
486 _logger.debug("{}: written to server {} bytes", this, written);
487 _data = null;
488 }
489
490 while (true)
491 {
492 int read = read(_endPoint, _buffer, _context);
493
494 if (read == -1)
495 {
496 _logger.debug("{}: server closed connection {}", this, _endPoint);
497 close();
498 break;
499 }
500
501 if (read == 0)
502 break;
503
504 _logger.debug("{}: read from server {} bytes {}", this, read, _endPoint);
505 int written = write(_toClient._endPoint, _buffer, _context);
506 _logger.debug("{}: written to {} {} bytes", this, _toClient, written);
507 }
508 return this;
509 }
510 catch (ClosedChannelException x)
511 {
512 _logger.debug(x);
513 throw x;
514 }
515 catch (IOException x)
516 {
517 _logger.warn(this + ": unexpected exception", x);
518 close();
519 throw x;
520 }
521 catch (RuntimeException x)
522 {
523 _logger.warn(this + ": unexpected exception", x);
524 close();
525 throw x;
526 }
527 finally
528 {
529 _logger.debug("{}: end reading from server", this);
530 }
531 }
532
533 public void setConnection(ClientToProxyConnection connection)
534 {
535 _toClient = connection;
536 }
537
538 public long getTimeStamp()
539 {
540 return _timestamp;
541 }
542
543 public void setTimeStamp(long timestamp)
544 {
545 _timestamp = timestamp;
546 }
547
548 public void setEndPoint(SelectChannelEndPoint endpoint)
549 {
550 _endPoint = endpoint;
551 }
552
553 public boolean isIdle()
554 {
555 return false;
556 }
557
558 public boolean isSuspended()
559 {
560 return false;
561 }
562
563 public void closed()
564 {
565 }
566
567 public void ready()
568 {
569 _ready.countDown();
570 }
571
572 public void waitReady(long timeout) throws IOException
573 {
574 try
575 {
576 _ready.await(timeout, TimeUnit.MILLISECONDS);
577 }
578 catch (final InterruptedException x)
579 {
580 throw new IOException(){{initCause(x);}};
581 }
582 }
583
584 public void closeClient() throws IOException
585 {
586 _toClient.closeClient();
587 }
588
589 public void closeServer() throws IOException
590 {
591 _endPoint.close();
592 }
593
594 public void close()
595 {
596 try
597 {
598 closeClient();
599 }
600 catch (IOException x)
601 {
602 _logger.debug(this + ": unexpected exception closing the client", x);
603 }
604
605 try
606 {
607 closeServer();
608 }
609 catch (IOException x)
610 {
611 _logger.debug(this + ": unexpected exception closing the server", x);
612 }
613 }
614 }
615
616 public class ClientToProxyConnection implements Connection
617 {
618 private final Buffer _buffer = new IndirectNIOBuffer(1024);
619 private final ConcurrentMap<String, Object> _context;
620 private final SocketChannel _channel;
621 private final EndPoint _endPoint;
622 private final long _timestamp;
623 private volatile ProxyToServerConnection _toServer;
624 private boolean _firstTime = true;
625
626 public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
627 {
628 _context = context;
629 _channel = channel;
630 _endPoint = endPoint;
631 _timestamp = timestamp;
632 }
633
634 @Override
635 public String toString()
636 {
637 StringBuilder builder = new StringBuilder("ClientToProxy");
638 builder.append("(:").append(_endPoint.getLocalPort());
639 builder.append("<=>:").append(_endPoint.getRemotePort());
640 return builder.append(")").toString();
641 }
642
643 public Connection handle() throws IOException
644 {
645 _logger.debug("{}: begin reading from client", this);
646 try
647 {
648 if (_firstTime)
649 {
650 _firstTime = false;
651 register(_channel, _toServer);
652 _logger.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
653 }
654
655 while (true)
656 {
657 int read = read(_endPoint, _buffer, _context);
658
659 if (read == -1)
660 {
661 _logger.debug("{}: client closed connection {}", this, _endPoint);
662 close();
663 break;
664 }
665
666 if (read == 0)
667 break;
668
669 _logger.debug("{}: read from client {} bytes {}", this, read, _endPoint);
670 int written = write(_toServer._endPoint, _buffer, _context);
671 _logger.debug("{}: written to {} {} bytes", this, _toServer, written);
672 }
673 return this;
674 }
675 catch (ClosedChannelException x)
676 {
677 _logger.debug(x);
678 closeServer();
679 throw x;
680 }
681 catch (IOException x)
682 {
683 _logger.warn(this + ": unexpected exception", x);
684 close();
685 throw x;
686 }
687 catch (RuntimeException x)
688 {
689 _logger.warn(this + ": unexpected exception", x);
690 close();
691 throw x;
692 }
693 finally
694 {
695 _logger.debug("{}: end reading from client", this);
696 }
697 }
698
699 public long getTimeStamp()
700 {
701 return _timestamp;
702 }
703
704 public boolean isIdle()
705 {
706 return false;
707 }
708
709 public boolean isSuspended()
710 {
711 return false;
712 }
713
714 public void closed()
715 {
716 }
717
718 public void setConnection(ProxyToServerConnection connection)
719 {
720 _toServer = connection;
721 }
722
723 public void closeClient() throws IOException
724 {
725 _endPoint.close();
726 }
727
728 public void closeServer() throws IOException
729 {
730 _toServer.closeServer();
731 }
732
733 public void close()
734 {
735 try
736 {
737 closeClient();
738 }
739 catch (IOException x)
740 {
741 _logger.debug(this + ": unexpected exception closing the client", x);
742 }
743
744 try
745 {
746 closeServer();
747 }
748 catch (IOException x)
749 {
750 _logger.debug(this + ": unexpected exception closing the server", x);
751 }
752 }
753 }
754
755
756
757
758
759
760
/**
 * Adds an entry to the white list.
 *
 * @param entry the host entry to add; null or empty entries are ignored
 */
public void addWhite(String entry)
{
    add(entry, _white);
}
765
766
767
768
769
770
771
/**
 * Adds an entry to the black list.
 *
 * @param entry the host entry to add; null or empty entries are ignored
 */
public void addBlack(String entry)
{
    add(entry, _black);
}
776
777
778
779
780
781
782
/**
 * Replaces the content of the white list.
 *
 * @param entries the new host entries; null or empty clears the list
 */
public void setWhite(String[] entries)
{
    set(entries, _white);
}
787
788
789
790
791
792
793
/**
 * Replaces the content of the black list.
 *
 * @param entries the new host entries; null or empty clears the list
 */
public void setBlack(String[] entries)
{
    set(entries, _black);
}
798
799
800
801
802
803
804
805
806
807 protected void set(String[] entries, HostMap<String> hostMap)
808 {
809 hostMap.clear();
810
811 if (entries != null && entries.length > 0)
812 {
813 for (String addrPath:entries)
814 {
815 add(addrPath, hostMap);
816 }
817 }
818 }
819
820
821
822
823
824
825
826
827
828 private void add(String entry, HostMap<String> hostMap)
829 {
830 if (entry != null && entry.length() > 0)
831 {
832 entry = entry.trim();
833 if (hostMap.get(entry) == null)
834 {
835 hostMap.put(entry,entry);
836 }
837 }
838 }
839
840
841
842
843
844
845
846
847 public boolean validateDestination(String host)
848 {
849 if (_white.size()>0)
850 {
851 Object whiteObj = _white.getLazyMatches(host);
852 if (whiteObj == null)
853 {
854 return false;
855 }
856 }
857
858 if (_black.size() > 0)
859 {
860 Object blackObj = _black.getLazyMatches(host);
861 if (blackObj != null)
862 {
863 return false;
864 }
865 }
866
867 return true;
868 }
869
870 @Override
871 public void dump(Appendable out, String indent) throws IOException
872 {
873 dumpThis(out);
874 if (_privateThreadPool)
875 dump(out,indent,Arrays.asList(new Object[]{_threadPool,_selectorManager}),TypeUtil.asList(getHandlers()),getBeans());
876 else
877 dump(out,indent,Collections.singletonList(_selectorManager),TypeUtil.asList(getHandlers()),getBeans());
878 }
879
880
881 }