View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.net.ProtocolException;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.RejectedExecutionException;
25  
26  import org.eclipse.jetty.client.HttpClient.Connector;
27  import org.eclipse.jetty.client.security.Authentication;
28  import org.eclipse.jetty.client.security.SecurityListener;
29  import org.eclipse.jetty.http.HttpCookie;
30  import org.eclipse.jetty.http.HttpHeaders;
31  import org.eclipse.jetty.http.HttpMethods;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.PathMap;
34  import org.eclipse.jetty.io.Buffer;
35  import org.eclipse.jetty.io.ByteArrayBuffer;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.util.component.AggregateLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  /**
44   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
45   */
46  public class HttpDestination implements Dumpable
47  {
48      private static final Logger LOG = Log.getLogger(HttpDestination.class);
49  
50      private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
51      private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
52      private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
53      private final List<AbstractHttpConnection> _idle = new ArrayList<AbstractHttpConnection>();
54      private final HttpClient _client;
55      private final Address _address;
56      private final boolean _ssl;
57      private final ByteArrayBuffer _hostHeader;
58      private volatile int _maxConnections;
59      private volatile int _maxQueueSize;
60      private int _pendingConnections = 0;
61      private int _newConnection = 0;
62      private volatile Address _proxy;
63      private Authentication _proxyAuthentication;
64      private PathMap _authorizations;
65      private List<HttpCookie> _cookies;
66  
67  
68  
69      HttpDestination(HttpClient client, Address address, boolean ssl)
70      {
71          _client = client;
72          _address = address;
73          _ssl = ssl;
74          _maxConnections = _client.getMaxConnectionsPerAddress();
75          _maxQueueSize = _client.getMaxQueueSizePerAddress();
76          String addressString = address.getHost();
77          if (address.getPort() != (_ssl ? 443 : 80))
78              addressString += ":" + address.getPort();
79          _hostHeader = new ByteArrayBuffer(addressString);
80      }
81  
82      public HttpClient getHttpClient()
83      {
84          return _client;
85      }
86  
87      public Address getAddress()
88      {
89          return _address;
90      }
91  
92      public boolean isSecure()
93      {
94          return _ssl;
95      }
96  
97      public Buffer getHostHeader()
98      {
99          return _hostHeader;
100     }
101 
102     public int getMaxConnections()
103     {
104         return _maxConnections;
105     }
106 
107     public void setMaxConnections(int maxConnections)
108     {
109         this._maxConnections = maxConnections;
110     }
111 
112     public int getMaxQueueSize()
113     {
114         return _maxQueueSize;
115     }
116 
117     public void setMaxQueueSize(int maxQueueSize)
118     {
119         this._maxQueueSize = maxQueueSize;
120     }
121 
122     public int getConnections()
123     {
124         synchronized (this)
125         {
126             return _connections.size();
127         }
128     }
129 
130     public int getIdleConnections()
131     {
132         synchronized (this)
133         {
134             return _idle.size();
135         }
136     }
137 
138     public void addAuthorization(String pathSpec, Authentication authorization)
139     {
140         synchronized (this)
141         {
142             if (_authorizations == null)
143                 _authorizations = new PathMap();
144             _authorizations.put(pathSpec, authorization);
145         }
146 
147         // TODO query and remove methods
148     }
149 
150     public void addCookie(HttpCookie cookie)
151     {
152         synchronized (this)
153         {
154             if (_cookies == null)
155                 _cookies = new ArrayList<HttpCookie>();
156             _cookies.add(cookie);
157         }
158 
159         // TODO query, remove and age methods
160     }
161 
162     /**
163      * Get a connection. We either get an idle connection if one is available, or
164      * we make a new connection, if we have not yet reached maxConnections. If we
165      * have reached maxConnections, we wait until the number reduces.
166      *
167      * @param timeout max time prepared to block waiting to be able to get a connection
168      * @return a HttpConnection for this destination
169      * @throws IOException if an I/O error occurs
170      */
171     private AbstractHttpConnection getConnection(long timeout) throws IOException
172     {
173         AbstractHttpConnection connection = null;
174 
175         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
176         {
177             boolean startConnection = false;
178             synchronized (this)
179             {
180                 int totalConnections = _connections.size() + _pendingConnections;
181                 if (totalConnections < _maxConnections)
182                 {
183                     _newConnection++;
184                     startConnection = true;
185                 }
186             }
187 
188             if (startConnection)
189             {
190                 startNewConnection();
191                 try
192                 {
193                     Object o = _newQueue.take();
194                     if (o instanceof AbstractHttpConnection)
195                     {
196                         connection = (AbstractHttpConnection)o;
197                     }
198                     else
199                         throw (IOException)o;
200                 }
201                 catch (InterruptedException e)
202                 {
203                     LOG.ignore(e);
204                 }
205             }
206             else
207             {
208                 try
209                 {
210                     Thread.currentThread();
211                     Thread.sleep(200);
212                     timeout -= 200;
213                 }
214                 catch (InterruptedException e)
215                 {
216                     LOG.ignore(e);
217                 }
218             }
219         }
220         return connection;
221     }
222 
223     public AbstractHttpConnection reserveConnection(long timeout) throws IOException
224     {
225         AbstractHttpConnection connection = getConnection(timeout);
226         if (connection != null)
227             connection.setReserved(true);
228         return connection;
229     }
230 
231     public AbstractHttpConnection getIdleConnection() throws IOException
232     {
233         AbstractHttpConnection connection = null;
234         while (true)
235         {
236             synchronized (this)
237             {
238                 if (connection != null)
239                 {
240                     _connections.remove(connection);
241                     connection.close();
242                     connection = null;
243                 }
244                 if (_idle.size() > 0)
245                     connection = _idle.remove(_idle.size() - 1);
246             }
247 
248             if (connection == null)
249             {
250                 return null;
251             }
252             
253             // Check if the connection was idle,
254             // but it expired just a moment ago
255             if (connection.cancelIdleTimeout())
256             {
257                 return connection;
258             }
259         }
260     }
261 
262     protected void startNewConnection()
263     {
264         try
265         {
266             synchronized (this)
267             {
268                 _pendingConnections++;
269             }
270             final Connector connector = _client._connector;
271             if (connector != null)
272                 connector.startConnection(this);
273         }
274         catch (Exception e)
275         {
276             LOG.debug(e);
277             onConnectionFailed(e);
278         }
279     }
280 
281     public void onConnectionFailed(Throwable throwable)
282     {
283         Throwable connect_failure = null;
284 
285         boolean startConnection = false;
286         synchronized (this)
287         {
288             _pendingConnections--;
289             if (_newConnection > 0)
290             {
291                 connect_failure = throwable;
292                 _newConnection--;
293             }
294             else if (_queue.size() > 0)
295             {
296                 HttpExchange ex = _queue.remove(0);
297                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
298                     ex.getEventListener().onConnectionFailed(throwable);
299 
300                 // Since an existing connection had failed, we need to create a
301                 // connection if the  queue is not empty and client is running.
302                 if (!_queue.isEmpty() && _client.isStarted())
303                     startConnection = true;
304             }
305         }
306 
307         if (startConnection)
308             startNewConnection();
309 
310         if (connect_failure != null)
311         {
312             try
313             {
314                 _newQueue.put(connect_failure);
315             }
316             catch (InterruptedException e)
317             {
318                 LOG.ignore(e);
319             }
320         }
321     }
322 
323     public void onException(Throwable throwable)
324     {
325         synchronized (this)
326         {
327             _pendingConnections--;
328             if (_queue.size() > 0)
329             {
330                 HttpExchange ex = _queue.remove(0);
331                 if(ex.setStatus(HttpExchange.STATUS_EXCEPTED))
332                     ex.getEventListener().onException(throwable);
333             }
334         }
335     }
336 
337     public void onNewConnection(final AbstractHttpConnection connection) throws IOException
338     {
339         Connection q_connection = null;
340 
341         synchronized (this)
342         {
343             _pendingConnections--;
344             _connections.add(connection);
345 
346             if (_newConnection > 0)
347             {
348                 q_connection = connection;
349                 _newConnection--;
350             }
351             else if (_queue.size() == 0)
352             {
353                 connection.setIdleTimeout();
354                 _idle.add(connection);
355             }
356             else
357             {
358                 EndPoint endPoint = connection.getEndPoint();
359                 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
360                 {
361                     SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
362                     HttpExchange exchange = _queue.get(0);
363                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
364                     connect.setAddress(getProxy());
365                     send(connection, connect);
366                 }
367                 else
368                 {
369                     HttpExchange exchange = _queue.remove(0);
370                     send(connection, exchange);
371                 }
372             }
373         }
374 
375         if (q_connection != null)
376         {
377             try
378             {
379                 _newQueue.put(q_connection);
380             }
381             catch (InterruptedException e)
382             {
383                 LOG.ignore(e);
384             }
385         }
386     }
387 
388     public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
389     {
390         if (connection.isReserved())
391             connection.setReserved(false);
392 
393         if (close)
394         {
395             try
396             {
397                 connection.close();
398             }
399             catch (IOException e)
400             {
401                 LOG.ignore(e);
402             }
403         }
404 
405         if (!_client.isStarted())
406             return;
407 
408         if (!close && connection.getEndPoint().isOpen())
409         {
410             synchronized (this)
411             {
412                 if (_queue.size() == 0)
413                 {
414                     connection.setIdleTimeout();
415                     _idle.add(connection);
416                 }
417                 else
418                 {
419                     HttpExchange ex = _queue.remove(0);
420                     send(connection, ex);
421                 }
422                 this.notifyAll();
423             }
424         }
425         else
426         {
427             boolean startConnection = false;
428             synchronized (this)
429             {
430                 _connections.remove(connection);
431                 if (!_queue.isEmpty())
432                     startConnection = true;
433             }
434 
435             if (startConnection)
436                 startNewConnection();
437         }
438     }
439 
440     public void returnIdleConnection(AbstractHttpConnection connection)
441     {
442         connection.onIdleExpired();
443 
444         boolean startConnection = false;
445         synchronized (this)
446         {
447             _idle.remove(connection);
448             _connections.remove(connection);
449 
450             if (!_queue.isEmpty() && _client.isStarted())
451                 startConnection = true;
452         }
453 
454         if (startConnection)
455             startNewConnection();
456     }
457 
458     public void send(HttpExchange ex) throws IOException
459     {
460         LinkedList<String> listeners = _client.getRegisteredListeners();
461 
462         if (listeners != null)
463         {
464             // Add registered listeners, fail if we can't load them
465             for (int i = listeners.size(); i > 0; --i)
466             {
467                 String listenerClass = listeners.get(i - 1);
468 
469                 try
470                 {
471                     Class listener = Class.forName(listenerClass);
472                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
473                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
474                     ex.setEventListener(elistener);
475                 }
476                 catch (Exception e)
477                 {
478                     e.printStackTrace();
479                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
480                 }
481             }
482         }
483 
484         // Security is supported by default and should be the first consulted
485         if (_client.hasRealms())
486         {
487             ex.setEventListener(new SecurityListener(this, ex));
488         }
489 
490         doSend(ex);
491     }
492 
493     public void resend(HttpExchange ex) throws IOException
494     {
495         ex.getEventListener().onRetry();
496         ex.reset();
497         doSend(ex);
498     }
499 
500     protected void doSend(HttpExchange ex) throws IOException
501     {
502         // add cookies
503         // TODO handle max-age etc.
504         if (_cookies != null)
505         {
506             StringBuilder buf = null;
507             for (HttpCookie cookie : _cookies)
508             {
509                 if (buf == null)
510                     buf = new StringBuilder();
511                 else
512                     buf.append("; ");
513                 buf.append(cookie.getName()); // TODO quotes
514                 buf.append("=");
515                 buf.append(cookie.getValue()); // TODO quotes
516             }
517             if (buf != null)
518                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
519         }
520 
521         // Add any known authorizations
522         if (_authorizations != null)
523         {
524             Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
525             if (auth != null)
526                 (auth).setCredentials(ex);
527         }
528 
529         // Schedule the timeout here, before we queue the exchange
530         // so that we count also the queue time in the timeout
531         ex.scheduleTimeout(this);
532 
533         AbstractHttpConnection connection = getIdleConnection();
534         if (connection != null)
535         {
536             send(connection, ex);
537         }
538         else
539         {
540             boolean startConnection = false;
541             synchronized (this)
542             {
543                 if (_queue.size() == _maxQueueSize)
544                     throw new RejectedExecutionException("Queue full for address " + _address);
545 
546                 _queue.add(ex);
547                 if (_connections.size() + _pendingConnections < _maxConnections)
548                     startConnection = true;
549             }
550 
551             if (startConnection)
552                 startNewConnection();
553         }
554     }
555 
556     protected void exchangeExpired(HttpExchange exchange)
557     {
558         // The exchange may expire while waiting in the
559         // destination queue, make sure it is removed
560         synchronized (this)
561         {
562             _queue.remove(exchange);
563         }
564     }
565 
566     protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
567     {
568         synchronized (this)
569         {
570             // If server closes the connection, put the exchange back
571             // to the exchange queue and recycle the connection
572             if (!connection.send(exchange))
573             {
574                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
575                     _queue.add(0, exchange);
576                 returnIdleConnection(connection);
577             }
578         }
579     }
580 
581     @Override
582     public synchronized String toString()
583     {
584         return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
585     }
586 
587     public synchronized String toDetailString()
588     {
589         StringBuilder b = new StringBuilder();
590         b.append(toString());
591         b.append('\n');
592         synchronized (this)
593         {
594             for (AbstractHttpConnection connection : _connections)
595             {
596                 b.append(connection.toDetailString());
597                 if (_idle.contains(connection))
598                     b.append(" IDLE");
599                 b.append('\n');
600             }
601         }
602         b.append("--");
603         b.append('\n');
604 
605         return b.toString();
606     }
607 
608     public void setProxy(Address proxy)
609     {
610         _proxy = proxy;
611     }
612 
613     public Address getProxy()
614     {
615         return _proxy;
616     }
617 
618     public Authentication getProxyAuthentication()
619     {
620         return _proxyAuthentication;
621     }
622 
623     public void setProxyAuthentication(Authentication authentication)
624     {
625         _proxyAuthentication = authentication;
626     }
627 
628     public boolean isProxied()
629     {
630         return _proxy != null;
631     }
632 
633     public void close() throws IOException
634     {
635         synchronized (this)
636         {
637             for (AbstractHttpConnection connection : _connections)
638             {
639                 connection.close();
640             }
641         }
642     }
643 
644     /* ------------------------------------------------------------ */
645     /**
646      * @see org.eclipse.jetty.util.component.Dumpable#dump()
647      */
648     public String dump()
649     {
650         return AggregateLifeCycle.dump(this);
651     }
652 
653     /* ------------------------------------------------------------ */
654     /**
655      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
656      */
657     public void dump(Appendable out, String indent) throws IOException
658     {
659         synchronized (this)
660         {
661             out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
662             AggregateLifeCycle.dump(out,indent,_connections);
663         }
664     }
665 
666     private class ConnectExchange extends ContentExchange
667     {
668         private final SelectConnector.UpgradableEndPoint proxyEndPoint;
669         private final HttpExchange exchange;
670 
671         public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange)
672         {
673             this.proxyEndPoint = proxyEndPoint;
674             this.exchange = exchange;
675             setMethod(HttpMethods.CONNECT);
676             setVersion(exchange.getVersion());
677             String serverHostAndPort = serverAddress.toString();
678             setRequestURI(serverHostAndPort);
679             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
680             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
681             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
682         }
683 
684         @Override
685         protected void onResponseComplete() throws IOException
686         {
687             int responseStatus = getResponseStatus();
688             if (responseStatus == HttpStatus.OK_200)
689             {
690                 proxyEndPoint.upgrade();
691             }
692             else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
693             {
694                 onExpire();
695             }
696             else
697             {
698                 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString()));
699             }
700         }
701 
702         @Override
703         protected void onConnectionFailed(Throwable x)
704         {
705             HttpDestination.this.onConnectionFailed(x);
706         }
707 
708         @Override
709         protected void onException(Throwable x)
710         {
711             _queue.remove(exchange);
712             if (exchange.setStatus(STATUS_EXCEPTED))
713                 exchange.getEventListener().onException(x);
714         }
715 
716         @Override
717         protected void onExpire()
718         {
719             _queue.remove(exchange);
720             if (exchange.setStatus(STATUS_EXPIRED))
721                 exchange.getEventListener().onExpire();
722         }
723 
724     }
725 }