View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  package org.eclipse.jetty.client;
14  
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.util.ArrayList;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  
23  import org.eclipse.jetty.client.security.Authorization;
24  import org.eclipse.jetty.client.security.SecurityListener;
25  import org.eclipse.jetty.http.HttpCookie;
26  import org.eclipse.jetty.http.HttpHeaders;
27  import org.eclipse.jetty.http.PathMap;
28  import org.eclipse.jetty.io.Buffer;
29  import org.eclipse.jetty.io.ByteArrayBuffer;
30  import org.eclipse.jetty.util.log.Log;
31  
32  /**
33   * 
34   * 
35   */
36  public class HttpDestination
37  {
38      private final ByteArrayBuffer _hostHeader;
39      private final Address _address;
40      private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
41      private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
42      private final HttpClient _client;
43      private final boolean _ssl;
44      private final int _maxConnections;
45      private int _pendingConnections = 0;
46      private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
47      private int _newConnection = 0;
48      private Address _proxy;
49      private Authorization _proxyAuthentication;
50      private PathMap _authorizations;
51      private List<HttpCookie> _cookies;
52  
53      /* ------------------------------------------------------------ */
54      public void dump() throws IOException
55      {
56          synchronized (this)
57          {
58              Log.info(this.toString());
59              Log.info("connections=" + _connections.size());
60              Log.info("idle=" + _idle.size());
61              Log.info("pending=" + _pendingConnections);
62              for (HttpConnection c : _connections)
63              {
64                  if (!c.isIdle())
65                      c.dump();
66              }
67          }
68      }
69  
70      /* The queue of exchanged for this destination if connections are limited */
71      private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
72  
73      /* ------------------------------------------------------------ */
74      HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
75      {
76          _client = client;
77          _address = address;
78          _ssl = ssl;
79          _maxConnections = maxConnections;
80          String addressString = address.getHost();
81          if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
82          _hostHeader = new ByteArrayBuffer(addressString);
83      }
84  
85      /* ------------------------------------------------------------ */
86      public Address getAddress()
87      {
88          return _address;
89      }
90  
91      /* ------------------------------------------------------------ */
92      public Buffer getHostHeader()
93      {
94          return _hostHeader;
95      }
96  
97      /* ------------------------------------------------------------ */
98      public HttpClient getHttpClient()
99      {
100         return _client;
101     }
102 
103     /* ------------------------------------------------------------ */
104     public boolean isSecure()
105     {
106         return _ssl;
107     }
108 
109     /* ------------------------------------------------------------ */
110     public void addAuthorization(String pathSpec, Authorization authorization)
111     {
112         synchronized (this)
113         {
114             if (_authorizations == null)
115                 _authorizations = new PathMap();
116             _authorizations.put(pathSpec, authorization);
117         }
118 
119         // TODO query and remove methods
120     }
121 
122     /* ------------------------------------------------------------------------------- */
123     public void addCookie(HttpCookie cookie)
124     {
125         synchronized (this)
126         {
127             if (_cookies == null)
128                 _cookies = new ArrayList<HttpCookie>();
129             _cookies.add(cookie);
130         }
131 
132         // TODO query, remove and age methods
133     }
134 
135     /* ------------------------------------------------------------------------------- */
136     /**
137      * Get a connection. We either get an idle connection if one is available, or
138      * we make a new connection, if we have not yet reached maxConnections. If we
139      * have reached maxConnections, we wait until the number reduces.
140      * @param timeout max time prepared to block waiting to be able to get a connection
141      * @return
142      * @throws IOException
143      */
144     private HttpConnection getConnection(long timeout) throws IOException
145     {
146         HttpConnection connection = null;
147 
148         while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
149         {
150             int totalConnections = 0;
151             boolean starting = false;
152             synchronized (this)
153             {
154                 totalConnections = _connections.size() + _pendingConnections;
155                 if (totalConnections < _maxConnections)
156                 {
157                     _newConnection++;
158                     startNewConnection();
159                     starting = true;
160                 }
161             }
162             
163             if (!starting)
164             {
165                 try
166                 {
167                     Thread.currentThread().sleep(200);
168                     timeout-=200;
169                 }
170                 catch (InterruptedException e)
171                 {
172                     Log.ignore(e);
173                 }
174             }
175             else
176             {
177                try
178                {
179                    Object o = _newQueue.take();
180                    if (o instanceof HttpConnection)
181                    {
182                        connection = (HttpConnection)o;
183                    }
184                    else
185                        throw (IOException)o;
186                }
187                catch (InterruptedException e)
188                {
189                    Log.ignore(e);
190                }
191            }
192         }
193         return connection;
194     }
195     
196     /* ------------------------------------------------------------------------------- */
197     public HttpConnection reserveConnection(long timeout) throws IOException
198     {
199         HttpConnection connection = getConnection(timeout);
200         if (connection != null)
201             connection.setReserved(true);
202         return connection;
203     }
204 
205     /* ------------------------------------------------------------------------------- */
206     public HttpConnection getIdleConnection() throws IOException
207     {
208         long now = System.currentTimeMillis();
209         long idleTimeout=_client.getIdleTimeout();
210         HttpConnection connection = null;
211         while (true)
212         {
213             synchronized (this)
214             {
215                 if (connection!=null)
216                 {
217                     _connections.remove(connection);
218                     connection.getEndPoint().close();
219                     connection=null;
220                 }
221                 if (_idle.size() > 0)
222                     connection = _idle.remove(_idle.size()-1);
223             }
224             
225             if (connection==null)
226                 return null;
227 
228             long last = connection.getLast();
229             if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
230                 return connection;
231 
232         }
233     }
234 
235     /* ------------------------------------------------------------------------------- */
236     protected void startNewConnection()
237     {
238         try
239         {
240             synchronized (this)
241             {
242                 _pendingConnections++;
243             }
244             _client._connector.startConnection(this);
245         }
246         catch (Exception e)
247         {
248             e.printStackTrace();
249             onConnectionFailed(e);
250         }
251     }
252 
253     /* ------------------------------------------------------------------------------- */
254     public void onConnectionFailed(Throwable throwable)
255     {
256         Throwable connect_failure = null;
257 
258         synchronized (this)
259         {
260             _pendingConnections--;
261             if (_newConnection > 0)
262             {
263                 connect_failure = throwable;
264                 _newConnection--;
265             }
266             else if (_queue.size() > 0)
267             {
268                 HttpExchange ex = _queue.removeFirst();
269                 ex.getEventListener().onConnectionFailed(throwable);
270             }
271         }
272 
273         if (connect_failure != null)
274         {
275             try
276             {
277                 _newQueue.put(connect_failure);
278             }
279             catch (InterruptedException e)
280             {
281                 Log.ignore(e);
282             }
283         }
284     }
285 
286     /* ------------------------------------------------------------------------------- */
287     public void onException(Throwable throwable)
288     {
289         synchronized (this)
290         {
291             _pendingConnections--;
292             if (_queue.size() > 0)
293             {
294                 HttpExchange ex = _queue.removeFirst();
295                 ex.getEventListener().onException(throwable);
296                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
297             }
298         }
299     }
300 
301     /* ------------------------------------------------------------------------------- */
302     public void onNewConnection(HttpConnection connection) throws IOException
303     {
304         HttpConnection q_connection = null;
305 
306         synchronized (this)
307         {
308             _pendingConnections--;
309             _connections.add(connection);
310 
311             if (_newConnection > 0)
312             {
313                 q_connection = connection;
314                 _newConnection--;
315             }
316             else if (_queue.size() == 0)
317             {
318                 _idle.add(connection);
319             }
320             else
321             {
322                 HttpExchange ex = _queue.removeFirst();
323                 connection.send(ex);
324             }
325         }
326 
327         if (q_connection != null)
328         {
329             try
330             {
331                 _newQueue.put(q_connection);
332             }
333             catch (InterruptedException e)
334             {
335                 Log.ignore(e);
336             }
337         }
338     }
339 
340     /* ------------------------------------------------------------------------------- */
341     public void returnConnection(HttpConnection connection, boolean close) throws IOException
342     {
343         if (connection.isReserved())
344             connection.setReserved(false);
345         
346         if (close)
347         {
348             try
349             {
350                 connection.close();
351             }
352             catch (IOException e)
353             {
354                 Log.ignore(e);
355             }
356         }
357 
358         if (!_client.isStarted())
359             return;
360 
361         if (!close && connection.getEndPoint().isOpen())
362         {
363             synchronized (this)
364             {
365                 if (_queue.size() == 0)
366                 {
367                     connection.setLast(System.currentTimeMillis());
368                     _idle.add(connection);
369                 }
370                 else
371                 {
372                     HttpExchange ex = _queue.removeFirst();
373                     connection.send(ex);
374                 }
375                 this.notifyAll();
376             }
377         }
378         else
379         {
380             synchronized (this)
381             {
382                 _connections.remove(connection);
383                 if (!_queue.isEmpty())
384                     startNewConnection();
385             }
386         }
387     }
388 
389     /* ------------------------------------------------------------ */
390     public void send(HttpExchange ex) throws IOException
391     {
392         LinkedList<String> listeners = _client.getRegisteredListeners();
393 
394         if (listeners != null)
395         {
396             // Add registered listeners, fail if we can't load them
397             for (int i = listeners.size(); i > 0; --i)
398             {
399                 String listenerClass = listeners.get(i - 1);
400 
401                 try
402                 {
403                     Class listener = Class.forName(listenerClass);
404                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
405                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
406                     ex.setEventListener(elistener);
407                 }
408                 catch (Exception e)
409                 {
410                     e.printStackTrace();
411                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
412                 }
413             }
414         }
415 
416         // Security is supported by default and should be the first consulted
417         if (_client.hasRealms())
418         {
419             ex.setEventListener(new SecurityListener(this, ex));
420         }
421 
422         doSend(ex);
423     }
424 
425     /* ------------------------------------------------------------ */
426     public void resend(HttpExchange ex) throws IOException
427     {
428         ex.getEventListener().onRetry();
429         doSend(ex);
430     }
431 
432     /* ------------------------------------------------------------ */
433     protected void doSend(HttpExchange ex) throws IOException
434     {
435         // add cookies
436         // TODO handle max-age etc.
437         if (_cookies != null)
438         {
439             StringBuilder buf = null;
440             for (HttpCookie cookie : _cookies)
441             {
442                 if (buf == null)
443                     buf = new StringBuilder();
444                 else
445                     buf.append("; ");
446                 buf.append(cookie.getName()); // TODO quotes
447                 buf.append("=");
448                 buf.append(cookie.getValue()); // TODO quotes
449             }
450             if (buf != null)
451                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
452         }
453 
454         // Add any known authorizations
455         if (_authorizations != null)
456         {
457             Authorization auth = (Authorization)_authorizations.match(ex.getURI());
458             if (auth != null)
459                 ((Authorization)auth).setCredentials(ex);
460         }
461 
462         HttpConnection connection = getIdleConnection();
463         if (connection != null)
464         {
465             boolean sent = connection.send(ex);
466             if (!sent) connection = null;
467         }
468 
469         if (connection == null)
470         {
471             synchronized (this)
472             {
473                 _queue.add(ex);
474                 if (_connections.size() + _pendingConnections < _maxConnections)
475                 {
476                     startNewConnection();
477                 }
478             }
479         }
480     }
481 
482     /* ------------------------------------------------------------ */
483     public synchronized String toString()
484     {
485         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
486     }
487 
488     /* ------------------------------------------------------------ */
489     public synchronized String toDetailString()
490     {
491         StringBuilder b = new StringBuilder();
492         b.append(toString());
493         b.append('\n');
494         synchronized (this)
495         {
496             for (HttpConnection connection : _connections)
497             {
498                 if (connection._exchange != null)
499                 {
500                     b.append(connection.toDetailString());
501                     if (_idle.contains(connection))
502                         b.append(" IDLE");
503                     b.append('\n');
504                 }
505             }
506         }
507         b.append("--");
508         b.append('\n');
509 
510         return b.toString();
511     }
512 
513     /* ------------------------------------------------------------ */
514     public void setProxy(Address proxy)
515     {
516         _proxy = proxy;
517     }
518 
519     /* ------------------------------------------------------------ */
520     public Address getProxy()
521     {
522         return _proxy;
523     }
524 
525     /* ------------------------------------------------------------ */
526     public Authorization getProxyAuthentication()
527     {
528         return _proxyAuthentication;
529     }
530 
531     /* ------------------------------------------------------------ */
532     public void setProxyAuthentication(Authorization authentication)
533     {
534         _proxyAuthentication = authentication;
535     }
536 
537     /* ------------------------------------------------------------ */
538     public boolean isProxied()
539     {
540         return _proxy != null;
541     }
542 
543     /* ------------------------------------------------------------ */
544     public void close() throws IOException
545     {
546         synchronized (this)
547         {
548             for (HttpConnection connection : _connections)
549             {
550                 connection.close();
551             }
552         }
553     }
554 
555 }