View Javadoc

1   package org.eclipse.jetty.server.handler;
2   
3   import java.io.IOException;
4   import java.net.InetSocketAddress;
5   import java.nio.channels.ClosedChannelException;
6   import java.nio.channels.SelectionKey;
7   import java.nio.channels.SocketChannel;
8   import java.util.concurrent.ConcurrentHashMap;
9   import java.util.concurrent.ConcurrentMap;
10  import java.util.concurrent.CountDownLatch;
11  import java.util.concurrent.TimeUnit;
12  import javax.servlet.ServletException;
13  import javax.servlet.http.HttpServletRequest;
14  import javax.servlet.http.HttpServletResponse;
15  
16  import org.eclipse.jetty.http.HttpMethods;
17  import org.eclipse.jetty.http.HttpParser;
18  import org.eclipse.jetty.io.Buffer;
19  import org.eclipse.jetty.io.ConnectedEndPoint;
20  import org.eclipse.jetty.io.Connection;
21  import org.eclipse.jetty.io.EndPoint;
22  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
23  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
24  import org.eclipse.jetty.io.nio.SelectorManager;
25  import org.eclipse.jetty.server.Handler;
26  import org.eclipse.jetty.server.HttpConnection;
27  import org.eclipse.jetty.server.Request;
28  import org.eclipse.jetty.server.Server;
29  import org.eclipse.jetty.util.component.LifeCycle;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.ThreadPool;
33  
/**
 * <p>Implementation of a tunneling proxy that supports HTTP CONNECT.</p>
 * <p>CONNECT requests are tunneled to the remote server named in the CONNECT URI, in the form
 * {@code host:port}; every other request is passed on to {@code super.handle(...)}.</p>
 * <p>NOTE(review): earlier documentation also described a transparent-proxy mode configured through a
 * string-argument constructor taking {@code host:port}; this class declares only the no-arguments
 * constructor and the {@link Handler} constructor, so that mode is not available here.</p>
 *
 * @version $Revision$ $Date$
 */
43  public class ProxyHandler extends HandlerWrapper
44  {
    // Logger named after the runtime class, so subclasses log under their own class name.
    private final Logger _logger = Log.getLogger(getClass().getName());
    // Selector manager (the nested Manager class) used for the proxy-to-server channels.
    private final SelectorManager _selectorManager = new Manager();
    // Timeout, in milliseconds, to connect to the remote server.
    private volatile int _connectTimeout = 5000;
    // Timeout, in milliseconds, to write data to a peer.
    private volatile int _writeTimeout = 30000;
    // Thread pool that runs the select loop and the tasks dispatched by the selector manager.
    private volatile ThreadPool _threadPool;
    // True when _threadPool was supplied through setThreadPool(...) rather than taken from the server.
    private volatile boolean _privateThreadPool;
51  
52      public ProxyHandler()
53      {
54          this(null);
55      }
56  
57      public ProxyHandler(Handler handler)
58      {
59          setHandler(handler);
60      }
61  
62      /**
63       * @return the timeout, in milliseconds, to connect to the remote server
64       */
65      public int getConnectTimeout()
66      {
67          return _connectTimeout;
68      }
69  
70      /**
71       * @param connectTimeout the timeout, in milliseconds, to connect to the remote server
72       */
73      public void setConnectTimeout(int connectTimeout)
74      {
75          _connectTimeout = connectTimeout;
76      }
77  
78      /**
79       * @return the timeout, in milliseconds, to write data to a peer
80       */
81      public int getWriteTimeout()
82      {
83          return _writeTimeout;
84      }
85  
86      /**
87       * @param writeTimeout the timeout, in milliseconds, to write data to a peer
88       */
89      public void setWriteTimeout(int writeTimeout)
90      {
91          _writeTimeout = writeTimeout;
92      }
93  
94      @Override
95      public void setServer(Server server)
96      {
97          super.setServer(server);
98  
99          server.getContainer().update(this,null,_selectorManager,"selectManager");
100 
101         if (_privateThreadPool)
102             server.getContainer().update(this,null,_privateThreadPool,"threadpool",true);
103         else
104             _threadPool=server.getThreadPool();
105     }
106 
107     /**
108      * @return the thread pool
109      */
110     public ThreadPool getThreadPool()
111     {
112         return _threadPool;
113     }
114 
115     /**
116      * @param threadPool the thread pool
117      */
118     public void setThreadPool(ThreadPool threadPool)
119     {
120         if (getServer()!=null)
121             getServer().getContainer().update(this,_privateThreadPool?_threadPool:null,threadPool,"threadpool",true);
122         _privateThreadPool=threadPool!=null;
123         _threadPool=threadPool;
124     }
125 
126     @Override
127     protected void doStart() throws Exception
128     {
129         super.doStart();
130 
131         if (_threadPool==null)
132         {
133             _threadPool=getServer().getThreadPool();
134             _privateThreadPool=false;
135         }
136         if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
137             ((LifeCycle)_threadPool).start();
138 
139         _selectorManager.start();
140         _threadPool.dispatch(new Runnable()
141         {
142             public void run()
143             {
144                 while (isRunning())
145                 {
146                     try
147                     {
148                         _selectorManager.doSelect(0);
149                     }
150                     catch (IOException x)
151                     {
152                         _logger.warn("Unexpected exception", x);
153                     }
154                 }
155             }
156         });
157     }
158 
159     @Override
160     protected void doStop() throws Exception
161     {
162         _selectorManager.stop();
163 
164         ThreadPool threadPool = _threadPool;
165         if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle)
166             ((LifeCycle)threadPool).stop();
167 
168         super.doStop();
169     }
170 
171     @Override
172     public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
173     {
174         if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
175         {
176             _logger.debug("CONNECT request for {}", request.getRequestURI());
177             handleConnect(baseRequest, request, response, request.getRequestURI());
178         }
179         else
180         {
181             super.handle(target, baseRequest, request, response);
182         }
183     }
184 
185     /**
186      * <p>Handles a CONNECT request.</p>
187      * <p>CONNECT requests may have authentication headers such as <code>Proxy-Authorization</code>
188      * that authenticate the client with the proxy.</p>
189      *
190      * @param baseRequest Jetty-specific http request
191      * @param request the http request
192      * @param response the http response
193      * @param serverAddress the remote server address in the form {@code host:port}
194      * @throws ServletException if an application error occurs
195      * @throws IOException if an I/O error occurs
196      */
197     protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException
198     {
199         boolean proceed = handleAuthentication(request, response, serverAddress);
200         if (!proceed)
201             return;
202 
203         String host = serverAddress;
204         int port = 80;
205         int colon = serverAddress.indexOf(':');
206         if (colon > 0)
207         {
208             host = serverAddress.substring(0, colon);
209             port = Integer.parseInt(serverAddress.substring(colon + 1));
210         }
211 
212         SocketChannel channel = connectToServer(request, host, port);
213 
214         // Transfer unread data from old connection to new connection
215         // We need to copy the data to avoid races:
216         // 1. when this unread data is written and the server replies before the clientToProxy
217         // connection is installed (it is only installed after returning from this method)
218         // 2. when the client sends data before this unread data has been written.
219         HttpConnection httpConnection = HttpConnection.getCurrentConnection();
220         Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer();
221         Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer();
222         int length = headerBuffer == null ? 0 : headerBuffer.length();
223         length += bodyBuffer == null ? 0 : bodyBuffer.length();
224         IndirectNIOBuffer buffer = null;
225         if (length > 0)
226         {
227             buffer = new IndirectNIOBuffer(length);
228             if (headerBuffer != null)
229             {
230                 buffer.put(headerBuffer);
231                 headerBuffer.clear();
232             }
233             if (bodyBuffer != null)
234             {
235                 buffer.put(bodyBuffer);
236                 bodyBuffer.clear();
237             }
238         }
239 
240         ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
241         prepareContext(request, context);
242 
243         ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
244 
245         // CONNECT expects a 200 response
246         response.setStatus(HttpServletResponse.SC_OK);
247 
248         // Prevent close
249         baseRequest.getConnection().getGenerator().setPersistent(true);
250 
251         // Close to force last flush it so that the client receives it
252         response.getOutputStream().close();
253 
254         upgradeConnection(request, response, clientToProxy);
255     }
256 
257     private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
258     {
259         HttpConnection httpConnection = HttpConnection.getCurrentConnection();
260         ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
261         ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
262         clientToProxy.setConnection(proxyToServer);
263         proxyToServer.setConnection(clientToProxy);
264         return clientToProxy;
265     }
266 
267     /**
268      * <p>Handles the authentication before setting up the tunnel to the remote server.</p>
269      * <p>The default implementation returns true.</p>
270      *
271      * @param request the HTTP request
272      * @param response the HTTP response
273      * @param address the address of the remote server in the form {@code host:port}.
274      * @return true to allow to connect to the remote host, false otherwise
275      * @throws ServletException to report a server error to the caller
276      * @throws IOException to report a server error to the caller
277      */
278     protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
279     {
280         return true;
281     }
282 
283     protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
284     {
285         return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
286     }
287 
288     protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
289     {
290         return new ProxyToServerConnection(context, buffer);
291     }
292 
293     private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
294     {
295         SocketChannel channel = connect(request, host, port);
296         channel.configureBlocking(false);
297         return channel;
298     }
299 
300     /**
301      * <p>Establishes a connection to the remote server.</p>
302      * @param request the HTTP request that initiated the tunnel
303      * @param host the host to connect to
304      * @param port the port to connect to
305      * @return a {@link SocketChannel} connected to the remote server
306      * @throws IOException if the connection cannot be established
307      */
308     protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
309     {
310         _logger.debug("Establishing connection to {}:{}", host, port);
311         // Connect to remote server
312         SocketChannel channel = SocketChannel.open();
313         channel.socket().setTcpNoDelay(true);
314         channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
315         _logger.debug("Established connection to {}:{}", host, port);
316         return channel;
317     }
318 
319     protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
320     {
321     }
322 
323     private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
324     {
325         // Set the new connection as request attribute and change the status to 101
326         // so that Jetty understands that it has to upgrade the connection
327         request.setAttribute("org.eclipse.jetty.io.Connection", connection);
328         response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
329         _logger.debug("Upgraded connection to {}", connection);
330     }
331 
332     private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
333     {
334         _selectorManager.register(channel, proxyToServer);
335         proxyToServer.waitReady(_connectTimeout);
336     }
337 
338     /**
339      * <p>Reads (with non-blocking semantic) into the given {@code buffer} from the given {@code endPoint}.</p>
340      * @param endPoint the endPoint to read from
341      * @param buffer the buffer to read data into
342      * @param context the context information related to the connection
343      * @return the number of bytes read (possibly 0 since the read is non-blocking)
344      * or -1 if the channel has been closed remotely
345      * @throws IOException if the endPoint cannot be read
346      */
347     protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
348     {
349         return endPoint.fill(buffer);
350     }
351 
352     /**
353      * <p>Writes (with blocking semantic) the given buffer of data onto the given endPoint.</p>
354      *
355      * @param endPoint the endPoint to write to
356      * @param buffer the buffer to write
357      * @param context the context information related to the connection
358      * @throws IOException if the buffer cannot be written
359      * @return the number of bytes written
360      */
361     protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
362     {
363         if (buffer == null)
364             return 0;
365 
366         int length = buffer.length();
367         StringBuilder builder = new StringBuilder();
368         int written = endPoint.flush(buffer);
369         builder.append(written);
370         buffer.compact();
371         if (!endPoint.isBlocking())
372         {
373             while (buffer.space() == 0)
374             {
375                 boolean ready = endPoint.blockWritable(getWriteTimeout());
376                 if (!ready)
377                     throw new IOException("Write timeout");
378 
379                 written = endPoint.flush(buffer);
380                 builder.append("+").append(written);
381                 buffer.compact();
382             }
383         }
384         _logger.debug("Written {}/{} bytes {}", builder, length, endPoint);
385         return length;
386     }
387 
388     private class Manager extends SelectorManager
389     {
390         @Override
391         protected SocketChannel acceptChannel(SelectionKey key) throws IOException
392         {
393             // This is a client-side selector manager
394             throw new IllegalStateException();
395         }
396 
397         @Override
398         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
399         {
400             return new SelectChannelEndPoint(channel, selectSet, selectionKey);
401         }
402 
403         @Override
404         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
405         {
406             ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
407             proxyToServer.setTimeStamp(System.currentTimeMillis());
408             proxyToServer.setEndPoint(endpoint);
409             return proxyToServer;
410         }
411 
412         @Override
413         protected void endPointOpened(SelectChannelEndPoint endpoint)
414         {
415             ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
416             proxyToServer.ready();
417         }
418 
419         @Override
420         public boolean dispatch(Runnable task)
421         {
422             return _threadPool.dispatch(task);
423         }
424 
425         @Override
426         protected void endPointClosed(SelectChannelEndPoint endpoint)
427         {
428         }
429 
430         @Override
431         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
432         {
433         }
434     }
435 
436     public class ProxyToServerConnection implements Connection
437     {
438         private final CountDownLatch _ready = new CountDownLatch(1);
439         private final Buffer _buffer = new IndirectNIOBuffer(1024);
440         private final ConcurrentMap<String, Object> _context;
441         private volatile Buffer _data;
442         private volatile ClientToProxyConnection _toClient;
443         private volatile long _timestamp;
444         private volatile SelectChannelEndPoint _endPoint;
445 
446         public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
447         {
448             _context = context;
449             _data = data;
450         }
451 
452         public Connection handle() throws IOException
453         {
454             _logger.debug("ProxyToServer: begin reading from server");
455             try
456             {
457                 if (_data != null)
458                 {
459                     int written = write(_endPoint, _data, _context);
460                     _logger.debug("ProxyToServer: written to server {} bytes", written);
461                     _data = null;
462                 }
463 
464                 while (true)
465                 {
466                     int read = read(_endPoint, _buffer, _context);
467 
468                     if (read == -1)
469                     {
470                         _logger.debug("ProxyToServer: server closed connection {}", _endPoint);
471                         close();
472                         break;
473                     }
474 
475                     if (read == 0)
476                         break;
477 
478                     _logger.debug("ProxyToServer: read from server {} bytes {}", read, _endPoint);
479                     int written = write(_toClient._endPoint, _buffer, _context);
480                     _logger.debug("ProxyToServer: written to client {} bytes", written);
481                 }
482                 return this;
483             }
484             catch (IOException x)
485             {
486                 _logger.warn("ProxyToServer: Unexpected exception", x);
487                 close();
488                 throw x;
489             }
490             catch (RuntimeException x)
491             {
492                 _logger.warn("ProxyToServer: Unexpected exception", x);
493                 close();
494                 throw x;
495             }
496             finally
497             {
498                 _logger.debug("ProxyToServer: end reading from server");
499             }
500         }
501 
502         public void setConnection(ClientToProxyConnection connection)
503         {
504             _toClient = connection;
505         }
506 
507         public long getTimeStamp()
508         {
509             return _timestamp;
510         }
511 
512         public void setTimeStamp(long timestamp)
513         {
514             _timestamp = timestamp;
515         }
516 
517         public void setEndPoint(SelectChannelEndPoint endpoint)
518         {
519             _endPoint = endpoint;
520             _logger.debug("ProxyToServer: {}", _endPoint);
521         }
522 
523         public boolean isIdle()
524         {
525             return false;
526         }
527 
528         public boolean isSuspended()
529         {
530             return false;
531         }
532 
533         public void ready()
534         {
535             _ready.countDown();
536         }
537 
538         public void waitReady(long timeout) throws IOException
539         {
540             try
541             {
542                 _ready.await(timeout, TimeUnit.MILLISECONDS);
543             }
544             catch (final InterruptedException x)
545             {
546                 throw new IOException(){{initCause(x);}};
547             }
548         }
549 
550         public void closeClient() throws IOException
551         {
552             _toClient.closeClient();
553         }
554 
555         public void closeServer() throws IOException
556         {
557             _endPoint.close();
558         }
559 
560         public void close()
561         {
562             try
563             {
564                 closeClient();
565             }
566             catch (IOException x)
567             {
568                 _logger.debug("ProxyToServer: Unexpected exception closing the client", x);
569             }
570 
571             try
572             {
573                 closeServer();
574             }
575             catch (IOException x)
576             {
577                 _logger.debug("ProxyToServer: Unexpected exception closing the server", x);
578             }
579         }
580     }
581 
582     public class ClientToProxyConnection implements Connection
583     {
584         private final Buffer _buffer = new IndirectNIOBuffer(1024);
585         private final ConcurrentMap<String, Object> _context;
586         private final SocketChannel _channel;
587         private final EndPoint _endPoint;
588         private final long _timestamp;
589         private volatile ProxyToServerConnection _toServer;
590         private boolean _firstTime = true;
591 
592         public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
593         {
594             _context = context;
595             _channel = channel;
596             _endPoint = endPoint;
597             _timestamp = timestamp;
598             _logger.debug("ClientToProxy: {}", _endPoint);
599         }
600 
601         public Connection handle() throws IOException
602         {
603             _logger.debug("ClientToProxy: begin reading from client");
604             try
605             {
606                 if (_firstTime)
607                 {
608                     _firstTime = false;
609                     register(_channel, _toServer);
610                     _logger.debug("ClientToProxy: registered channel {} with connection {}", _channel, _toServer);
611                 }
612 
613                 while (true)
614                 {
615                     int read = read(_endPoint, _buffer, _context);
616 
617                     if (read == -1)
618                     {
619                         _logger.debug("ClientToProxy: client closed connection {}", _endPoint);
620                         close();
621                         break;
622                     }
623 
624                     if (read == 0)
625                         break;
626 
627                     _logger.debug("ClientToProxy: read from client {} bytes {}", read, _endPoint);
628                     int written = write(_toServer._endPoint, _buffer, _context);
629                     _logger.debug("ClientToProxy: written to server {} bytes", written);
630                 }
631                 return this;
632             }
633             catch (ClosedChannelException x)
634             {
635                 _logger.debug("ClientToProxy",x);
636                 closeServer();
637                 throw x;
638             }
639             catch (IOException x)
640             {
641                 _logger.warn("ClientToProxy", x);
642                 close();
643                 throw x;
644             }
645             catch (RuntimeException x)
646             {
647                 _logger.warn("ClientToProxy", x);
648                 close();
649                 throw x;
650             }
651             finally
652             {
653                 _logger.debug("ClientToProxy: end reading from client");
654             }
655         }
656 
657         public long getTimeStamp()
658         {
659             return _timestamp;
660         }
661 
662         public boolean isIdle()
663         {
664             return false;
665         }
666 
667         public boolean isSuspended()
668         {
669             return false;
670         }
671 
672         public void setConnection(ProxyToServerConnection connection)
673         {
674             _toServer = connection;
675         }
676 
677         public void closeClient() throws IOException
678         {
679             _endPoint.close();
680         }
681 
682         public void closeServer() throws IOException
683         {
684             _toServer.closeServer();
685         }
686 
687         public void close()
688         {
689             try
690             {
691                 closeClient();
692             }
693             catch (IOException x)
694             {
695                 _logger.debug("ClientToProxy: Unexpected exception closing the client", x);
696             }
697 
698             try
699             {
700                 closeServer();
701             }
702             catch (IOException x)
703             {
704                 _logger.debug("ClientToProxy: Unexpected exception closing the server", x);
705             }
706         }
707     }
708 }