View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  package org.eclipse.jetty.client;
14  
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.util.ArrayList;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  
23  import org.eclipse.jetty.client.security.Authorization;
24  import org.eclipse.jetty.client.security.SecurityListener;
25  import org.eclipse.jetty.http.HttpCookie;
26  import org.eclipse.jetty.http.HttpHeaders;
27  import org.eclipse.jetty.http.PathMap;
28  import org.eclipse.jetty.io.Buffer;
29  import org.eclipse.jetty.io.ByteArrayBuffer;
30  import org.eclipse.jetty.util.log.Log;
31  
32  /**
33   *
34   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
35   */
36  public class HttpDestination
37  {
38      private final ByteArrayBuffer _hostHeader;
39      private final Address _address;
40      private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
41      private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
42      private final HttpClient _client;
43      private final boolean _ssl;
44      private final int _maxConnections;
45      private int _pendingConnections = 0;
46      private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
47      private int _newConnection = 0;
48      private Address _proxy;
49      private Authorization _proxyAuthentication;
50      private PathMap _authorizations;
51      private List<HttpCookie> _cookies;
52  
53      public void dump() throws IOException
54      {
55          synchronized (this)
56          {
57              Log.info(this.toString());
58              Log.info("connections=" + _connections.size());
59              Log.info("idle=" + _idle.size());
60              Log.info("pending=" + _pendingConnections);
61              for (HttpConnection c : _connections)
62              {
63                  if (!c.isIdle())
64                      c.dump();
65              }
66          }
67      }
68  
69      /* The queue of exchanged for this destination if connections are limited */
70      private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
71  
72      HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
73      {
74          _client = client;
75          _address = address;
76          _ssl = ssl;
77          _maxConnections = maxConnections;
78          String addressString = address.getHost();
79          if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
80          _hostHeader = new ByteArrayBuffer(addressString);
81      }
82  
83      public Address getAddress()
84      {
85          return _address;
86      }
87  
88      public Buffer getHostHeader()
89      {
90          return _hostHeader;
91      }
92  
93      public HttpClient getHttpClient()
94      {
95          return _client;
96      }
97  
98      public boolean isSecure()
99      {
100         return _ssl;
101     }
102 
103     public void addAuthorization(String pathSpec, Authorization authorization)
104     {
105         synchronized (this)
106         {
107             if (_authorizations == null)
108                 _authorizations = new PathMap();
109             _authorizations.put(pathSpec, authorization);
110         }
111 
112         // TODO query and remove methods
113     }
114 
115     public void addCookie(HttpCookie cookie)
116     {
117         synchronized (this)
118         {
119             if (_cookies == null)
120                 _cookies = new ArrayList<HttpCookie>();
121             _cookies.add(cookie);
122         }
123 
124         // TODO query, remove and age methods
125     }
126 
127     /**
128      * Get a connection. We either get an idle connection if one is available, or
129      * we make a new connection, if we have not yet reached maxConnections. If we
130      * have reached maxConnections, we wait until the number reduces.
131      * @param timeout max time prepared to block waiting to be able to get a connection
132      * @return
133      * @throws IOException
134      */
135     private HttpConnection getConnection(long timeout) throws IOException
136     {
137         HttpConnection connection = null;
138 
139         while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
140         {
141             int totalConnections = 0;
142             boolean starting = false;
143             synchronized (this)
144             {
145                 totalConnections = _connections.size() + _pendingConnections;
146                 if (totalConnections < _maxConnections)
147                 {
148                     _newConnection++;
149                     startNewConnection();
150                     starting = true;
151                 }
152             }
153 
154             if (!starting)
155             {
156                 try
157                 {
158                     Thread.currentThread().sleep(200);
159                     timeout-=200;
160                 }
161                 catch (InterruptedException e)
162                 {
163                     Log.ignore(e);
164                 }
165             }
166             else
167             {
168                try
169                {
170                    Object o = _newQueue.take();
171                    if (o instanceof HttpConnection)
172                    {
173                        connection = (HttpConnection)o;
174                    }
175                    else
176                        throw (IOException)o;
177                }
178                catch (InterruptedException e)
179                {
180                    Log.ignore(e);
181                }
182            }
183         }
184         return connection;
185     }
186 
187     public HttpConnection reserveConnection(long timeout) throws IOException
188     {
189         HttpConnection connection = getConnection(timeout);
190         if (connection != null)
191             connection.setReserved(true);
192         return connection;
193     }
194 
195     public HttpConnection getIdleConnection() throws IOException
196     {
197         long now = System.currentTimeMillis();
198         long idleTimeout=_client.getIdleTimeout();
199         HttpConnection connection = null;
200         while (true)
201         {
202             synchronized (this)
203             {
204                 if (connection!=null)
205                 {
206                     _connections.remove(connection);
207                     connection.getEndPoint().close();
208                     connection=null;
209                 }
210                 if (_idle.size() > 0)
211                     connection = _idle.remove(_idle.size()-1);
212             }
213 
214             if (connection==null)
215                 return null;
216 
217             long last = connection.getLast();
218             if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
219                 return connection;
220 
221         }
222     }
223 
224     protected void startNewConnection()
225     {
226         try
227         {
228             synchronized (this)
229             {
230                 _pendingConnections++;
231             }
232             _client._connector.startConnection(this);
233         }
234         catch (Exception e)
235         {
236             Log.debug(e);
237             onConnectionFailed(e);
238         }
239     }
240 
241     public void onConnectionFailed(Throwable throwable)
242     {
243         Throwable connect_failure = null;
244 
245         synchronized (this)
246         {
247             _pendingConnections--;
248             if (_newConnection > 0)
249             {
250                 connect_failure = throwable;
251                 _newConnection--;
252             }
253             else if (_queue.size() > 0)
254             {
255                 HttpExchange ex = _queue.removeFirst();
256                 ex.getEventListener().onConnectionFailed(throwable);
257                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
258             }
259         }
260 
261         if (connect_failure != null)
262         {
263             try
264             {
265                 _newQueue.put(connect_failure);
266             }
267             catch (InterruptedException e)
268             {
269                 Log.ignore(e);
270             }
271         }
272     }
273 
274     public void onException(Throwable throwable)
275     {
276         synchronized (this)
277         {
278             _pendingConnections--;
279             if (_queue.size() > 0)
280             {
281                 HttpExchange ex = _queue.removeFirst();
282                 ex.getEventListener().onException(throwable);
283                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
284             }
285         }
286     }
287 
288     public void onNewConnection(HttpConnection connection) throws IOException
289     {
290         HttpConnection q_connection = null;
291 
292         synchronized (this)
293         {
294             _pendingConnections--;
295             _connections.add(connection);
296 
297             if (_newConnection > 0)
298             {
299                 q_connection = connection;
300                 _newConnection--;
301             }
302             else if (_queue.size() == 0)
303             {
304                 _idle.add(connection);
305             }
306             else
307             {
308                 HttpExchange ex = _queue.removeFirst();
309                 connection.send(ex);
310             }
311         }
312 
313         if (q_connection != null)
314         {
315             try
316             {
317                 _newQueue.put(q_connection);
318             }
319             catch (InterruptedException e)
320             {
321                 Log.ignore(e);
322             }
323         }
324     }
325 
326     public void returnConnection(HttpConnection connection, boolean close) throws IOException
327     {
328         if (connection.isReserved())
329             connection.setReserved(false);
330 
331         if (close)
332         {
333             try
334             {
335                 connection.close();
336             }
337             catch (IOException e)
338             {
339                 Log.ignore(e);
340             }
341         }
342 
343         if (!_client.isStarted())
344             return;
345 
346         if (!close && connection.getEndPoint().isOpen())
347         {
348             synchronized (this)
349             {
350                 if (_queue.size() == 0)
351                 {
352                     connection.setLast(System.currentTimeMillis());
353                     _idle.add(connection);
354                 }
355                 else
356                 {
357                     HttpExchange ex = _queue.removeFirst();
358                     connection.send(ex);
359                 }
360                 this.notifyAll();
361             }
362         }
363         else
364         {
365             synchronized (this)
366             {
367                 _connections.remove(connection);
368                 if (!_queue.isEmpty())
369                     startNewConnection();
370             }
371         }
372     }
373 
374     public void send(HttpExchange ex) throws IOException
375     {
376         LinkedList<String> listeners = _client.getRegisteredListeners();
377 
378         if (listeners != null)
379         {
380             // Add registered listeners, fail if we can't load them
381             for (int i = listeners.size(); i > 0; --i)
382             {
383                 String listenerClass = listeners.get(i - 1);
384 
385                 try
386                 {
387                     Class listener = Class.forName(listenerClass);
388                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
389                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
390                     ex.setEventListener(elistener);
391                 }
392                 catch (Exception e)
393                 {
394                     e.printStackTrace();
395                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
396                 }
397             }
398         }
399 
400         // Security is supported by default and should be the first consulted
401         if (_client.hasRealms())
402         {
403             ex.setEventListener(new SecurityListener(this, ex));
404         }
405 
406         doSend(ex);
407     }
408 
409     public void resend(HttpExchange ex) throws IOException
410     {
411         ex.getEventListener().onRetry();
412         ex.reset();
413         doSend(ex);
414     }
415 
416     protected void doSend(HttpExchange ex) throws IOException
417     {
418         // add cookies
419         // TODO handle max-age etc.
420         if (_cookies != null)
421         {
422             StringBuilder buf = null;
423             for (HttpCookie cookie : _cookies)
424             {
425                 if (buf == null)
426                     buf = new StringBuilder();
427                 else
428                     buf.append("; ");
429                 buf.append(cookie.getName()); // TODO quotes
430                 buf.append("=");
431                 buf.append(cookie.getValue()); // TODO quotes
432             }
433             if (buf != null)
434                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
435         }
436 
437         // Add any known authorizations
438         if (_authorizations != null)
439         {
440             Authorization auth = (Authorization)_authorizations.match(ex.getURI());
441             if (auth != null)
442                 ((Authorization)auth).setCredentials(ex);
443         }
444 
445         HttpConnection connection = getIdleConnection();
446         if (connection != null)
447         {
448             boolean sent = connection.send(ex);
449             if (!sent) connection = null;
450         }
451 
452         if (connection == null)
453         {
454             synchronized (this)
455             {
456                 _queue.add(ex);
457                 if (_connections.size() + _pendingConnections < _maxConnections)
458                 {
459                     startNewConnection();
460                 }
461             }
462         }
463     }
464 
465     public synchronized String toString()
466     {
467         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
468     }
469 
470     public synchronized String toDetailString()
471     {
472         StringBuilder b = new StringBuilder();
473         b.append(toString());
474         b.append('\n');
475         synchronized (this)
476         {
477             for (HttpConnection connection : _connections)
478             {
479                 b.append(connection.toDetailString());
480                 if (_idle.contains(connection))
481                     b.append(" IDLE");
482                 b.append('\n');
483             }
484         }
485         b.append("--");
486         b.append('\n');
487 
488         return b.toString();
489     }
490 
491     public void setProxy(Address proxy)
492     {
493         _proxy = proxy;
494     }
495 
496     public Address getProxy()
497     {
498         return _proxy;
499     }
500 
501     public Authorization getProxyAuthentication()
502     {
503         return _proxyAuthentication;
504     }
505 
506     public void setProxyAuthentication(Authorization authentication)
507     {
508         _proxyAuthentication = authentication;
509     }
510 
511     public boolean isProxied()
512     {
513         return _proxy != null;
514     }
515 
516     public void close() throws IOException
517     {
518         synchronized (this)
519         {
520             for (HttpConnection connection : _connections)
521             {
522                 connection.close();
523             }
524         }
525     }
526 }