View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.net.ProtocolException;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.RejectedExecutionException;
25  
26  import org.eclipse.jetty.client.HttpClient.Connector;
27  import org.eclipse.jetty.client.security.Authentication;
28  import org.eclipse.jetty.client.security.SecurityListener;
29  import org.eclipse.jetty.http.HttpCookie;
30  import org.eclipse.jetty.http.HttpHeaders;
31  import org.eclipse.jetty.http.HttpMethods;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.PathMap;
34  import org.eclipse.jetty.io.Buffer;
35  import org.eclipse.jetty.io.ByteArrayBuffer;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.util.component.AggregateLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  /**
44   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
45   */
46  public class HttpDestination implements Dumpable
47  {
48      private static final Logger LOG = Log.getLogger(HttpDestination.class);
49  
50      private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
51      private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
52      private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
53      private final List<AbstractHttpConnection> _idle = new ArrayList<AbstractHttpConnection>();
54      private final HttpClient _client;
55      private final Address _address;
56      private final boolean _ssl;
57      private final ByteArrayBuffer _hostHeader;
58      private volatile int _maxConnections;
59      private volatile int _maxQueueSize;
60      private int _pendingConnections = 0;
61      private int _newConnection = 0;
62      private volatile Address _proxy;
63      private Authentication _proxyAuthentication;
64      private PathMap _authorizations;
65      private List<HttpCookie> _cookies;
66  
67  
68  
69      HttpDestination(HttpClient client, Address address, boolean ssl)
70      {
71          _client = client;
72          _address = address;
73          _ssl = ssl;
74          _maxConnections = _client.getMaxConnectionsPerAddress();
75          _maxQueueSize = _client.getMaxQueueSizePerAddress();
76          String addressString = address.getHost();
77          if (address.getPort() != (_ssl ? 443 : 80))
78              addressString += ":" + address.getPort();
79          _hostHeader = new ByteArrayBuffer(addressString);
80      }
81  
82      public HttpClient getHttpClient()
83      {
84          return _client;
85      }
86  
87      public Address getAddress()
88      {
89          return _address;
90      }
91  
92      public boolean isSecure()
93      {
94          return _ssl;
95      }
96  
97      public Buffer getHostHeader()
98      {
99          return _hostHeader;
100     }
101 
102     public int getMaxConnections()
103     {
104         return _maxConnections;
105     }
106 
107     public void setMaxConnections(int maxConnections)
108     {
109         this._maxConnections = maxConnections;
110     }
111 
112     public int getMaxQueueSize()
113     {
114         return _maxQueueSize;
115     }
116 
117     public void setMaxQueueSize(int maxQueueSize)
118     {
119         this._maxQueueSize = maxQueueSize;
120     }
121 
122     public int getConnections()
123     {
124         synchronized (this)
125         {
126             return _connections.size();
127         }
128     }
129 
130     public int getIdleConnections()
131     {
132         synchronized (this)
133         {
134             return _idle.size();
135         }
136     }
137 
138     public void addAuthorization(String pathSpec, Authentication authorization)
139     {
140         synchronized (this)
141         {
142             if (_authorizations == null)
143                 _authorizations = new PathMap();
144             _authorizations.put(pathSpec, authorization);
145         }
146 
147         // TODO query and remove methods
148     }
149 
150     public void addCookie(HttpCookie cookie)
151     {
152         synchronized (this)
153         {
154             if (_cookies == null)
155                 _cookies = new ArrayList<HttpCookie>();
156             _cookies.add(cookie);
157         }
158 
159         // TODO query, remove and age methods
160     }
161 
162     /**
163      * Get a connection. We either get an idle connection if one is available, or
164      * we make a new connection, if we have not yet reached maxConnections. If we
165      * have reached maxConnections, we wait until the number reduces.
166      *
167      * @param timeout max time prepared to block waiting to be able to get a connection
168      * @return a HttpConnection for this destination
169      * @throws IOException if an I/O error occurs
170      */
171     private AbstractHttpConnection getConnection(long timeout) throws IOException
172     {
173         AbstractHttpConnection connection = null;
174 
175         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
176         {
177             boolean startConnection = false;
178             synchronized (this)
179             {
180                 int totalConnections = _connections.size() + _pendingConnections;
181                 if (totalConnections < _maxConnections)
182                 {
183                     _newConnection++;
184                     startConnection = true;
185                 }
186             }
187 
188             if (startConnection)
189             {
190                 startNewConnection();
191                 try
192                 {
193                     Object o = _newQueue.take();
194                     if (o instanceof AbstractHttpConnection)
195                     {
196                         connection = (AbstractHttpConnection)o;
197                     }
198                     else
199                         throw (IOException)o;
200                 }
201                 catch (InterruptedException e)
202                 {
203                     LOG.ignore(e);
204                 }
205             }
206             else
207             {
208                 try
209                 {
210                     Thread.currentThread();
211                     Thread.sleep(200);
212                     timeout -= 200;
213                 }
214                 catch (InterruptedException e)
215                 {
216                     LOG.ignore(e);
217                 }
218             }
219         }
220         return connection;
221     }
222 
223     public AbstractHttpConnection reserveConnection(long timeout) throws IOException
224     {
225         AbstractHttpConnection connection = getConnection(timeout);
226         if (connection != null)
227             connection.setReserved(true);
228         return connection;
229     }
230 
231     public AbstractHttpConnection getIdleConnection() throws IOException
232     {
233         AbstractHttpConnection connection = null;
234         while (true)
235         {
236             synchronized (this)
237             {
238                 if (connection != null)
239                 {
240                     _connections.remove(connection);
241                     connection.close();
242                     connection = null;
243                 }
244                 if (_idle.size() > 0)
245                     connection = _idle.remove(_idle.size() - 1);
246             }
247 
248             if (connection == null)
249             {
250                 return null;
251             }
252             
253             // Check if the connection was idle,
254             // but it expired just a moment ago
255             if (connection.cancelIdleTimeout())
256             {
257                 return connection;
258             }
259         }
260     }
261 
262     protected void startNewConnection()
263     {
264         try
265         {
266             synchronized (this)
267             {
268                 _pendingConnections++;
269             }
270             final Connector connector = _client._connector;
271             if (connector != null)
272                 connector.startConnection(this);
273         }
274         catch (Exception e)
275         {
276             LOG.debug(e);
277             onConnectionFailed(e);
278         }
279     }
280 
281     public void onConnectionFailed(Throwable throwable)
282     {
283         Throwable connect_failure = null;
284 
285         boolean startConnection = false;
286         synchronized (this)
287         {
288             _pendingConnections--;
289             if (_newConnection > 0)
290             {
291                 connect_failure = throwable;
292                 _newConnection--;
293             }
294             else if (_queue.size() > 0)
295             {
296                 HttpExchange ex = _queue.remove(0);
297                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
298                     ex.getEventListener().onConnectionFailed(throwable);
299 
300                 // Since an existing connection had failed, we need to create a
301                 // connection if the  queue is not empty and client is running.
302                 if (!_queue.isEmpty() && _client.isStarted())
303                     startConnection = true;
304             }
305         }
306 
307         if (startConnection)
308             startNewConnection();
309 
310         if (connect_failure != null)
311         {
312             try
313             {
314                 _newQueue.put(connect_failure);
315             }
316             catch (InterruptedException e)
317             {
318                 LOG.ignore(e);
319             }
320         }
321     }
322 
323     public void onException(Throwable throwable)
324     {
325         synchronized (this)
326         {
327             _pendingConnections--;
328             if (_queue.size() > 0)
329             {
330                 HttpExchange ex = _queue.remove(0);
331                 if(ex.setStatus(HttpExchange.STATUS_EXCEPTED))
332                     ex.getEventListener().onException(throwable);
333             }
334         }
335     }
336 
337     public void onNewConnection(final AbstractHttpConnection connection) throws IOException
338     {
339         Connection q_connection = null;
340 
341         synchronized (this)
342         {
343             _pendingConnections--;
344             _connections.add(connection);
345 
346             if (_newConnection > 0)
347             {
348                 q_connection = connection;
349                 _newConnection--;
350             }
351             else if (_queue.size() == 0)
352             {
353                 connection.setIdleTimeout();
354                 _idle.add(connection);
355             }
356             else
357             {
358                 EndPoint endPoint = connection.getEndPoint();
359                 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
360                 {
361                     SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
362                     HttpExchange exchange = _queue.get(0);
363                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
364                     connect.setAddress(getProxy());
365                     send(connection, connect);
366                 }
367                 else
368                 {
369                     HttpExchange exchange = _queue.remove(0);
370                     send(connection, exchange);
371                 }
372             }
373         }
374 
375         if (q_connection != null)
376         {
377             try
378             {
379                 _newQueue.put(q_connection);
380             }
381             catch (InterruptedException e)
382             {
383                 LOG.ignore(e);
384             }
385         }
386     }
387 
388     public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
389     {
390         if (connection.isReserved())
391             connection.setReserved(false);
392 
393         if (close)
394         {
395             try
396             {
397                 connection.close();
398             }
399             catch (IOException e)
400             {
401                 LOG.ignore(e);
402             }
403         }
404 
405         if (!_client.isStarted())
406             return;
407 
408         if (!close && connection.getEndPoint().isOpen())
409         {
410             synchronized (this)
411             {
412                 if (_queue.size() == 0)
413                 {
414                     connection.setIdleTimeout();
415                     _idle.add(connection);
416                 }
417                 else
418                 {
419                     HttpExchange ex = _queue.remove(0);
420                     send(connection, ex);
421                 }
422                 this.notifyAll();
423             }
424         }
425         else
426         {
427             boolean startConnection = false;
428             synchronized (this)
429             {
430                 _connections.remove(connection);
431                 if (!_queue.isEmpty())
432                     startConnection = true;
433             }
434 
435             if (startConnection)
436                 startNewConnection();
437         }
438     }
439 
440     public void returnIdleConnection(AbstractHttpConnection connection)
441     {
442         // TODO work out the real idle time;
443         long idleForMs=connection!=null&&connection.getEndPoint()!=null?connection.getEndPoint().getMaxIdleTime():-1;
444         connection.onIdleExpired(idleForMs);
445 
446         boolean startConnection = false;
447         synchronized (this)
448         {
449             _idle.remove(connection);
450             _connections.remove(connection);
451 
452             if (!_queue.isEmpty() && _client.isStarted())
453                 startConnection = true;
454         }
455 
456         if (startConnection)
457             startNewConnection();
458     }
459 
460     public void send(HttpExchange ex) throws IOException
461     {
462         LinkedList<String> listeners = _client.getRegisteredListeners();
463 
464         if (listeners != null)
465         {
466             // Add registered listeners, fail if we can't load them
467             for (int i = listeners.size(); i > 0; --i)
468             {
469                 String listenerClass = listeners.get(i - 1);
470 
471                 try
472                 {
473                     Class listener = Class.forName(listenerClass);
474                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
475                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
476                     ex.setEventListener(elistener);
477                 }
478                 catch (Exception e)
479                 {
480                     e.printStackTrace();
481                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
482                 }
483             }
484         }
485 
486         // Security is supported by default and should be the first consulted
487         if (_client.hasRealms())
488         {
489             ex.setEventListener(new SecurityListener(this, ex));
490         }
491 
492         doSend(ex);
493     }
494 
495     public void resend(HttpExchange ex) throws IOException
496     {
497         ex.getEventListener().onRetry();
498         ex.reset();
499         doSend(ex);
500     }
501 
502     protected void doSend(HttpExchange ex) throws IOException
503     {
504         // add cookies
505         // TODO handle max-age etc.
506         if (_cookies != null)
507         {
508             StringBuilder buf = null;
509             for (HttpCookie cookie : _cookies)
510             {
511                 if (buf == null)
512                     buf = new StringBuilder();
513                 else
514                     buf.append("; ");
515                 buf.append(cookie.getName()); // TODO quotes
516                 buf.append("=");
517                 buf.append(cookie.getValue()); // TODO quotes
518             }
519             if (buf != null)
520                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
521         }
522 
523         // Add any known authorizations
524         if (_authorizations != null)
525         {
526             Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
527             if (auth != null)
528                 (auth).setCredentials(ex);
529         }
530 
531         // Schedule the timeout here, before we queue the exchange
532         // so that we count also the queue time in the timeout
533         ex.scheduleTimeout(this);
534 
535         AbstractHttpConnection connection = getIdleConnection();
536         if (connection != null)
537         {
538             send(connection, ex);
539         }
540         else
541         {
542             boolean startConnection = false;
543             synchronized (this)
544             {
545                 if (_queue.size() == _maxQueueSize)
546                     throw new RejectedExecutionException("Queue full for address " + _address);
547 
548                 _queue.add(ex);
549                 if (_connections.size() + _pendingConnections < _maxConnections)
550                     startConnection = true;
551             }
552 
553             if (startConnection)
554                 startNewConnection();
555         }
556     }
557 
558     protected void exchangeExpired(HttpExchange exchange)
559     {
560         // The exchange may expire while waiting in the
561         // destination queue, make sure it is removed
562         synchronized (this)
563         {
564             _queue.remove(exchange);
565         }
566     }
567 
568     protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
569     {
570         synchronized (this)
571         {
572             // If server closes the connection, put the exchange back
573             // to the exchange queue and recycle the connection
574             if (!connection.send(exchange))
575             {
576                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
577                     _queue.add(0, exchange);
578                 returnIdleConnection(connection);
579             }
580         }
581     }
582 
583     @Override
584     public synchronized String toString()
585     {
586         return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
587     }
588 
589     public synchronized String toDetailString()
590     {
591         StringBuilder b = new StringBuilder();
592         b.append(toString());
593         b.append('\n');
594         synchronized (this)
595         {
596             for (AbstractHttpConnection connection : _connections)
597             {
598                 b.append(connection.toDetailString());
599                 if (_idle.contains(connection))
600                     b.append(" IDLE");
601                 b.append('\n');
602             }
603         }
604         b.append("--");
605         b.append('\n');
606 
607         return b.toString();
608     }
609 
610     public void setProxy(Address proxy)
611     {
612         _proxy = proxy;
613     }
614 
615     public Address getProxy()
616     {
617         return _proxy;
618     }
619 
620     public Authentication getProxyAuthentication()
621     {
622         return _proxyAuthentication;
623     }
624 
625     public void setProxyAuthentication(Authentication authentication)
626     {
627         _proxyAuthentication = authentication;
628     }
629 
630     public boolean isProxied()
631     {
632         return _proxy != null;
633     }
634 
635     public void close() throws IOException
636     {
637         synchronized (this)
638         {
639             for (AbstractHttpConnection connection : _connections)
640             {
641                 connection.close();
642             }
643         }
644     }
645 
646     /* ------------------------------------------------------------ */
647     /**
648      * @see org.eclipse.jetty.util.component.Dumpable#dump()
649      */
650     public String dump()
651     {
652         return AggregateLifeCycle.dump(this);
653     }
654 
655     /* ------------------------------------------------------------ */
656     /**
657      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
658      */
659     public void dump(Appendable out, String indent) throws IOException
660     {
661         synchronized (this)
662         {
663             out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
664             AggregateLifeCycle.dump(out,indent,_connections);
665         }
666     }
667 
668     private class ConnectExchange extends ContentExchange
669     {
670         private final SelectConnector.UpgradableEndPoint proxyEndPoint;
671         private final HttpExchange exchange;
672 
673         public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange)
674         {
675             this.proxyEndPoint = proxyEndPoint;
676             this.exchange = exchange;
677             setMethod(HttpMethods.CONNECT);
678             setVersion(exchange.getVersion());
679             String serverHostAndPort = serverAddress.toString();
680             setRequestURI(serverHostAndPort);
681             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
682             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
683             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
684         }
685 
686         @Override
687         protected void onResponseComplete() throws IOException
688         {
689             int responseStatus = getResponseStatus();
690             if (responseStatus == HttpStatus.OK_200)
691             {
692                 proxyEndPoint.upgrade();
693             }
694             else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
695             {
696                 onExpire();
697             }
698             else
699             {
700                 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString()));
701             }
702         }
703 
704         @Override
705         protected void onConnectionFailed(Throwable x)
706         {
707             HttpDestination.this.onConnectionFailed(x);
708         }
709 
710         @Override
711         protected void onException(Throwable x)
712         {
713             _queue.remove(exchange);
714             if (exchange.setStatus(STATUS_EXCEPTED))
715                 exchange.getEventListener().onException(x);
716         }
717 
718         @Override
719         protected void onExpire()
720         {
721             _queue.remove(exchange);
722             if (exchange.setStatus(STATUS_EXPIRED))
723                 exchange.getEventListener().onExpire();
724         }
725 
726     }
727 }