View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.lang.reflect.Constructor;
18  import java.net.ConnectException;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.concurrent.ArrayBlockingQueue;
23  
24  import org.eclipse.jetty.client.HttpClient.Connector;
25  import org.eclipse.jetty.client.security.Authentication;
26  import org.eclipse.jetty.client.security.SecurityListener;
27  import org.eclipse.jetty.http.HttpCookie;
28  import org.eclipse.jetty.http.HttpHeaders;
29  import org.eclipse.jetty.http.HttpMethods;
30  import org.eclipse.jetty.http.HttpStatus;
31  import org.eclipse.jetty.http.PathMap;
32  import org.eclipse.jetty.io.Buffer;
33  import org.eclipse.jetty.io.ByteArrayBuffer;
34  import org.eclipse.jetty.io.Connection;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.util.log.Log;
37  
38  /**
39   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
40   */
41  public class HttpDestination
42  {
43      private final ByteArrayBuffer _hostHeader;
44      private final Address _address;
45      private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
46      private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
47      private final HttpClient _client;
48      private final boolean _ssl;
49      private final int _maxConnections;
50      private int _pendingConnections = 0;
51      private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
52      private int _newConnection = 0;
53      private Address _proxy;
54      private Authentication _proxyAuthentication;
55      private PathMap _authorizations;
56      private List<HttpCookie> _cookies;
57  
58      public void dump() throws IOException
59      {
60          synchronized (this)
61          {
62              Log.info(this.toString());
63              Log.info("connections=" + _connections.size());
64              Log.info("idle=" + _idle.size());
65              Log.info("pending=" + _pendingConnections);
66              for (HttpConnection c : _connections)
67              {
68                  if (!c.isIdle())
69                      c.dump();
70              }
71          }
72      }
73  
74      /* The queue of exchanged for this destination if connections are limited */
75      private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
76  
77      HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
78      {
79          _client = client;
80          _address = address;
81          _ssl = ssl;
82          _maxConnections = maxConnections;
83          String addressString = address.getHost();
84          if (address.getPort() != (_ssl ? 443 : 80))
85              addressString += ":" + address.getPort();
86          _hostHeader = new ByteArrayBuffer(addressString);
87      }
88  
89      public Address getAddress()
90      {
91          return _address;
92      }
93  
94      public Buffer getHostHeader()
95      {
96          return _hostHeader;
97      }
98  
99      public HttpClient getHttpClient()
100     {
101         return _client;
102     }
103 
104     public boolean isSecure()
105     {
106         return _ssl;
107     }
108 
109     public int getConnections()
110     {
111         synchronized (this)
112         {
113             return _connections.size();
114         }
115     }
116 
117     public int getIdleConnections()
118     {
119         synchronized (this)
120         {
121             return _idle.size();
122         }
123     }
124 
125     public void addAuthorization(String pathSpec, Authentication authorization)
126     {
127         synchronized (this)
128         {
129             if (_authorizations == null)
130                 _authorizations = new PathMap();
131             _authorizations.put(pathSpec, authorization);
132         }
133 
134         // TODO query and remove methods
135     }
136 
137     public void addCookie(HttpCookie cookie)
138     {
139         synchronized (this)
140         {
141             if (_cookies == null)
142                 _cookies = new ArrayList<HttpCookie>();
143             _cookies.add(cookie);
144         }
145 
146         // TODO query, remove and age methods
147     }
148 
149     /**
150      * Get a connection. We either get an idle connection if one is available, or
151      * we make a new connection, if we have not yet reached maxConnections. If we
152      * have reached maxConnections, we wait until the number reduces.
153      *
154      * @param timeout max time prepared to block waiting to be able to get a connection
155      * @return a HttpConnection for this destination
156      * @throws IOException if an I/O error occurs
157      */
158     private HttpConnection getConnection(long timeout) throws IOException
159     {
160         HttpConnection connection = null;
161 
162         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
163         {
164             boolean startConnection = false;
165             synchronized (this)
166             {
167                 int totalConnections = _connections.size() + _pendingConnections;
168                 if (totalConnections < _maxConnections)
169                 {
170                     _newConnection++;
171                     startConnection = true;
172                 }
173             }
174 
175             if (startConnection)
176             {
177                 startNewConnection();
178                 try
179                 {
180                     Object o = _newQueue.take();
181                     if (o instanceof HttpConnection)
182                     {
183                         connection = (HttpConnection)o;
184                     }
185                     else
186                         throw (IOException)o;
187                 }
188                 catch (InterruptedException e)
189                 {
190                     Log.ignore(e);
191                 }
192             }
193             else
194             {
195                 try
196                 {
197                     Thread.currentThread();
198                     Thread.sleep(200);
199                     timeout -= 200;
200                 }
201                 catch (InterruptedException e)
202                 {
203                     Log.ignore(e);
204                 }
205             }
206         }
207         return connection;
208     }
209 
210     public HttpConnection reserveConnection(long timeout) throws IOException
211     {
212         HttpConnection connection = getConnection(timeout);
213         if (connection != null)
214             connection.setReserved(true);
215         return connection;
216     }
217 
218     public HttpConnection getIdleConnection() throws IOException
219     {
220         HttpConnection connection = null;
221         while (true)
222         {
223             synchronized (this)
224             {
225                 if (connection != null)
226                 {
227                     _connections.remove(connection);
228                     connection.close();
229                     connection = null;
230                 }
231                 if (_idle.size() > 0)
232                     connection = _idle.remove(_idle.size() - 1);
233             }
234 
235             if (connection == null)
236                 return null;
237 
238             // Check if the connection was idle,
239             // but it expired just a moment ago
240             if (connection.cancelIdleTimeout())
241                 return connection;
242         }
243     }
244 
245     protected void startNewConnection()
246     {
247         try
248         {
249             synchronized (this)
250             {
251                 _pendingConnections++;
252             }
253             final Connector connector = _client._connector;
254             if (connector != null)
255                 connector.startConnection(this);
256         }
257         catch (Exception e)
258         {
259             Log.debug(e);
260             onConnectionFailed(e);
261         }
262     }
263 
264     public void onConnectionFailed(Throwable throwable)
265     {
266         Throwable connect_failure = null;
267 
268         boolean startConnection = false;
269         synchronized (this)
270         {
271             _pendingConnections--;
272             if (_newConnection > 0)
273             {
274                 connect_failure = throwable;
275                 _newConnection--;
276             }
277             else if (_queue.size() > 0)
278             {
279                 HttpExchange ex = _queue.removeFirst();
280                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
281                 ex.getEventListener().onConnectionFailed(throwable);
282 
283                 // Since an existing connection had failed, we need to create a
284                 // connection if the  queue is not empty and client is running.
285                 if (!_queue.isEmpty() && _client.isStarted())
286                     startConnection = true;
287             }
288         }
289 
290         if (startConnection)
291             startNewConnection();
292 
293         if (connect_failure != null)
294         {
295             try
296             {
297                 _newQueue.put(connect_failure);
298             }
299             catch (InterruptedException e)
300             {
301                 Log.ignore(e);
302             }
303         }
304     }
305 
306     public void onException(Throwable throwable)
307     {
308         synchronized (this)
309         {
310             _pendingConnections--;
311             if (_queue.size() > 0)
312             {
313                 HttpExchange ex = _queue.removeFirst();
314                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
315                 ex.getEventListener().onException(throwable);
316             }
317         }
318     }
319 
320     public void onNewConnection(final HttpConnection connection) throws IOException
321     {
322         Connection q_connection = null;
323 
324         synchronized (this)
325         {
326             _pendingConnections--;
327             _connections.add(connection);
328 
329             if (_newConnection > 0)
330             {
331                 q_connection = connection;
332                 _newConnection--;
333             }
334             else if (_queue.size() == 0)
335             {
336                 connection.setIdleTimeout();
337                 _idle.add(connection);
338             }
339             else
340             {
341                 EndPoint endPoint = connection.getEndPoint();
342                 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
343                 {
344                     SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
345                     HttpExchange exchange = _queue.get(0);
346                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
347                     connect.setAddress(getProxy());
348                     send(connection, connect);
349                 }
350                 else
351                 {
352                     HttpExchange exchange = _queue.removeFirst();
353                     send(connection, exchange);
354                 }
355             }
356         }
357 
358         if (q_connection != null)
359         {
360             try
361             {
362                 _newQueue.put(q_connection);
363             }
364             catch (InterruptedException e)
365             {
366                 Log.ignore(e);
367             }
368         }
369     }
370 
371     public void returnConnection(HttpConnection connection, boolean close) throws IOException
372     {
373         if (connection.isReserved())
374             connection.setReserved(false);
375 
376         if (close)
377         {
378             try
379             {
380                 connection.close();
381             }
382             catch (IOException e)
383             {
384                 Log.ignore(e);
385             }
386         }
387 
388         if (!_client.isStarted())
389             return;
390 
391         if (!close && connection.getEndPoint().isOpen())
392         {
393             synchronized (this)
394             {
395                 if (_queue.size() == 0)
396                 {
397                     connection.setIdleTimeout();
398                     _idle.add(connection);
399                 }
400                 else
401                 {
402                     HttpExchange ex = _queue.removeFirst();
403                     send(connection, ex);
404                 }
405                 this.notifyAll();
406             }
407         }
408         else
409         {
410             boolean startConnection = false;
411             synchronized (this)
412             {
413                 _connections.remove(connection);
414                 if (!_queue.isEmpty())
415                     startConnection = true;
416             }
417 
418             if (startConnection)
419                 startNewConnection();
420         }
421     }
422 
423     public void returnIdleConnection(HttpConnection connection)
424     {
425         try
426         {
427             connection.close();
428         }
429         catch (IOException e)
430         {
431             Log.ignore(e);
432         }
433 
434         boolean startConnection = false;
435         synchronized (this)
436         {
437             _idle.remove(connection);
438             _connections.remove(connection);
439 
440             if (!_queue.isEmpty() && _client.isStarted())
441                 startConnection = true;
442         }
443 
444         if (startConnection)
445             startNewConnection();
446     }
447 
448     public void send(HttpExchange ex) throws IOException
449     {
450         LinkedList<String> listeners = _client.getRegisteredListeners();
451 
452         if (listeners != null)
453         {
454             // Add registered listeners, fail if we can't load them
455             for (int i = listeners.size(); i > 0; --i)
456             {
457                 String listenerClass = listeners.get(i - 1);
458 
459                 try
460                 {
461                     Class listener = Class.forName(listenerClass);
462                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
463                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
464                     ex.setEventListener(elistener);
465                 }
466                 catch (Exception e)
467                 {
468                     e.printStackTrace();
469                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
470                 }
471             }
472         }
473 
474         // Security is supported by default and should be the first consulted
475         if (_client.hasRealms())
476         {
477             ex.setEventListener(new SecurityListener(this, ex));
478         }
479 
480         doSend(ex);
481     }
482 
483     public void resend(HttpExchange ex) throws IOException
484     {
485         ex.getEventListener().onRetry();
486         ex.reset();
487         doSend(ex);
488     }
489 
490     protected void doSend(HttpExchange ex) throws IOException
491     {
492         // add cookies
493         // TODO handle max-age etc.
494         if (_cookies != null)
495         {
496             StringBuilder buf = null;
497             for (HttpCookie cookie : _cookies)
498             {
499                 if (buf == null)
500                     buf = new StringBuilder();
501                 else
502                     buf.append("; ");
503                 buf.append(cookie.getName()); // TODO quotes
504                 buf.append("=");
505                 buf.append(cookie.getValue()); // TODO quotes
506             }
507             if (buf != null)
508                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
509         }
510 
511         // Add any known authorizations
512         if (_authorizations != null)
513         {
514             Authentication auth = (Authentication)_authorizations.match(ex.getURI());
515             if (auth != null)
516                 (auth).setCredentials(ex);
517         }
518 
519         HttpConnection connection = getIdleConnection();
520         if (connection != null)
521         {
522             send(connection, ex);
523         }
524         else
525         {
526             boolean startConnection = false;
527             synchronized (this)
528             {
529                 _queue.add(ex);
530                 if (_connections.size() + _pendingConnections < _maxConnections)
531                     startConnection = true;
532             }
533 
534             if (startConnection)
535                 startNewConnection();
536         }
537     }
538 
539     protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
540     {
541         synchronized (this)
542         {
543             // If server closes the connection, put the exchange back
544             // to the exchange queue and recycle the connection
545             if (!connection.send(exchange))
546             {
547                 _queue.addFirst(exchange);
548                 returnIdleConnection(connection);
549             }
550         }
551     }
552 
553     @Override
554     public synchronized String toString()
555     {
556         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
557     }
558 
559     public synchronized String toDetailString()
560     {
561         StringBuilder b = new StringBuilder();
562         b.append(toString());
563         b.append('\n');
564         synchronized (this)
565         {
566             for (HttpConnection connection : _connections)
567             {
568                 b.append(connection.toDetailString());
569                 if (_idle.contains(connection))
570                     b.append(" IDLE");
571                 b.append('\n');
572             }
573         }
574         b.append("--");
575         b.append('\n');
576 
577         return b.toString();
578     }
579 
580     public void setProxy(Address proxy)
581     {
582         _proxy = proxy;
583     }
584 
585     public Address getProxy()
586     {
587         return _proxy;
588     }
589 
590     public Authentication getProxyAuthentication()
591     {
592         return _proxyAuthentication;
593     }
594 
595     public void setProxyAuthentication(Authentication authentication)
596     {
597         _proxyAuthentication = authentication;
598     }
599 
600     public boolean isProxied()
601     {
602         return _proxy != null;
603     }
604 
605     public void close() throws IOException
606     {
607         synchronized (this)
608         {
609             for (HttpConnection connection : _connections)
610             {
611                 connection.close();
612             }
613         }
614     }
615 
616     private class ConnectExchange extends ContentExchange
617     {
618         private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
619         private final HttpExchange exchange;
620 
621         public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
622         {
623             this.proxyEndPoint = proxyEndPoint;
624             this.exchange = exchange;
625             setMethod(HttpMethods.CONNECT);
626             String serverHostAndPort = serverAddress.toString();
627             setURI(serverHostAndPort);
628             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
629             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
630             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
631         }
632 
633         @Override
634         protected void onResponseComplete() throws IOException
635         {
636             if (getResponseStatus() == HttpStatus.OK_200)
637             {
638                 proxyEndPoint.upgrade();
639             }
640             else
641             {
642                 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
643             }
644         }
645 
646         @Override
647         protected void onConnectionFailed(Throwable x)
648         {
649             HttpDestination.this.onConnectionFailed(x);
650         }
651     }
652 }