View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  package org.eclipse.jetty.client;
14  
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.util.ArrayList;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  
23  import org.eclipse.jetty.client.HttpClient.Connector;
24  import org.eclipse.jetty.client.security.Authentication;
25  import org.eclipse.jetty.client.security.SecurityListener;
26  import org.eclipse.jetty.http.HttpCookie;
27  import org.eclipse.jetty.http.HttpHeaders;
28  import org.eclipse.jetty.http.PathMap;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.ByteArrayBuffer;
31  import org.eclipse.jetty.util.log.Log;
32  
33  /**
34   *
35   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
36   */
37  public class HttpDestination
38  {
39      private final ByteArrayBuffer _hostHeader;
40      private final Address _address;
41      private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
42      private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
43      private final HttpClient _client;
44      private final boolean _ssl;
45      private final int _maxConnections;
46      private int _pendingConnections = 0;
47      private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
48      private int _newConnection = 0;
49      private Address _proxy;
50      private Authentication _proxyAuthentication;
51      private PathMap _authorizations;
52      private List<HttpCookie> _cookies;
53  
54      public void dump() throws IOException
55      {
56          synchronized (this)
57          {
58              Log.info(this.toString());
59              Log.info("connections=" + _connections.size());
60              Log.info("idle=" + _idle.size());
61              Log.info("pending=" + _pendingConnections);
62              for (HttpConnection c : _connections)
63              {
64                  if (!c.isIdle())
65                      c.dump();
66              }
67          }
68      }
69  
70      /* The queue of exchanged for this destination if connections are limited */
71      private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
72  
73      HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
74      {
75          _client = client;
76          _address = address;
77          _ssl = ssl;
78          _maxConnections = maxConnections;
79          String addressString = address.getHost();
80          if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
81          _hostHeader = new ByteArrayBuffer(addressString);
82      }
83  
84      public Address getAddress()
85      {
86          return _address;
87      }
88  
89      public Buffer getHostHeader()
90      {
91          return _hostHeader;
92      }
93  
94      public HttpClient getHttpClient()
95      {
96          return _client;
97      }
98  
99      public boolean isSecure()
100     {
101         return _ssl;
102     }
103 
104     public int getConnections()
105     {
106         synchronized (this)
107         {
108             return _connections.size();
109         }
110     }
111 
112     public int getIdleConnections()
113     {
114         synchronized (this)
115         {
116             return _idle.size();
117         }
118     }
119 
120     public void addAuthorization(String pathSpec, Authentication authorization)
121     {
122         synchronized (this)
123         {
124             if (_authorizations == null)
125                 _authorizations = new PathMap();
126             _authorizations.put(pathSpec, authorization);
127         }
128 
129         // TODO query and remove methods
130     }
131 
132     public void addCookie(HttpCookie cookie)
133     {
134         synchronized (this)
135         {
136             if (_cookies == null)
137                 _cookies = new ArrayList<HttpCookie>();
138             _cookies.add(cookie);
139         }
140 
141         // TODO query, remove and age methods
142     }
143 
144     /**
145      * Get a connection. We either get an idle connection if one is available, or
146      * we make a new connection, if we have not yet reached maxConnections. If we
147      * have reached maxConnections, we wait until the number reduces.
148      * @param timeout max time prepared to block waiting to be able to get a connection
149      * @return
150      * @throws IOException
151      */
152     private HttpConnection getConnection(long timeout) throws IOException
153     {
154         HttpConnection connection = null;
155 
156         while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
157         {
158             boolean starting = false;
159             synchronized (this)
160             {
161                 int totalConnections = _connections.size() + _pendingConnections;
162                 if (totalConnections < _maxConnections)
163                 {
164                     _newConnection++;
165                     startNewConnection();
166                     starting = true;
167                 }
168             }
169 
170             if (!starting)
171             {
172                 try
173                 {
174                     Thread.currentThread();
175                     Thread.sleep(200);
176                     timeout-=200;
177                 }
178                 catch (InterruptedException e)
179                 {
180                     Log.ignore(e);
181                 }
182             }
183             else
184             {
185                try
186                {
187                    Object o = _newQueue.take();
188                    if (o instanceof HttpConnection)
189                    {
190                        connection = (HttpConnection)o;
191                    }
192                    else
193                        throw (IOException)o;
194                }
195                catch (InterruptedException e)
196                {
197                    Log.ignore(e);
198                }
199            }
200         }
201         return connection;
202     }
203 
204     public HttpConnection reserveConnection(long timeout) throws IOException
205     {
206         HttpConnection connection = getConnection(timeout);
207         if (connection != null)
208             connection.setReserved(true);
209         return connection;
210     }
211 
212     public HttpConnection getIdleConnection() throws IOException
213     {
214         HttpConnection connection = null;
215         while (true)
216         {
217             synchronized (this)
218             {
219                 if (connection!=null)
220                 {
221                     _connections.remove(connection);
222                     connection.close();
223                     connection=null;
224                 }
225                 if (_idle.size() > 0)
226                     connection = _idle.remove(_idle.size()-1);
227             }
228 
229             if (connection==null)
230                 return null;
231 
232             // Check if the connection was idle,
233             // but it expired just a moment ago
234             if (connection.cancelIdleTimeout())
235                 return connection;
236         }
237     }
238 
239     protected void startNewConnection()
240     {
241         try
242         {
243             synchronized (this)
244             {
245                 _pendingConnections++;
246             }
247             final Connector connector=_client._connector;
248             if (connector!=null)
249                 connector.startConnection(this);
250         }
251         catch (Exception e)
252         {
253             Log.debug(e);
254             onConnectionFailed(e);
255         }
256     }
257 
258     public void onConnectionFailed(Throwable throwable)
259     {
260         Throwable connect_failure = null;
261 
262         synchronized (this)
263         {
264             _pendingConnections--;
265             if (_newConnection > 0)
266             {
267                 connect_failure = throwable;
268                 _newConnection--;
269             }
270             else if (_queue.size() > 0)
271             {
272                 HttpExchange ex = _queue.removeFirst();
273                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
274                 ex.getEventListener().onConnectionFailed(throwable);
275 
276                 // Since an existing connection had failed, we need to create a
277                 // connection if the  queue is not empty and client is running.
278                 if (!_queue.isEmpty() && _client.isStarted())
279                     startNewConnection();
280             }
281         }
282 
283         if (connect_failure != null)
284         {
285             try
286             {
287                 _newQueue.put(connect_failure);
288             }
289             catch (InterruptedException e)
290             {
291                 Log.ignore(e);
292             }
293         }
294     }
295 
296     public void onException(Throwable throwable)
297     {
298         synchronized (this)
299         {
300             _pendingConnections--;
301             if (_queue.size() > 0)
302             {
303                 HttpExchange ex = _queue.removeFirst();
304                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
305                 ex.getEventListener().onException(throwable);
306             }
307         }
308     }
309 
310     public void onNewConnection(HttpConnection connection) throws IOException
311     {
312         HttpConnection q_connection = null;
313 
314         synchronized (this)
315         {
316             _pendingConnections--;
317             _connections.add(connection);
318 
319             if (_newConnection > 0)
320             {
321                 q_connection = connection;
322                 _newConnection--;
323             }
324             else if (_queue.size() == 0)
325             {
326                 connection.setIdleTimeout();
327                 _idle.add(connection);
328             }
329             else
330             {
331                 HttpExchange ex = _queue.removeFirst();
332                 send(connection, ex);
333             }
334         }
335 
336         if (q_connection != null)
337         {
338             try
339             {
340                 _newQueue.put(q_connection);
341             }
342             catch (InterruptedException e)
343             {
344                 Log.ignore(e);
345             }
346         }
347     }
348 
349     public void returnConnection(HttpConnection connection, boolean close) throws IOException
350     {
351         if (connection.isReserved())
352             connection.setReserved(false);
353 
354         if (close)
355         {
356             try
357             {
358                 connection.close();
359             }
360             catch (IOException e)
361             {
362                 Log.ignore(e);
363             }
364         }
365 
366         if (!_client.isStarted())
367             return;
368 
369         if (!close && connection.getEndPoint().isOpen())
370         {
371             synchronized (this)
372             {
373                 if (_queue.size() == 0)
374                 {
375                     connection.setIdleTimeout();
376                     _idle.add(connection);
377                 }
378                 else
379                 {
380                     HttpExchange ex = _queue.removeFirst();
381                     send(connection, ex);
382                 }
383                 this.notifyAll();
384             }
385         }
386         else
387         {
388             synchronized (this)
389             {
390                 _connections.remove(connection);
391                 if (!_queue.isEmpty())
392                     startNewConnection();
393             }
394         }
395     }
396 
397     public void returnIdleConnection(HttpConnection connection)
398     {
399         try
400         {
401             connection.close();
402         }
403         catch (IOException e)
404         {
405             Log.ignore(e);
406         }
407 
408         synchronized (this)
409         {
410             _idle.remove(connection);
411             _connections.remove(connection);
412 
413             if (!_queue.isEmpty() && _client.isStarted())
414                 startNewConnection();
415         }
416     }
417 
418     public void send(HttpExchange ex) throws IOException
419     {
420         LinkedList<String> listeners = _client.getRegisteredListeners();
421 
422         if (listeners != null)
423         {
424             // Add registered listeners, fail if we can't load them
425             for (int i = listeners.size(); i > 0; --i)
426             {
427                 String listenerClass = listeners.get(i - 1);
428 
429                 try
430                 {
431                     Class listener = Class.forName(listenerClass);
432                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
433                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
434                     ex.setEventListener(elistener);
435                 }
436                 catch (Exception e)
437                 {
438                     e.printStackTrace();
439                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
440                 }
441             }
442         }
443 
444         // Security is supported by default and should be the first consulted
445         if (_client.hasRealms())
446         {
447             ex.setEventListener(new SecurityListener(this, ex));
448         }
449 
450         doSend(ex);
451     }
452 
453     public void resend(HttpExchange ex) throws IOException
454     {
455         ex.getEventListener().onRetry();
456         ex.reset();
457         doSend(ex);
458     }
459 
460     protected void doSend(HttpExchange ex) throws IOException
461     {
462         // add cookies
463         // TODO handle max-age etc.
464         if (_cookies != null)
465         {
466             StringBuilder buf = null;
467             for (HttpCookie cookie : _cookies)
468             {
469                 if (buf == null)
470                     buf = new StringBuilder();
471                 else
472                     buf.append("; ");
473                 buf.append(cookie.getName()); // TODO quotes
474                 buf.append("=");
475                 buf.append(cookie.getValue()); // TODO quotes
476             }
477             if (buf != null)
478                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
479         }
480 
481         // Add any known authorizations
482         if (_authorizations != null)
483         {
484             Authentication auth = (Authentication)_authorizations.match(ex.getURI());
485             if (auth != null)
486                 (auth).setCredentials(ex);
487         }
488 
489         HttpConnection connection = getIdleConnection();
490         if (connection != null)
491         {
492             send(connection, ex);
493         }
494         else
495         {
496             synchronized (this)
497             {
498                 _queue.add(ex);
499                 if (_connections.size() + _pendingConnections < _maxConnections)
500                 {
501                     startNewConnection();
502                 }
503             }
504         }
505     }
506 
507     protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
508     {
509         synchronized (this)
510         {
511             // If server closes the connection, put the exchange back
512             // to the exchange queue and recycle the connection
513             if(!connection.send(exchange))
514             {
515                 _queue.addFirst(exchange);
516                 returnIdleConnection(connection);
517             }
518         }
519     }
520 
521     @Override
522     public synchronized String toString()
523     {
524         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
525     }
526 
527     public synchronized String toDetailString()
528     {
529         StringBuilder b = new StringBuilder();
530         b.append(toString());
531         b.append('\n');
532         synchronized (this)
533         {
534             for (HttpConnection connection : _connections)
535             {
536                 b.append(connection.toDetailString());
537                 if (_idle.contains(connection))
538                     b.append(" IDLE");
539                 b.append('\n');
540             }
541         }
542         b.append("--");
543         b.append('\n');
544 
545         return b.toString();
546     }
547 
548     public void setProxy(Address proxy)
549     {
550         _proxy = proxy;
551     }
552 
553     public Address getProxy()
554     {
555         return _proxy;
556     }
557 
558     public Authentication getProxyAuthentication()
559     {
560         return _proxyAuthentication;
561     }
562 
563     public void setProxyAuthentication(Authentication authentication)
564     {
565         _proxyAuthentication = authentication;
566     }
567 
568     public boolean isProxied()
569     {
570         return _proxy != null;
571     }
572 
573     public void close() throws IOException
574     {
575         synchronized (this)
576         {
577             for (HttpConnection connection : _connections)
578             {
579                 connection.close();
580             }
581         }
582     }
583 }