View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.lang.reflect.Constructor;
23  import java.net.ProtocolException;
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.concurrent.ArrayBlockingQueue;
28  import java.util.concurrent.BlockingQueue;
29  import java.util.concurrent.RejectedExecutionException;
30  
31  import org.eclipse.jetty.client.HttpClient.Connector;
32  import org.eclipse.jetty.client.security.Authentication;
33  import org.eclipse.jetty.client.security.SecurityListener;
34  import org.eclipse.jetty.http.HttpCookie;
35  import org.eclipse.jetty.http.HttpHeaders;
36  import org.eclipse.jetty.http.HttpMethods;
37  import org.eclipse.jetty.http.HttpStatus;
38  import org.eclipse.jetty.http.PathMap;
39  import org.eclipse.jetty.io.Buffer;
40  import org.eclipse.jetty.io.ByteArrayBuffer;
41  import org.eclipse.jetty.io.Connection;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.util.component.AggregateLifeCycle;
44  import org.eclipse.jetty.util.component.Dumpable;
45  import org.eclipse.jetty.util.log.Log;
46  import org.eclipse.jetty.util.log.Logger;
47  
48  /**
49   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
50   */
51  public class HttpDestination implements Dumpable
52  {
53      private static final Logger LOG = Log.getLogger(HttpDestination.class);
54  
55      private final List<HttpExchange> _exchanges = new LinkedList<HttpExchange>();
56      private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
57      private final BlockingQueue<Object> _reservedConnections = new ArrayBlockingQueue<Object>(10, true);
58      private final List<AbstractHttpConnection> _idleConnections = new ArrayList<AbstractHttpConnection>();
59      private final HttpClient _client;
60      private final Address _address;
61      private final boolean _ssl;
62      private final ByteArrayBuffer _hostHeader;
63      private volatile int _maxConnections;
64      private volatile int _maxQueueSize;
65      private int _pendingConnections = 0;
66      private int _pendingReservedConnections = 0;
67      private volatile Address _proxy;
68      private Authentication _proxyAuthentication;
69      private PathMap _authorizations;
70      private List<HttpCookie> _cookies;
71  
72      HttpDestination(HttpClient client, Address address, boolean ssl)
73      {
74          _client = client;
75          _address = address;
76          _ssl = ssl;
77          _maxConnections = _client.getMaxConnectionsPerAddress();
78          _maxQueueSize = _client.getMaxQueueSizePerAddress();
79          String addressString = address.getHost();
80          if (address.getPort() != (_ssl ? 443 : 80))
81              addressString += ":" + address.getPort();
82          _hostHeader = new ByteArrayBuffer(addressString);
83      }
84  
85      public HttpClient getHttpClient()
86      {
87          return _client;
88      }
89  
90      public Address getAddress()
91      {
92          return _address;
93      }
94  
95      public boolean isSecure()
96      {
97          return _ssl;
98      }
99  
100     public Buffer getHostHeader()
101     {
102         return _hostHeader;
103     }
104 
105     public int getMaxConnections()
106     {
107         return _maxConnections;
108     }
109 
110     public void setMaxConnections(int maxConnections)
111     {
112         this._maxConnections = maxConnections;
113     }
114 
115     public int getMaxQueueSize()
116     {
117         return _maxQueueSize;
118     }
119 
120     public void setMaxQueueSize(int maxQueueSize)
121     {
122         this._maxQueueSize = maxQueueSize;
123     }
124 
125     public int getConnections()
126     {
127         synchronized (this)
128         {
129             return _connections.size();
130         }
131     }
132 
133     public int getIdleConnections()
134     {
135         synchronized (this)
136         {
137             return _idleConnections.size();
138         }
139     }
140 
141     public void addAuthorization(String pathSpec, Authentication authorization)
142     {
143         synchronized (this)
144         {
145             if (_authorizations == null)
146                 _authorizations = new PathMap();
147             _authorizations.put(pathSpec, authorization);
148         }
149 
150         // TODO query and remove methods
151     }
152 
153     public void addCookie(HttpCookie cookie)
154     {
155         synchronized (this)
156         {
157             if (_cookies == null)
158                 _cookies = new ArrayList<HttpCookie>();
159             _cookies.add(cookie);
160         }
161 
162         // TODO query, remove and age methods
163     }
164 
165     /**
166      * Get a connection. We either get an idle connection if one is available, or
167      * we make a new connection, if we have not yet reached maxConnections. If we
168      * have reached maxConnections, we wait until the number reduces.
169      *
170      * @param timeout max time prepared to block waiting to be able to get a connection
171      * @return a HttpConnection for this destination
172      * @throws IOException if an I/O error occurs
173      */
174     private AbstractHttpConnection getConnection(long timeout) throws IOException
175     {
176         AbstractHttpConnection connection = null;
177 
178         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
179         {
180             boolean startConnection = false;
181             synchronized (this)
182             {
183                 int totalConnections = _connections.size() + _pendingConnections;
184                 if (totalConnections < _maxConnections)
185                 {
186                     _pendingReservedConnections++;
187                     startConnection = true;
188                 }
189             }
190 
191             if (startConnection)
192             {
193                 startNewConnection();
194                 try
195                 {
196                     Object o = _reservedConnections.take();
197                     if (o instanceof AbstractHttpConnection)
198                     {
199                         connection = (AbstractHttpConnection)o;
200                     }
201                     else
202                         throw (IOException)o;
203                 }
204                 catch (InterruptedException e)
205                 {
206                     LOG.ignore(e);
207                 }
208             }
209             else
210             {
211                 try
212                 {
213                     Thread.currentThread();
214                     Thread.sleep(200);
215                     timeout -= 200;
216                 }
217                 catch (InterruptedException e)
218                 {
219                     LOG.ignore(e);
220                 }
221             }
222         }
223         return connection;
224     }
225 
226     public AbstractHttpConnection reserveConnection(long timeout) throws IOException
227     {
228         AbstractHttpConnection connection = getConnection(timeout);
229         if (connection != null)
230             connection.setReserved(true);
231         return connection;
232     }
233 
234     public AbstractHttpConnection getIdleConnection() throws IOException
235     {
236         AbstractHttpConnection connection = null;
237         while (true)
238         {
239             synchronized (this)
240             {
241                 if (connection != null)
242                 {
243                     _connections.remove(connection);
244                     connection.close();
245                     connection = null;
246                 }
247                 if (_idleConnections.size() > 0)
248                     connection = _idleConnections.remove(_idleConnections.size() - 1);
249             }
250 
251             if (connection == null)
252             {
253                 return null;
254             }
255 
256             // Check if the connection was idle,
257             // but it expired just a moment ago
258             if (connection.cancelIdleTimeout())
259             {
260                 return connection;
261             }
262         }
263     }
264 
265     protected void startNewConnection()
266     {
267         try
268         {
269             synchronized (this)
270             {
271                 _pendingConnections++;
272             }
273             final Connector connector = _client._connector;
274             if (connector != null)
275                 connector.startConnection(this);
276         }
277         catch (Exception e)
278         {
279             LOG.debug(e);
280             onConnectionFailed(e);
281         }
282     }
283 
284     public void onConnectionFailed(Throwable throwable)
285     {
286         Throwable connect_failure = null;
287 
288         boolean startConnection = false;
289         synchronized (this)
290         {
291             _pendingConnections--;
292             if (_pendingReservedConnections > 0)
293             {
294                 connect_failure = throwable;
295                 _pendingReservedConnections--;
296             }
297             else if (_exchanges.size() > 0)
298             {
299                 HttpExchange ex = _exchanges.remove(0);
300                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
301                     ex.getEventListener().onConnectionFailed(throwable);
302 
303                 // Since an existing connection had failed, we need to create a
304                 // connection if the  queue is not empty and client is running.
305                 if (!_exchanges.isEmpty() && _client.isStarted())
306                     startConnection = true;
307             }
308         }
309 
310         if (startConnection)
311             startNewConnection();
312 
313         if (connect_failure != null)
314         {
315             try
316             {
317                 _reservedConnections.put(connect_failure);
318             }
319             catch (InterruptedException e)
320             {
321                 LOG.ignore(e);
322             }
323         }
324     }
325 
326     public void onException(Throwable throwable)
327     {
328         synchronized (this)
329         {
330             _pendingConnections--;
331             if (_exchanges.size() > 0)
332             {
333                 HttpExchange ex = _exchanges.remove(0);
334                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
335                     ex.getEventListener().onException(throwable);
336             }
337         }
338     }
339 
340     public void onNewConnection(final AbstractHttpConnection connection) throws IOException
341     {
342         Connection reservedConnection = null;
343 
344         synchronized (this)
345         {
346             _pendingConnections--;
347             _connections.add(connection);
348 
349             if (_pendingReservedConnections > 0)
350             {
351                 reservedConnection = connection;
352                 _pendingReservedConnections--;
353             }
354             else
355             {
356                 // Establish the tunnel if needed
357                 EndPoint endPoint = connection.getEndPoint();
358                 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
359                 {
360                     SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
361                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint);
362                     connect.setAddress(getProxy());
363                     LOG.debug("Establishing tunnel to {} via {}", getAddress(), getProxy());
364                     send(connection, connect);
365                 }
366                 else
367                 {
368                     // Another connection stole the exchange that caused the creation of this connection ?
369                     if (_exchanges.size() == 0)
370                     {
371                         LOG.debug("No exchanges for new connection {}", connection);
372                         connection.setIdleTimeout();
373                         _idleConnections.add(connection);
374                     }
375                     else
376                     {
377                         HttpExchange exchange = _exchanges.remove(0);
378                         send(connection, exchange);
379                     }
380                 }
381             }
382         }
383 
384         if (reservedConnection != null)
385         {
386             try
387             {
388                 _reservedConnections.put(reservedConnection);
389             }
390             catch (InterruptedException e)
391             {
392                 LOG.ignore(e);
393             }
394         }
395     }
396 
397     public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
398     {
399         if (connection.isReserved())
400             connection.setReserved(false);
401 
402         if (close)
403         {
404             try
405             {
406                 connection.close();
407             }
408             catch (IOException e)
409             {
410                 LOG.ignore(e);
411             }
412         }
413 
414         if (!_client.isStarted())
415             return;
416 
417         if (!close && connection.getEndPoint().isOpen())
418         {
419             synchronized (this)
420             {
421                 if (_exchanges.size() == 0)
422                 {
423                     connection.setIdleTimeout();
424                     _idleConnections.add(connection);
425                 }
426                 else
427                 {
428                     HttpExchange ex = _exchanges.remove(0);
429                     send(connection, ex);
430                 }
431                 this.notifyAll();
432             }
433         }
434         else
435         {
436             boolean startConnection = false;
437             synchronized (this)
438             {
439                 _connections.remove(connection);
440                 if (!_exchanges.isEmpty())
441                     startConnection = true;
442             }
443 
444             if (startConnection)
445                 startNewConnection();
446         }
447     }
448 
449     public void returnIdleConnection(AbstractHttpConnection connection)
450     {
451         // TODO work out the real idle time;
452         long idleForMs = connection.getEndPoint() != null ? connection.getEndPoint().getMaxIdleTime() : -1;
453         connection.onIdleExpired(idleForMs);
454 
455         boolean startConnection = false;
456         synchronized (this)
457         {
458             _idleConnections.remove(connection);
459             _connections.remove(connection);
460 
461             if (!_exchanges.isEmpty() && _client.isStarted())
462                 startConnection = true;
463         }
464 
465         if (startConnection)
466             startNewConnection();
467     }
468 
469     public void send(HttpExchange ex) throws IOException
470     {
471         LinkedList<String> listeners = _client.getRegisteredListeners();
472 
473         if (listeners != null)
474         {
475             // Add registered listeners, fail if we can't load them
476             for (int i = listeners.size(); i > 0; --i)
477             {
478                 String listenerClass = listeners.get(i - 1);
479                 try
480                 {
481                     Class<?> listener = Class.forName(listenerClass);
482                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
483                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
484                     ex.setEventListener(elistener);
485                 }
486                 catch (final Exception e)
487                 {
488                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass)
489                     {
490                         {
491                             initCause(e);
492                         }
493                     };
494                 }
495             }
496         }
497 
498         // Security is supported by default and should be the first consulted
499         if (_client.hasRealms())
500         {
501             ex.setEventListener(new SecurityListener(this, ex));
502         }
503 
504         doSend(ex);
505     }
506 
507     public void resend(HttpExchange ex) throws IOException
508     {
509         ex.getEventListener().onRetry();
510         ex.reset();
511         doSend(ex);
512     }
513 
514     protected void doSend(HttpExchange ex) throws IOException
515     {
516         // add cookies
517         // TODO handle max-age etc.
518         if (_cookies != null)
519         {
520             StringBuilder buf = null;
521             for (HttpCookie cookie : _cookies)
522             {
523                 if (buf == null)
524                     buf = new StringBuilder();
525                 else
526                     buf.append("; ");
527                 buf.append(cookie.getName()); // TODO quotes
528                 buf.append("=");
529                 buf.append(cookie.getValue()); // TODO quotes
530             }
531             if (buf != null)
532                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
533         }
534 
535         // Add any known authorizations
536         if (_authorizations != null)
537         {
538             Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
539             if (auth != null)
540                 (auth).setCredentials(ex);
541         }
542 
543         // Schedule the timeout here, before we queue the exchange
544         // so that we count also the queue time in the timeout
545         ex.scheduleTimeout(this);
546 
547         AbstractHttpConnection connection = getIdleConnection();
548         if (connection != null)
549         {
550             send(connection, ex);
551         }
552         else
553         {
554             boolean startConnection = false;
555             synchronized (this)
556             {
557                 if (_exchanges.size() == _maxQueueSize)
558                     throw new RejectedExecutionException("Queue full for address " + _address);
559 
560                 _exchanges.add(ex);
561                 if (_connections.size() + _pendingConnections < _maxConnections)
562                     startConnection = true;
563             }
564 
565             if (startConnection)
566                 startNewConnection();
567         }
568     }
569 
570     protected void exchangeExpired(HttpExchange exchange)
571     {
572         // The exchange may expire while waiting in the
573         // destination queue, make sure it is removed
574         synchronized (this)
575         {
576             _exchanges.remove(exchange);
577         }
578     }
579 
580     protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
581     {
582         synchronized (this)
583         {
584             // If server closes the connection, put the exchange back
585             // to the exchange queue and recycle the connection
586             if (!connection.send(exchange))
587             {
588                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
589                     _exchanges.add(0, exchange);
590                 returnIdleConnection(connection);
591             }
592         }
593     }
594 
595     @Override
596     public synchronized String toString()
597     {
598         return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n", hashCode(), _address.getHost(), _address.getPort(), _connections.size(), _maxConnections, _idleConnections.size(), _exchanges.size(), _maxQueueSize);
599     }
600 
601     public synchronized String toDetailString()
602     {
603         StringBuilder b = new StringBuilder();
604         b.append(toString());
605         b.append('\n');
606         synchronized (this)
607         {
608             for (AbstractHttpConnection connection : _connections)
609             {
610                 b.append(connection.toDetailString());
611                 if (_idleConnections.contains(connection))
612                     b.append(" IDLE");
613                 b.append('\n');
614             }
615         }
616         b.append("--");
617         b.append('\n');
618 
619         return b.toString();
620     }
621 
622     public void setProxy(Address proxy)
623     {
624         _proxy = proxy;
625     }
626 
627     public Address getProxy()
628     {
629         return _proxy;
630     }
631 
632     public Authentication getProxyAuthentication()
633     {
634         return _proxyAuthentication;
635     }
636 
637     public void setProxyAuthentication(Authentication authentication)
638     {
639         _proxyAuthentication = authentication;
640     }
641 
642     public boolean isProxied()
643     {
644         return _proxy != null;
645     }
646 
647     public void close() throws IOException
648     {
649         synchronized (this)
650         {
651             for (AbstractHttpConnection connection : _connections)
652             {
653                 connection.close();
654             }
655         }
656     }
657 
658     public String dump()
659     {
660         return AggregateLifeCycle.dump(this);
661     }
662 
663     public void dump(Appendable out, String indent) throws IOException
664     {
665         synchronized (this)
666         {
667             out.append(String.valueOf(this));
668             out.append("idle=");
669             out.append(String.valueOf(_idleConnections.size()));
670             out.append(" pending=");
671             out.append(String.valueOf(_pendingConnections));
672             out.append("\n");
673             AggregateLifeCycle.dump(out, indent, _connections);
674         }
675     }
676 
677     private class ConnectExchange extends ContentExchange
678     {
679         private final SelectConnector.UpgradableEndPoint proxyEndPoint;
680 
681         public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint)
682         {
683             this.proxyEndPoint = proxyEndPoint;
684             setMethod(HttpMethods.CONNECT);
685             String serverHostAndPort = serverAddress.toString();
686             setRequestURI(serverHostAndPort);
687             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
688             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
689             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
690         }
691 
692         @Override
693         protected void onResponseComplete() throws IOException
694         {
695             int responseStatus = getResponseStatus();
696             if (responseStatus == HttpStatus.OK_200)
697             {
698                 proxyEndPoint.upgrade();
699             }
700             else if (responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
701             {
702                 onExpire();
703             }
704             else
705             {
706                 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() + ":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus));
707             }
708         }
709 
710         @Override
711         protected void onConnectionFailed(Throwable x)
712         {
713             HttpDestination.this.onConnectionFailed(x);
714         }
715 
716         @Override
717         protected void onException(Throwable x)
718         {
719             HttpExchange exchange = null;
720             synchronized (HttpDestination.this)
721             {
722                 if (!_exchanges.isEmpty())
723                     exchange = _exchanges.remove(0);
724             }
725             if (exchange != null && exchange.setStatus(STATUS_EXCEPTED))
726                 exchange.getEventListener().onException(x);
727         }
728 
729         @Override
730         protected void onExpire()
731         {
732             HttpExchange exchange = null;
733             synchronized (HttpDestination.this)
734             {
735                 if (!_exchanges.isEmpty())
736                     exchange = _exchanges.remove(0);
737             }
738             if (exchange != null && exchange.setStatus(STATUS_EXPIRED))
739                 exchange.getEventListener().onExpire();
740         }
741     }
742 }