View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  package org.eclipse.jetty.client;
14  
15  import java.io.IOException;
16  import java.lang.reflect.Constructor;
17  import java.net.ConnectException;
18  import java.util.ArrayList;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  
23  import org.eclipse.jetty.client.HttpClient.Connector;
24  import org.eclipse.jetty.client.security.Authentication;
25  import org.eclipse.jetty.client.security.SecurityListener;
26  import org.eclipse.jetty.http.HttpCookie;
27  import org.eclipse.jetty.http.HttpHeaders;
28  import org.eclipse.jetty.http.HttpMethods;
29  import org.eclipse.jetty.http.HttpStatus;
30  import org.eclipse.jetty.http.PathMap;
31  import org.eclipse.jetty.io.Buffer;
32  import org.eclipse.jetty.io.ByteArrayBuffer;
33  import org.eclipse.jetty.io.EndPoint;
34  import org.eclipse.jetty.util.log.Log;
35  
36  /**
37   *
38   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
39   */
40  public class HttpDestination
41  {
42      private final ByteArrayBuffer _hostHeader;
43      private final Address _address;
44      private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
45      private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
46      private final HttpClient _client;
47      private final boolean _ssl;
48      private final int _maxConnections;
49      private int _pendingConnections = 0;
50      private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
51      private int _newConnection = 0;
52      private Address _proxy;
53      private Authentication _proxyAuthentication;
54      private PathMap _authorizations;
55      private List<HttpCookie> _cookies;
56  
57      public void dump() throws IOException
58      {
59          synchronized (this)
60          {
61              Log.info(this.toString());
62              Log.info("connections=" + _connections.size());
63              Log.info("idle=" + _idle.size());
64              Log.info("pending=" + _pendingConnections);
65              for (HttpConnection c : _connections)
66              {
67                  if (!c.isIdle())
68                      c.dump();
69              }
70          }
71      }
72  
73      /* The queue of exchanged for this destination if connections are limited */
74      private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
75  
76      HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
77      {
78          _client = client;
79          _address = address;
80          _ssl = ssl;
81          _maxConnections = maxConnections;
82          String addressString = address.getHost();
83          if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
84          _hostHeader = new ByteArrayBuffer(addressString);
85      }
86  
87      public Address getAddress()
88      {
89          return _address;
90      }
91  
92      public Buffer getHostHeader()
93      {
94          return _hostHeader;
95      }
96  
97      public HttpClient getHttpClient()
98      {
99          return _client;
100     }
101 
102     public boolean isSecure()
103     {
104         return _ssl;
105     }
106 
107     public int getConnections()
108     {
109         synchronized (this)
110         {
111             return _connections.size();
112         }
113     }
114 
115     public int getIdleConnections()
116     {
117         synchronized (this)
118         {
119             return _idle.size();
120         }
121     }
122 
123     public void addAuthorization(String pathSpec, Authentication authorization)
124     {
125         synchronized (this)
126         {
127             if (_authorizations == null)
128                 _authorizations = new PathMap();
129             _authorizations.put(pathSpec, authorization);
130         }
131 
132         // TODO query and remove methods
133     }
134 
135     public void addCookie(HttpCookie cookie)
136     {
137         synchronized (this)
138         {
139             if (_cookies == null)
140                 _cookies = new ArrayList<HttpCookie>();
141             _cookies.add(cookie);
142         }
143 
144         // TODO query, remove and age methods
145     }
146 
147     /**
148      * Get a connection. We either get an idle connection if one is available, or
149      * we make a new connection, if we have not yet reached maxConnections. If we
150      * have reached maxConnections, we wait until the number reduces.
151      * @param timeout max time prepared to block waiting to be able to get a connection
152      * @return
153      * @throws IOException
154      */
155     private HttpConnection getConnection(long timeout) throws IOException
156     {
157         HttpConnection connection = null;
158 
159         while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
160         {
161             boolean starting = false;
162             synchronized (this)
163             {
164                 int totalConnections = _connections.size() + _pendingConnections;
165                 if (totalConnections < _maxConnections)
166                 {
167                     _newConnection++;
168                     startNewConnection();
169                     starting = true;
170                 }
171             }
172 
173             if (!starting)
174             {
175                 try
176                 {
177                     Thread.currentThread();
178                     Thread.sleep(200);
179                     timeout-=200;
180                 }
181                 catch (InterruptedException e)
182                 {
183                     Log.ignore(e);
184                 }
185             }
186             else
187             {
188                try
189                {
190                    Object o = _newQueue.take();
191                    if (o instanceof HttpConnection)
192                    {
193                        connection = (HttpConnection)o;
194                    }
195                    else
196                        throw (IOException)o;
197                }
198                catch (InterruptedException e)
199                {
200                    Log.ignore(e);
201                }
202            }
203         }
204         return connection;
205     }
206 
207     public HttpConnection reserveConnection(long timeout) throws IOException
208     {
209         HttpConnection connection = getConnection(timeout);
210         if (connection != null)
211             connection.setReserved(true);
212         return connection;
213     }
214 
215     public HttpConnection getIdleConnection() throws IOException
216     {
217         HttpConnection connection = null;
218         while (true)
219         {
220             synchronized (this)
221             {
222                 if (connection!=null)
223                 {
224                     _connections.remove(connection);
225                     connection.close();
226                     connection=null;
227                 }
228                 if (_idle.size() > 0)
229                     connection = _idle.remove(_idle.size()-1);
230             }
231 
232             if (connection==null)
233                 return null;
234 
235             // Check if the connection was idle,
236             // but it expired just a moment ago
237             if (connection.cancelIdleTimeout())
238                 return connection;
239         }
240     }
241 
242     protected void startNewConnection()
243     {
244         try
245         {
246             synchronized (this)
247             {
248                 _pendingConnections++;
249             }
250             final Connector connector=_client._connector;
251             if (connector!=null)
252                 connector.startConnection(this);
253         }
254         catch (Exception e)
255         {
256             Log.debug(e);
257             onConnectionFailed(e);
258         }
259     }
260 
261     public void onConnectionFailed(Throwable throwable)
262     {
263         Throwable connect_failure = null;
264 
265         synchronized (this)
266         {
267             _pendingConnections--;
268             if (_newConnection > 0)
269             {
270                 connect_failure = throwable;
271                 _newConnection--;
272             }
273             else if (_queue.size() > 0)
274             {
275                 HttpExchange ex = _queue.removeFirst();
276                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
277                 ex.getEventListener().onConnectionFailed(throwable);
278 
279                 // Since an existing connection had failed, we need to create a
280                 // connection if the  queue is not empty and client is running.
281                 if (!_queue.isEmpty() && _client.isStarted())
282                     startNewConnection();
283             }
284         }
285 
286         if (connect_failure != null)
287         {
288             try
289             {
290                 _newQueue.put(connect_failure);
291             }
292             catch (InterruptedException e)
293             {
294                 Log.ignore(e);
295             }
296         }
297     }
298 
299     public void onException(Throwable throwable)
300     {
301         synchronized (this)
302         {
303             _pendingConnections--;
304             if (_queue.size() > 0)
305             {
306                 HttpExchange ex = _queue.removeFirst();
307                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
308                 ex.getEventListener().onException(throwable);
309             }
310         }
311     }
312 
313     public void onNewConnection(final HttpConnection connection) throws IOException
314     {
315         HttpConnection q_connection = null;
316 
317         synchronized (this)
318         {
319             _pendingConnections--;
320             _connections.add(connection);
321 
322             if (_newConnection > 0)
323             {
324                 q_connection = connection;
325                 _newConnection--;
326             }
327             else if (_queue.size() == 0)
328             {
329                 connection.setIdleTimeout();
330                 _idle.add(connection);
331             }
332             else
333             {
334                 EndPoint endPoint = connection.getEndPoint();
335                 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
336                 {
337                     SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
338                     HttpExchange exchange = _queue.get(0);
339                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
340                     connect.setAddress(getProxy());
341                     send(connection, connect);
342                 }
343                 else
344                 {
345                     HttpExchange exchange = _queue.removeFirst();
346                     send(connection, exchange);
347                 }
348             }
349         }
350 
351         if (q_connection != null)
352         {
353             try
354             {
355                 _newQueue.put(q_connection);
356             }
357             catch (InterruptedException e)
358             {
359                 Log.ignore(e);
360             }
361         }
362     }
363 
364     public void returnConnection(HttpConnection connection, boolean close) throws IOException
365     {
366         if (connection.isReserved())
367             connection.setReserved(false);
368 
369         if (close)
370         {
371             try
372             {
373                 connection.close();
374             }
375             catch (IOException e)
376             {
377                 Log.ignore(e);
378             }
379         }
380 
381         if (!_client.isStarted())
382             return;
383 
384         if (!close && connection.getEndPoint().isOpen())
385         {
386             synchronized (this)
387             {
388                 if (_queue.size() == 0)
389                 {
390                     connection.setIdleTimeout();
391                     _idle.add(connection);
392                 }
393                 else
394                 {
395                     HttpExchange ex = _queue.removeFirst();
396                     send(connection, ex);
397                 }
398                 this.notifyAll();
399             }
400         }
401         else
402         {
403             synchronized (this)
404             {
405                 _connections.remove(connection);
406                 if (!_queue.isEmpty())
407                     startNewConnection();
408             }
409         }
410     }
411 
412     public void returnIdleConnection(HttpConnection connection)
413     {
414         try
415         {
416             connection.close();
417         }
418         catch (IOException e)
419         {
420             Log.ignore(e);
421         }
422 
423         synchronized (this)
424         {
425             _idle.remove(connection);
426             _connections.remove(connection);
427 
428             if (!_queue.isEmpty() && _client.isStarted())
429                 startNewConnection();
430         }
431     }
432 
433     public void send(HttpExchange ex) throws IOException
434     {
435         LinkedList<String> listeners = _client.getRegisteredListeners();
436 
437         if (listeners != null)
438         {
439             // Add registered listeners, fail if we can't load them
440             for (int i = listeners.size(); i > 0; --i)
441             {
442                 String listenerClass = listeners.get(i - 1);
443 
444                 try
445                 {
446                     Class listener = Class.forName(listenerClass);
447                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
448                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
449                     ex.setEventListener(elistener);
450                 }
451                 catch (Exception e)
452                 {
453                     e.printStackTrace();
454                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
455                 }
456             }
457         }
458 
459         // Security is supported by default and should be the first consulted
460         if (_client.hasRealms())
461         {
462             ex.setEventListener(new SecurityListener(this, ex));
463         }
464 
465         doSend(ex);
466     }
467 
468     public void resend(HttpExchange ex) throws IOException
469     {
470         ex.getEventListener().onRetry();
471         ex.reset();
472         doSend(ex);
473     }
474 
475     protected void doSend(HttpExchange ex) throws IOException
476     {
477         // add cookies
478         // TODO handle max-age etc.
479         if (_cookies != null)
480         {
481             StringBuilder buf = null;
482             for (HttpCookie cookie : _cookies)
483             {
484                 if (buf == null)
485                     buf = new StringBuilder();
486                 else
487                     buf.append("; ");
488                 buf.append(cookie.getName()); // TODO quotes
489                 buf.append("=");
490                 buf.append(cookie.getValue()); // TODO quotes
491             }
492             if (buf != null)
493                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
494         }
495 
496         // Add any known authorizations
497         if (_authorizations != null)
498         {
499             Authentication auth = (Authentication)_authorizations.match(ex.getURI());
500             if (auth != null)
501                 (auth).setCredentials(ex);
502         }
503 
504         HttpConnection connection = getIdleConnection();
505         if (connection != null)
506         {
507             send(connection, ex);
508         }
509         else
510         {
511             synchronized (this)
512             {
513                 _queue.add(ex);
514                 if (_connections.size() + _pendingConnections < _maxConnections)
515                 {
516                     startNewConnection();
517                 }
518             }
519         }
520     }
521 
522     protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
523     {
524         synchronized (this)
525         {
526             // If server closes the connection, put the exchange back
527             // to the exchange queue and recycle the connection
528             if(!connection.send(exchange))
529             {
530                 _queue.addFirst(exchange);
531                 returnIdleConnection(connection);
532             }
533         }
534     }
535 
536     @Override
537     public synchronized String toString()
538     {
539         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
540     }
541 
542     public synchronized String toDetailString()
543     {
544         StringBuilder b = new StringBuilder();
545         b.append(toString());
546         b.append('\n');
547         synchronized (this)
548         {
549             for (HttpConnection connection : _connections)
550             {
551                 b.append(connection.toDetailString());
552                 if (_idle.contains(connection))
553                     b.append(" IDLE");
554                 b.append('\n');
555             }
556         }
557         b.append("--");
558         b.append('\n');
559 
560         return b.toString();
561     }
562 
563     public void setProxy(Address proxy)
564     {
565         _proxy = proxy;
566     }
567 
568     public Address getProxy()
569     {
570         return _proxy;
571     }
572 
573     public Authentication getProxyAuthentication()
574     {
575         return _proxyAuthentication;
576     }
577 
578     public void setProxyAuthentication(Authentication authentication)
579     {
580         _proxyAuthentication = authentication;
581     }
582 
583     public boolean isProxied()
584     {
585         return _proxy != null;
586     }
587 
588     public void close() throws IOException
589     {
590         synchronized (this)
591         {
592             for (HttpConnection connection : _connections)
593             {
594                 connection.close();
595             }
596         }
597     }
598 
599     private class ConnectExchange extends ContentExchange
600     {
601         private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
602         private final HttpExchange exchange;
603 
604         public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
605         {
606             this.proxyEndPoint = proxyEndPoint;
607             this.exchange = exchange;
608             setMethod(HttpMethods.CONNECT);
609             String serverHostAndPort = serverAddress.toString();
610             setURI(serverHostAndPort);
611             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
612             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
613             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
614         }
615 
616         @Override
617         protected void onResponseComplete() throws IOException
618         {
619             if (getResponseStatus() == HttpStatus.OK_200)
620             {
621                 proxyEndPoint.upgrade();
622             }
623             else
624             {
625                 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
626             }
627         }
628 
629         @Override
630         protected void onConnectionFailed(Throwable x)
631         {
632             HttpDestination.this.onConnectionFailed(x);
633         }
634     }
635 }