View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.net.ConnectException;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.RejectedExecutionException;
25  
26  import org.eclipse.jetty.client.HttpClient.Connector;
27  import org.eclipse.jetty.client.security.Authentication;
28  import org.eclipse.jetty.client.security.SecurityListener;
29  import org.eclipse.jetty.http.HttpCookie;
30  import org.eclipse.jetty.http.HttpHeaders;
31  import org.eclipse.jetty.http.HttpMethods;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.PathMap;
34  import org.eclipse.jetty.io.Buffer;
35  import org.eclipse.jetty.io.ByteArrayBuffer;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.util.component.AggregateLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  /**
44   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
45   */
46  public class HttpDestination implements Dumpable
47  {
48      private static final Logger LOG = Log.getLogger(HttpDestination.class);
49  
50      private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
51      private final List<HttpConnection> _connections = new LinkedList<HttpConnection>();
52      private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
53      private final List<HttpConnection> _idle = new ArrayList<HttpConnection>();
54      private final HttpClient _client;
55      private final Address _address;
56      private final boolean _ssl;
57      private final ByteArrayBuffer _hostHeader;
58      private volatile int _maxConnections;
59      private volatile int _maxQueueSize;
60      private int _pendingConnections = 0;
61      private int _newConnection = 0;
62      private volatile Address _proxy;
63      private Authentication _proxyAuthentication;
64      private PathMap _authorizations;
65      private List<HttpCookie> _cookies;
66  
67  
68      
69      HttpDestination(HttpClient client, Address address, boolean ssl)
70      {
71          _client = client;
72          _address = address;
73          _ssl = ssl;
74          _maxConnections = _client.getMaxConnectionsPerAddress();
75          _maxQueueSize = _client.getMaxQueueSizePerAddress();
76          String addressString = address.getHost();
77          if (address.getPort() != (_ssl ? 443 : 80))
78              addressString += ":" + address.getPort();
79          _hostHeader = new ByteArrayBuffer(addressString);
80      }
81  
82      public HttpClient getHttpClient()
83      {
84          return _client;
85      }
86  
87      public Address getAddress()
88      {
89          return _address;
90      }
91  
92      public boolean isSecure()
93      {
94          return _ssl;
95      }
96  
97      public Buffer getHostHeader()
98      {
99          return _hostHeader;
100     }
101 
102     public int getMaxConnections()
103     {
104         return _maxConnections;
105     }
106 
107     public void setMaxConnections(int maxConnections)
108     {
109         this._maxConnections = maxConnections;
110     }
111 
112     public int getMaxQueueSize()
113     {
114         return _maxQueueSize;
115     }
116 
117     public void setMaxQueueSize(int maxQueueSize)
118     {
119         this._maxQueueSize = maxQueueSize;
120     }
121 
122     public int getConnections()
123     {
124         synchronized (this)
125         {
126             return _connections.size();
127         }
128     }
129 
130     public int getIdleConnections()
131     {
132         synchronized (this)
133         {
134             return _idle.size();
135         }
136     }
137 
138     public void addAuthorization(String pathSpec, Authentication authorization)
139     {
140         synchronized (this)
141         {
142             if (_authorizations == null)
143                 _authorizations = new PathMap();
144             _authorizations.put(pathSpec, authorization);
145         }
146 
147         // TODO query and remove methods
148     }
149 
150     public void addCookie(HttpCookie cookie)
151     {
152         synchronized (this)
153         {
154             if (_cookies == null)
155                 _cookies = new ArrayList<HttpCookie>();
156             _cookies.add(cookie);
157         }
158 
159         // TODO query, remove and age methods
160     }
161 
162     /**
163      * Get a connection. We either get an idle connection if one is available, or
164      * we make a new connection, if we have not yet reached maxConnections. If we
165      * have reached maxConnections, we wait until the number reduces.
166      *
167      * @param timeout max time prepared to block waiting to be able to get a connection
168      * @return a HttpConnection for this destination
169      * @throws IOException if an I/O error occurs
170      */
171     private HttpConnection getConnection(long timeout) throws IOException
172     {
173         HttpConnection connection = null;
174 
175         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
176         {
177             boolean startConnection = false;
178             synchronized (this)
179             {
180                 int totalConnections = _connections.size() + _pendingConnections;
181                 if (totalConnections < _maxConnections)
182                 {
183                     _newConnection++;
184                     startConnection = true;
185                 }
186             }
187 
188             if (startConnection)
189             {
190                 startNewConnection();
191                 try
192                 {
193                     Object o = _newQueue.take();
194                     if (o instanceof HttpConnection)
195                     {
196                         connection = (HttpConnection)o;
197                     }
198                     else
199                         throw (IOException)o;
200                 }
201                 catch (InterruptedException e)
202                 {
203                     LOG.ignore(e);
204                 }
205             }
206             else
207             {
208                 try
209                 {
210                     Thread.currentThread();
211                     Thread.sleep(200);
212                     timeout -= 200;
213                 }
214                 catch (InterruptedException e)
215                 {
216                     LOG.ignore(e);
217                 }
218             }
219         }
220         return connection;
221     }
222 
223     public HttpConnection reserveConnection(long timeout) throws IOException
224     {
225         HttpConnection connection = getConnection(timeout);
226         if (connection != null)
227             connection.setReserved(true);
228         return connection;
229     }
230 
231     public HttpConnection getIdleConnection() throws IOException
232     {
233         HttpConnection connection = null;
234         while (true)
235         {
236             synchronized (this)
237             {
238                 if (connection != null)
239                 {
240                     _connections.remove(connection);
241                     connection.close();
242                     connection = null;
243                 }
244                 if (_idle.size() > 0)
245                     connection = _idle.remove(_idle.size() - 1);
246             }
247 
248             if (connection == null)
249                 return null;
250 
251             // Check if the connection was idle,
252             // but it expired just a moment ago
253             if (connection.cancelIdleTimeout())
254                 return connection;
255         }
256     }
257 
258     protected void startNewConnection()
259     {
260         try
261         {
262             synchronized (this)
263             {
264                 _pendingConnections++;
265             }
266             final Connector connector = _client._connector;
267             if (connector != null)
268                 connector.startConnection(this);
269         }
270         catch (Exception e)
271         {
272             LOG.debug(e);
273             onConnectionFailed(e);
274         }
275     }
276 
277     public void onConnectionFailed(Throwable throwable)
278     {
279         Throwable connect_failure = null;
280 
281         boolean startConnection = false;
282         synchronized (this)
283         {
284             _pendingConnections--;
285             if (_newConnection > 0)
286             {
287                 connect_failure = throwable;
288                 _newConnection--;
289             }
290             else if (_queue.size() > 0)
291             {
292                 HttpExchange ex = _queue.remove(0);
293                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
294                 ex.getEventListener().onConnectionFailed(throwable);
295 
296                 // Since an existing connection had failed, we need to create a
297                 // connection if the  queue is not empty and client is running.
298                 if (!_queue.isEmpty() && _client.isStarted())
299                     startConnection = true;
300             }
301         }
302 
303         if (startConnection)
304             startNewConnection();
305 
306         if (connect_failure != null)
307         {
308             try
309             {
310                 _newQueue.put(connect_failure);
311             }
312             catch (InterruptedException e)
313             {
314                 LOG.ignore(e);
315             }
316         }
317     }
318 
319     public void onException(Throwable throwable)
320     {
321         synchronized (this)
322         {
323             _pendingConnections--;
324             if (_queue.size() > 0)
325             {
326                 HttpExchange ex = _queue.remove(0);
327                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
328                 ex.getEventListener().onException(throwable);
329             }
330         }
331     }
332 
333     public void onNewConnection(final HttpConnection connection) throws IOException
334     {
335         Connection q_connection = null;
336 
337         synchronized (this)
338         {
339             _pendingConnections--;
340             _connections.add(connection);
341 
342             if (_newConnection > 0)
343             {
344                 q_connection = connection;
345                 _newConnection--;
346             }
347             else if (_queue.size() == 0)
348             {
349                 connection.setIdleTimeout();
350                 _idle.add(connection);
351             }
352             else
353             {
354                 EndPoint endPoint = connection.getEndPoint();
355                 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
356                 {
357                     SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
358                     HttpExchange exchange = _queue.get(0);
359                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
360                     connect.setAddress(getProxy());
361                     send(connection, connect);
362                 }
363                 else
364                 {
365                     HttpExchange exchange = _queue.remove(0);
366                     send(connection, exchange);
367                 }
368             }
369         }
370 
371         if (q_connection != null)
372         {
373             try
374             {
375                 _newQueue.put(q_connection);
376             }
377             catch (InterruptedException e)
378             {
379                 LOG.ignore(e);
380             }
381         }
382     }
383 
384     public void returnConnection(HttpConnection connection, boolean close) throws IOException
385     {
386         if (connection.isReserved())
387             connection.setReserved(false);
388 
389         if (close)
390         {
391             try
392             {
393                 connection.close();
394             }
395             catch (IOException e)
396             {
397                 LOG.ignore(e);
398             }
399         }
400 
401         if (!_client.isStarted())
402             return;
403 
404         if (!close && connection.getEndPoint().isOpen())
405         {
406             synchronized (this)
407             {
408                 if (_queue.size() == 0)
409                 {
410                     connection.setIdleTimeout();
411                     _idle.add(connection);
412                 }
413                 else
414                 {
415                     HttpExchange ex = _queue.remove(0);
416                     send(connection, ex);
417                 }
418                 this.notifyAll();
419             }
420         }
421         else
422         {
423             boolean startConnection = false;
424             synchronized (this)
425             {
426                 _connections.remove(connection);
427                 if (!_queue.isEmpty())
428                     startConnection = true;
429             }
430 
431             if (startConnection)
432                 startNewConnection();
433         }
434     }
435 
436     public void returnIdleConnection(HttpConnection connection)
437     {
438         try
439         {
440             connection.close();
441         }
442         catch (IOException e)
443         {
444             LOG.ignore(e);
445         }
446 
447         boolean startConnection = false;
448         synchronized (this)
449         {
450             _idle.remove(connection);
451             _connections.remove(connection);
452 
453             if (!_queue.isEmpty() && _client.isStarted())
454                 startConnection = true;
455         }
456 
457         if (startConnection)
458             startNewConnection();
459     }
460 
461     public void send(HttpExchange ex) throws IOException
462     {
463         LinkedList<String> listeners = _client.getRegisteredListeners();
464 
465         if (listeners != null)
466         {
467             // Add registered listeners, fail if we can't load them
468             for (int i = listeners.size(); i > 0; --i)
469             {
470                 String listenerClass = listeners.get(i - 1);
471 
472                 try
473                 {
474                     Class listener = Class.forName(listenerClass);
475                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
476                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
477                     ex.setEventListener(elistener);
478                 }
479                 catch (Exception e)
480                 {
481                     e.printStackTrace();
482                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
483                 }
484             }
485         }
486 
487         // Security is supported by default and should be the first consulted
488         if (_client.hasRealms())
489         {
490             ex.setEventListener(new SecurityListener(this, ex));
491         }
492 
493         doSend(ex);
494     }
495 
496     public void resend(HttpExchange ex) throws IOException
497     {
498         ex.getEventListener().onRetry();
499         ex.reset();
500         doSend(ex);
501     }
502 
503     protected void doSend(HttpExchange ex) throws IOException
504     {
505         // add cookies
506         // TODO handle max-age etc.
507         if (_cookies != null)
508         {
509             StringBuilder buf = null;
510             for (HttpCookie cookie : _cookies)
511             {
512                 if (buf == null)
513                     buf = new StringBuilder();
514                 else
515                     buf.append("; ");
516                 buf.append(cookie.getName()); // TODO quotes
517                 buf.append("=");
518                 buf.append(cookie.getValue()); // TODO quotes
519             }
520             if (buf != null)
521                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
522         }
523 
524         // Add any known authorizations
525         if (_authorizations != null)
526         {
527             Authentication auth = (Authentication)_authorizations.match(ex.getURI());
528             if (auth != null)
529                 (auth).setCredentials(ex);
530         }
531 
532         // Schedule the timeout here, before we queue the exchange
533         // so that we count also the queue time in the timeout
534         ex.scheduleTimeout(this);
535 
536         HttpConnection connection = getIdleConnection();
537         if (connection != null)
538         {
539             send(connection, ex);
540         }
541         else
542         {
543             boolean startConnection = false;
544             synchronized (this)
545             {
546                 if (_queue.size() == _maxQueueSize)
547                     throw new RejectedExecutionException("Queue full for address " + _address);
548 
549                 _queue.add(ex);
550                 if (_connections.size() + _pendingConnections < _maxConnections)
551                     startConnection = true;
552             }
553 
554             if (startConnection)
555                 startNewConnection();
556         }
557     }
558 
559     protected void exchangeExpired(HttpExchange exchange)
560     {
561         // The exchange may expire while waiting in the
562         // destination queue, make sure it is removed
563         synchronized (this)
564         {
565             _queue.remove(exchange);
566         }
567     }
568 
569     protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
570     {
571         synchronized (this)
572         {
573             // If server closes the connection, put the exchange back
574             // to the exchange queue and recycle the connection
575             if (!connection.send(exchange))
576             {
577                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
578                     _queue.add(0, exchange);
579                 returnIdleConnection(connection);
580             }
581         }
582     }
583 
584     @Override
585     public synchronized String toString()
586     {
587         return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
588     }
589 
590     public synchronized String toDetailString()
591     {
592         StringBuilder b = new StringBuilder();
593         b.append(toString());
594         b.append('\n');
595         synchronized (this)
596         {
597             for (HttpConnection connection : _connections)
598             {
599                 b.append(connection.toDetailString());
600                 if (_idle.contains(connection))
601                     b.append(" IDLE");
602                 b.append('\n');
603             }
604         }
605         b.append("--");
606         b.append('\n');
607 
608         return b.toString();
609     }
610 
611     public void setProxy(Address proxy)
612     {
613         _proxy = proxy;
614     }
615 
616     public Address getProxy()
617     {
618         return _proxy;
619     }
620 
621     public Authentication getProxyAuthentication()
622     {
623         return _proxyAuthentication;
624     }
625 
626     public void setProxyAuthentication(Authentication authentication)
627     {
628         _proxyAuthentication = authentication;
629     }
630 
631     public boolean isProxied()
632     {
633         return _proxy != null;
634     }
635 
636     public void close() throws IOException
637     {
638         synchronized (this)
639         {
640             for (HttpConnection connection : _connections)
641             {
642                 connection.close();
643             }
644         }
645     }
646 
647     /* ------------------------------------------------------------ */
648     /**
649      * @see org.eclipse.jetty.util.component.Dumpable#dump()
650      */
651     public String dump()
652     {
653         return AggregateLifeCycle.dump(this);
654     }
655 
656     /* ------------------------------------------------------------ */
657     /**
658      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
659      */
660     public void dump(Appendable out, String indent) throws IOException
661     {
662         synchronized (this)
663         {
664             out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
665             AggregateLifeCycle.dump(out,indent,_connections);
666         }
667     }
668     
669     private class ConnectExchange extends ContentExchange
670     {
671         private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
672         private final HttpExchange exchange;
673 
674         public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
675         {
676             this.proxyEndPoint = proxyEndPoint;
677             this.exchange = exchange;
678             setMethod(HttpMethods.CONNECT);
679             setVersion(exchange.getVersion());
680             String serverHostAndPort = serverAddress.toString();
681             setRequestURI(serverHostAndPort);
682             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
683             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
684             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
685         }
686 
687         @Override
688         protected void onResponseComplete() throws IOException
689         {
690             if (getResponseStatus() == HttpStatus.OK_200)
691             {
692                 proxyEndPoint.upgrade();
693             }
694             else
695             {
696                 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
697             }
698         }
699 
700         @Override
701         protected void onConnectionFailed(Throwable x)
702         {
703             HttpDestination.this.onConnectionFailed(x);
704         }
705     }
706 }