View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.net.ConnectException;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.RejectedExecutionException;
25  
26  import org.eclipse.jetty.client.HttpClient.Connector;
27  import org.eclipse.jetty.client.security.Authentication;
28  import org.eclipse.jetty.client.security.SecurityListener;
29  import org.eclipse.jetty.http.HttpCookie;
30  import org.eclipse.jetty.http.HttpHeaders;
31  import org.eclipse.jetty.http.HttpMethods;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.PathMap;
34  import org.eclipse.jetty.io.Buffer;
35  import org.eclipse.jetty.io.ByteArrayBuffer;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.util.component.AggregateLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  
42  /**
43   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
44   */
45  public class HttpDestination implements Dumpable
46  {
47      private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
48      private final List<HttpConnection> _connections = new LinkedList<HttpConnection>();
49      private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
50      private final List<HttpConnection> _idle = new ArrayList<HttpConnection>();
51      private final HttpClient _client;
52      private final Address _address;
53      private final boolean _ssl;
54      private final ByteArrayBuffer _hostHeader;
55      private volatile int _maxConnections;
56      private volatile int _maxQueueSize;
57      private int _pendingConnections = 0;
58      private int _newConnection = 0;
59      private volatile Address _proxy;
60      private Authentication _proxyAuthentication;
61      private PathMap _authorizations;
62      private List<HttpCookie> _cookies;
63  
64  
65      
66      HttpDestination(HttpClient client, Address address, boolean ssl)
67      {
68          _client = client;
69          _address = address;
70          _ssl = ssl;
71          _maxConnections = _client.getMaxConnectionsPerAddress();
72          _maxQueueSize = _client.getMaxQueueSizePerAddress();
73          String addressString = address.getHost();
74          if (address.getPort() != (_ssl ? 443 : 80))
75              addressString += ":" + address.getPort();
76          _hostHeader = new ByteArrayBuffer(addressString);
77      }
78  
79      public HttpClient getHttpClient()
80      {
81          return _client;
82      }
83  
84      public Address getAddress()
85      {
86          return _address;
87      }
88  
89      public boolean isSecure()
90      {
91          return _ssl;
92      }
93  
94      public Buffer getHostHeader()
95      {
96          return _hostHeader;
97      }
98  
99      public int getMaxConnections()
100     {
101         return _maxConnections;
102     }
103 
104     public void setMaxConnections(int maxConnections)
105     {
106         this._maxConnections = maxConnections;
107     }
108 
109     public int getMaxQueueSize()
110     {
111         return _maxQueueSize;
112     }
113 
114     public void setMaxQueueSize(int maxQueueSize)
115     {
116         this._maxQueueSize = maxQueueSize;
117     }
118 
119     public int getConnections()
120     {
121         synchronized (this)
122         {
123             return _connections.size();
124         }
125     }
126 
127     public int getIdleConnections()
128     {
129         synchronized (this)
130         {
131             return _idle.size();
132         }
133     }
134 
135     public void addAuthorization(String pathSpec, Authentication authorization)
136     {
137         synchronized (this)
138         {
139             if (_authorizations == null)
140                 _authorizations = new PathMap();
141             _authorizations.put(pathSpec, authorization);
142         }
143 
144         // TODO query and remove methods
145     }
146 
147     public void addCookie(HttpCookie cookie)
148     {
149         synchronized (this)
150         {
151             if (_cookies == null)
152                 _cookies = new ArrayList<HttpCookie>();
153             _cookies.add(cookie);
154         }
155 
156         // TODO query, remove and age methods
157     }
158 
159     /**
160      * Get a connection. We either get an idle connection if one is available, or
161      * we make a new connection, if we have not yet reached maxConnections. If we
162      * have reached maxConnections, we wait until the number reduces.
163      *
164      * @param timeout max time prepared to block waiting to be able to get a connection
165      * @return a HttpConnection for this destination
166      * @throws IOException if an I/O error occurs
167      */
168     private HttpConnection getConnection(long timeout) throws IOException
169     {
170         HttpConnection connection = null;
171 
172         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
173         {
174             boolean startConnection = false;
175             synchronized (this)
176             {
177                 int totalConnections = _connections.size() + _pendingConnections;
178                 if (totalConnections < _maxConnections)
179                 {
180                     _newConnection++;
181                     startConnection = true;
182                 }
183             }
184 
185             if (startConnection)
186             {
187                 startNewConnection();
188                 try
189                 {
190                     Object o = _newQueue.take();
191                     if (o instanceof HttpConnection)
192                     {
193                         connection = (HttpConnection)o;
194                     }
195                     else
196                         throw (IOException)o;
197                 }
198                 catch (InterruptedException e)
199                 {
200                     Log.ignore(e);
201                 }
202             }
203             else
204             {
205                 try
206                 {
207                     Thread.currentThread();
208                     Thread.sleep(200);
209                     timeout -= 200;
210                 }
211                 catch (InterruptedException e)
212                 {
213                     Log.ignore(e);
214                 }
215             }
216         }
217         return connection;
218     }
219 
220     public HttpConnection reserveConnection(long timeout) throws IOException
221     {
222         HttpConnection connection = getConnection(timeout);
223         if (connection != null)
224             connection.setReserved(true);
225         return connection;
226     }
227 
228     public HttpConnection getIdleConnection() throws IOException
229     {
230         HttpConnection connection = null;
231         while (true)
232         {
233             synchronized (this)
234             {
235                 if (connection != null)
236                 {
237                     _connections.remove(connection);
238                     connection.close();
239                     connection = null;
240                 }
241                 if (_idle.size() > 0)
242                     connection = _idle.remove(_idle.size() - 1);
243             }
244 
245             if (connection == null)
246                 return null;
247 
248             // Check if the connection was idle,
249             // but it expired just a moment ago
250             if (connection.cancelIdleTimeout())
251                 return connection;
252         }
253     }
254 
255     protected void startNewConnection()
256     {
257         try
258         {
259             synchronized (this)
260             {
261                 _pendingConnections++;
262             }
263             final Connector connector = _client._connector;
264             if (connector != null)
265                 connector.startConnection(this);
266         }
267         catch (Exception e)
268         {
269             Log.debug(e);
270             onConnectionFailed(e);
271         }
272     }
273 
274     public void onConnectionFailed(Throwable throwable)
275     {
276         Throwable connect_failure = null;
277 
278         boolean startConnection = false;
279         synchronized (this)
280         {
281             _pendingConnections--;
282             if (_newConnection > 0)
283             {
284                 connect_failure = throwable;
285                 _newConnection--;
286             }
287             else if (_queue.size() > 0)
288             {
289                 HttpExchange ex = _queue.remove(0);
290                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
291                 ex.getEventListener().onConnectionFailed(throwable);
292 
293                 // Since an existing connection had failed, we need to create a
294                 // connection if the  queue is not empty and client is running.
295                 if (!_queue.isEmpty() && _client.isStarted())
296                     startConnection = true;
297             }
298         }
299 
300         if (startConnection)
301             startNewConnection();
302 
303         if (connect_failure != null)
304         {
305             try
306             {
307                 _newQueue.put(connect_failure);
308             }
309             catch (InterruptedException e)
310             {
311                 Log.ignore(e);
312             }
313         }
314     }
315 
316     public void onException(Throwable throwable)
317     {
318         synchronized (this)
319         {
320             _pendingConnections--;
321             if (_queue.size() > 0)
322             {
323                 HttpExchange ex = _queue.remove(0);
324                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
325                 ex.getEventListener().onException(throwable);
326             }
327         }
328     }
329 
330     public void onNewConnection(final HttpConnection connection) throws IOException
331     {
332         Connection q_connection = null;
333 
334         synchronized (this)
335         {
336             _pendingConnections--;
337             _connections.add(connection);
338 
339             if (_newConnection > 0)
340             {
341                 q_connection = connection;
342                 _newConnection--;
343             }
344             else if (_queue.size() == 0)
345             {
346                 connection.setIdleTimeout();
347                 _idle.add(connection);
348             }
349             else
350             {
351                 EndPoint endPoint = connection.getEndPoint();
352                 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
353                 {
354                     SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
355                     HttpExchange exchange = _queue.get(0);
356                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
357                     connect.setAddress(getProxy());
358                     send(connection, connect);
359                 }
360                 else
361                 {
362                     HttpExchange exchange = _queue.remove(0);
363                     send(connection, exchange);
364                 }
365             }
366         }
367 
368         if (q_connection != null)
369         {
370             try
371             {
372                 _newQueue.put(q_connection);
373             }
374             catch (InterruptedException e)
375             {
376                 Log.ignore(e);
377             }
378         }
379     }
380 
381     public void returnConnection(HttpConnection connection, boolean close) throws IOException
382     {
383         if (connection.isReserved())
384             connection.setReserved(false);
385 
386         if (close)
387         {
388             try
389             {
390                 connection.close();
391             }
392             catch (IOException e)
393             {
394                 Log.ignore(e);
395             }
396         }
397 
398         if (!_client.isStarted())
399             return;
400 
401         if (!close && connection.getEndPoint().isOpen())
402         {
403             synchronized (this)
404             {
405                 if (_queue.size() == 0)
406                 {
407                     connection.setIdleTimeout();
408                     _idle.add(connection);
409                 }
410                 else
411                 {
412                     HttpExchange ex = _queue.remove(0);
413                     send(connection, ex);
414                 }
415                 this.notifyAll();
416             }
417         }
418         else
419         {
420             boolean startConnection = false;
421             synchronized (this)
422             {
423                 _connections.remove(connection);
424                 if (!_queue.isEmpty())
425                     startConnection = true;
426             }
427 
428             if (startConnection)
429                 startNewConnection();
430         }
431     }
432 
433     public void returnIdleConnection(HttpConnection connection)
434     {
435         try
436         {
437             connection.close();
438         }
439         catch (IOException e)
440         {
441             Log.ignore(e);
442         }
443 
444         boolean startConnection = false;
445         synchronized (this)
446         {
447             _idle.remove(connection);
448             _connections.remove(connection);
449 
450             if (!_queue.isEmpty() && _client.isStarted())
451                 startConnection = true;
452         }
453 
454         if (startConnection)
455             startNewConnection();
456     }
457 
458     public void send(HttpExchange ex) throws IOException
459     {
460         LinkedList<String> listeners = _client.getRegisteredListeners();
461 
462         if (listeners != null)
463         {
464             // Add registered listeners, fail if we can't load them
465             for (int i = listeners.size(); i > 0; --i)
466             {
467                 String listenerClass = listeners.get(i - 1);
468 
469                 try
470                 {
471                     Class listener = Class.forName(listenerClass);
472                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
473                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
474                     ex.setEventListener(elistener);
475                 }
476                 catch (Exception e)
477                 {
478                     e.printStackTrace();
479                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
480                 }
481             }
482         }
483 
484         // Security is supported by default and should be the first consulted
485         if (_client.hasRealms())
486         {
487             ex.setEventListener(new SecurityListener(this, ex));
488         }
489 
490         doSend(ex);
491     }
492 
493     public void resend(HttpExchange ex) throws IOException
494     {
495         ex.getEventListener().onRetry();
496         ex.reset();
497         doSend(ex);
498     }
499 
500     protected void doSend(HttpExchange ex) throws IOException
501     {
502         // add cookies
503         // TODO handle max-age etc.
504         if (_cookies != null)
505         {
506             StringBuilder buf = null;
507             for (HttpCookie cookie : _cookies)
508             {
509                 if (buf == null)
510                     buf = new StringBuilder();
511                 else
512                     buf.append("; ");
513                 buf.append(cookie.getName()); // TODO quotes
514                 buf.append("=");
515                 buf.append(cookie.getValue()); // TODO quotes
516             }
517             if (buf != null)
518                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
519         }
520 
521         // Add any known authorizations
522         if (_authorizations != null)
523         {
524             Authentication auth = (Authentication)_authorizations.match(ex.getURI());
525             if (auth != null)
526                 (auth).setCredentials(ex);
527         }
528 
529         // Schedule the timeout here, before we queue the exchange
530         // so that we count also the queue time in the timeout
531         ex.scheduleTimeout(this);
532 
533         HttpConnection connection = getIdleConnection();
534         if (connection != null)
535         {
536             send(connection, ex);
537         }
538         else
539         {
540             boolean startConnection = false;
541             synchronized (this)
542             {
543                 if (_queue.size() == _maxQueueSize)
544                     throw new RejectedExecutionException("Queue full for address " + _address);
545 
546                 _queue.add(ex);
547                 if (_connections.size() + _pendingConnections < _maxConnections)
548                     startConnection = true;
549             }
550 
551             if (startConnection)
552                 startNewConnection();
553         }
554     }
555 
556     protected void exchangeExpired(HttpExchange exchange)
557     {
558         // The exchange may expire while waiting in the
559         // destination queue, make sure it is removed
560         synchronized (this)
561         {
562             _queue.remove(exchange);
563         }
564     }
565 
566     protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
567     {
568         synchronized (this)
569         {
570             // If server closes the connection, put the exchange back
571             // to the exchange queue and recycle the connection
572             if (!connection.send(exchange))
573             {
574                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
575                     _queue.add(0, exchange);
576                 returnIdleConnection(connection);
577             }
578         }
579     }
580 
581     @Override
582     public synchronized String toString()
583     {
584         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
585     }
586 
587     public synchronized String toDetailString()
588     {
589         StringBuilder b = new StringBuilder();
590         b.append(toString());
591         b.append('\n');
592         synchronized (this)
593         {
594             for (HttpConnection connection : _connections)
595             {
596                 b.append(connection.toDetailString());
597                 if (_idle.contains(connection))
598                     b.append(" IDLE");
599                 b.append('\n');
600             }
601         }
602         b.append("--");
603         b.append('\n');
604 
605         return b.toString();
606     }
607 
608     public void setProxy(Address proxy)
609     {
610         _proxy = proxy;
611     }
612 
613     public Address getProxy()
614     {
615         return _proxy;
616     }
617 
618     public Authentication getProxyAuthentication()
619     {
620         return _proxyAuthentication;
621     }
622 
623     public void setProxyAuthentication(Authentication authentication)
624     {
625         _proxyAuthentication = authentication;
626     }
627 
628     public boolean isProxied()
629     {
630         return _proxy != null;
631     }
632 
633     public void close() throws IOException
634     {
635         synchronized (this)
636         {
637             for (HttpConnection connection : _connections)
638             {
639                 connection.close();
640             }
641         }
642     }
643 
644     /* ------------------------------------------------------------ */
645     /**
646      * @see org.eclipse.jetty.util.component.Dumpable#dump()
647      */
648     public String dump()
649     {
650         return AggregateLifeCycle.dump(this);
651     }
652 
653     /* ------------------------------------------------------------ */
654     /**
655      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
656      */
657     public void dump(Appendable out, String indent) throws IOException
658     {
659         synchronized (this)
660         {
661             out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
662             AggregateLifeCycle.dump(out,indent,_connections);
663         }
664     }
665     
666     private class ConnectExchange extends ContentExchange
667     {
668         private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
669         private final HttpExchange exchange;
670 
671         public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
672         {
673             this.proxyEndPoint = proxyEndPoint;
674             this.exchange = exchange;
675             setMethod(HttpMethods.CONNECT);
676             setVersion(exchange.getVersion());
677             String serverHostAndPort = serverAddress.toString();
678             setURI(serverHostAndPort);
679             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
680             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
681             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
682         }
683 
684         @Override
685         protected void onResponseComplete() throws IOException
686         {
687             if (getResponseStatus() == HttpStatus.OK_200)
688             {
689                 proxyEndPoint.upgrade();
690             }
691             else
692             {
693                 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
694             }
695         }
696 
697         @Override
698         protected void onConnectionFailed(Throwable x)
699         {
700             HttpDestination.this.onConnectionFailed(x);
701         }
702     }
703 }