View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.lang.reflect.Constructor;
23  import java.net.ProtocolException;
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.concurrent.ArrayBlockingQueue;
28  import java.util.concurrent.BlockingQueue;
29  import java.util.concurrent.RejectedExecutionException;
30  
31  import org.eclipse.jetty.client.HttpClient.Connector;
32  import org.eclipse.jetty.client.security.Authentication;
33  import org.eclipse.jetty.client.security.SecurityListener;
34  import org.eclipse.jetty.http.HttpCookie;
35  import org.eclipse.jetty.http.HttpHeaders;
36  import org.eclipse.jetty.http.HttpMethods;
37  import org.eclipse.jetty.http.HttpStatus;
38  import org.eclipse.jetty.http.PathMap;
39  import org.eclipse.jetty.io.Buffer;
40  import org.eclipse.jetty.io.ByteArrayBuffer;
41  import org.eclipse.jetty.io.Connection;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.util.component.AggregateLifeCycle;
44  import org.eclipse.jetty.util.component.Dumpable;
45  import org.eclipse.jetty.util.log.Log;
46  import org.eclipse.jetty.util.log.Logger;
47  
48  /**
49   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
50   */
51  public class HttpDestination implements Dumpable
52  {
53      private static final Logger LOG = Log.getLogger(HttpDestination.class);
54  
55      private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
56      private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
57      private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
58      private final List<AbstractHttpConnection> _idle = new ArrayList<AbstractHttpConnection>();
59      private final HttpClient _client;
60      private final Address _address;
61      private final boolean _ssl;
62      private final ByteArrayBuffer _hostHeader;
63      private volatile int _maxConnections;
64      private volatile int _maxQueueSize;
65      private int _pendingConnections = 0;
66      private int _newConnection = 0;
67      private volatile Address _proxy;
68      private Authentication _proxyAuthentication;
69      private PathMap _authorizations;
70      private List<HttpCookie> _cookies;
71  
72  
73  
74      HttpDestination(HttpClient client, Address address, boolean ssl)
75      {
76          _client = client;
77          _address = address;
78          _ssl = ssl;
79          _maxConnections = _client.getMaxConnectionsPerAddress();
80          _maxQueueSize = _client.getMaxQueueSizePerAddress();
81          String addressString = address.getHost();
82          if (address.getPort() != (_ssl ? 443 : 80))
83              addressString += ":" + address.getPort();
84          _hostHeader = new ByteArrayBuffer(addressString);
85      }
86  
87      public HttpClient getHttpClient()
88      {
89          return _client;
90      }
91  
92      public Address getAddress()
93      {
94          return _address;
95      }
96  
97      public boolean isSecure()
98      {
99          return _ssl;
100     }
101 
102     public Buffer getHostHeader()
103     {
104         return _hostHeader;
105     }
106 
107     public int getMaxConnections()
108     {
109         return _maxConnections;
110     }
111 
112     public void setMaxConnections(int maxConnections)
113     {
114         this._maxConnections = maxConnections;
115     }
116 
117     public int getMaxQueueSize()
118     {
119         return _maxQueueSize;
120     }
121 
122     public void setMaxQueueSize(int maxQueueSize)
123     {
124         this._maxQueueSize = maxQueueSize;
125     }
126 
127     public int getConnections()
128     {
129         synchronized (this)
130         {
131             return _connections.size();
132         }
133     }
134 
135     public int getIdleConnections()
136     {
137         synchronized (this)
138         {
139             return _idle.size();
140         }
141     }
142 
143     public void addAuthorization(String pathSpec, Authentication authorization)
144     {
145         synchronized (this)
146         {
147             if (_authorizations == null)
148                 _authorizations = new PathMap();
149             _authorizations.put(pathSpec, authorization);
150         }
151 
152         // TODO query and remove methods
153     }
154 
155     public void addCookie(HttpCookie cookie)
156     {
157         synchronized (this)
158         {
159             if (_cookies == null)
160                 _cookies = new ArrayList<HttpCookie>();
161             _cookies.add(cookie);
162         }
163 
164         // TODO query, remove and age methods
165     }
166 
167     /**
168      * Get a connection. We either get an idle connection if one is available, or
169      * we make a new connection, if we have not yet reached maxConnections. If we
170      * have reached maxConnections, we wait until the number reduces.
171      *
172      * @param timeout max time prepared to block waiting to be able to get a connection
173      * @return a HttpConnection for this destination
174      * @throws IOException if an I/O error occurs
175      */
176     private AbstractHttpConnection getConnection(long timeout) throws IOException
177     {
178         AbstractHttpConnection connection = null;
179 
180         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
181         {
182             boolean startConnection = false;
183             synchronized (this)
184             {
185                 int totalConnections = _connections.size() + _pendingConnections;
186                 if (totalConnections < _maxConnections)
187                 {
188                     _newConnection++;
189                     startConnection = true;
190                 }
191             }
192 
193             if (startConnection)
194             {
195                 startNewConnection();
196                 try
197                 {
198                     Object o = _newQueue.take();
199                     if (o instanceof AbstractHttpConnection)
200                     {
201                         connection = (AbstractHttpConnection)o;
202                     }
203                     else
204                         throw (IOException)o;
205                 }
206                 catch (InterruptedException e)
207                 {
208                     LOG.ignore(e);
209                 }
210             }
211             else
212             {
213                 try
214                 {
215                     Thread.currentThread();
216                     Thread.sleep(200);
217                     timeout -= 200;
218                 }
219                 catch (InterruptedException e)
220                 {
221                     LOG.ignore(e);
222                 }
223             }
224         }
225         return connection;
226     }
227 
228     public AbstractHttpConnection reserveConnection(long timeout) throws IOException
229     {
230         AbstractHttpConnection connection = getConnection(timeout);
231         if (connection != null)
232             connection.setReserved(true);
233         return connection;
234     }
235 
236     public AbstractHttpConnection getIdleConnection() throws IOException
237     {
238         AbstractHttpConnection connection = null;
239         while (true)
240         {
241             synchronized (this)
242             {
243                 if (connection != null)
244                 {
245                     _connections.remove(connection);
246                     connection.close();
247                     connection = null;
248                 }
249                 if (_idle.size() > 0)
250                     connection = _idle.remove(_idle.size() - 1);
251             }
252 
253             if (connection == null)
254             {
255                 return null;
256             }
257             
258             // Check if the connection was idle,
259             // but it expired just a moment ago
260             if (connection.cancelIdleTimeout())
261             {
262                 return connection;
263             }
264         }
265     }
266 
267     protected void startNewConnection()
268     {
269         try
270         {
271             synchronized (this)
272             {
273                 _pendingConnections++;
274             }
275             final Connector connector = _client._connector;
276             if (connector != null)
277                 connector.startConnection(this);
278         }
279         catch (Exception e)
280         {
281             LOG.debug(e);
282             onConnectionFailed(e);
283         }
284     }
285 
286     public void onConnectionFailed(Throwable throwable)
287     {
288         Throwable connect_failure = null;
289 
290         boolean startConnection = false;
291         synchronized (this)
292         {
293             _pendingConnections--;
294             if (_newConnection > 0)
295             {
296                 connect_failure = throwable;
297                 _newConnection--;
298             }
299             else if (_queue.size() > 0)
300             {
301                 HttpExchange ex = _queue.remove(0);
302                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
303                     ex.getEventListener().onConnectionFailed(throwable);
304 
305                 // Since an existing connection had failed, we need to create a
306                 // connection if the  queue is not empty and client is running.
307                 if (!_queue.isEmpty() && _client.isStarted())
308                     startConnection = true;
309             }
310         }
311 
312         if (startConnection)
313             startNewConnection();
314 
315         if (connect_failure != null)
316         {
317             try
318             {
319                 _newQueue.put(connect_failure);
320             }
321             catch (InterruptedException e)
322             {
323                 LOG.ignore(e);
324             }
325         }
326     }
327 
328     public void onException(Throwable throwable)
329     {
330         synchronized (this)
331         {
332             _pendingConnections--;
333             if (_queue.size() > 0)
334             {
335                 HttpExchange ex = _queue.remove(0);
336                 if(ex.setStatus(HttpExchange.STATUS_EXCEPTED))
337                     ex.getEventListener().onException(throwable);
338             }
339         }
340     }
341 
342     public void onNewConnection(final AbstractHttpConnection connection) throws IOException
343     {
344         Connection q_connection = null;
345 
346         synchronized (this)
347         {
348             _pendingConnections--;
349             _connections.add(connection);
350 
351             if (_newConnection > 0)
352             {
353                 q_connection = connection;
354                 _newConnection--;
355             }
356             else if (_queue.size() == 0)
357             {
358                 connection.setIdleTimeout();
359                 _idle.add(connection);
360             }
361             else
362             {
363                 EndPoint endPoint = connection.getEndPoint();
364                 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
365                 {
366                     SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
367                     HttpExchange exchange = _queue.get(0);
368                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
369                     connect.setAddress(getProxy());
370                     send(connection, connect);
371                 }
372                 else
373                 {
374                     HttpExchange exchange = _queue.remove(0);
375                     send(connection, exchange);
376                 }
377             }
378         }
379 
380         if (q_connection != null)
381         {
382             try
383             {
384                 _newQueue.put(q_connection);
385             }
386             catch (InterruptedException e)
387             {
388                 LOG.ignore(e);
389             }
390         }
391     }
392 
393     public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
394     {
395         if (connection.isReserved())
396             connection.setReserved(false);
397 
398         if (close)
399         {
400             try
401             {
402                 connection.close();
403             }
404             catch (IOException e)
405             {
406                 LOG.ignore(e);
407             }
408         }
409 
410         if (!_client.isStarted())
411             return;
412 
413         if (!close && connection.getEndPoint().isOpen())
414         {
415             synchronized (this)
416             {
417                 if (_queue.size() == 0)
418                 {
419                     connection.setIdleTimeout();
420                     _idle.add(connection);
421                 }
422                 else
423                 {
424                     HttpExchange ex = _queue.remove(0);
425                     send(connection, ex);
426                 }
427                 this.notifyAll();
428             }
429         }
430         else
431         {
432             boolean startConnection = false;
433             synchronized (this)
434             {
435                 _connections.remove(connection);
436                 if (!_queue.isEmpty())
437                     startConnection = true;
438             }
439 
440             if (startConnection)
441                 startNewConnection();
442         }
443     }
444 
445     public void returnIdleConnection(AbstractHttpConnection connection)
446     {
447         // TODO work out the real idle time;
448         long idleForMs=connection!=null&&connection.getEndPoint()!=null?connection.getEndPoint().getMaxIdleTime():-1;
449         connection.onIdleExpired(idleForMs);
450 
451         boolean startConnection = false;
452         synchronized (this)
453         {
454             _idle.remove(connection);
455             _connections.remove(connection);
456 
457             if (!_queue.isEmpty() && _client.isStarted())
458                 startConnection = true;
459         }
460 
461         if (startConnection)
462             startNewConnection();
463     }
464 
465     public void send(HttpExchange ex) throws IOException
466     {
467         LinkedList<String> listeners = _client.getRegisteredListeners();
468 
469         if (listeners != null)
470         {
471             // Add registered listeners, fail if we can't load them
472             for (int i = listeners.size(); i > 0; --i)
473             {
474                 String listenerClass = listeners.get(i - 1);
475 
476                 try
477                 {
478                     Class listener = Class.forName(listenerClass);
479                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
480                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
481                     ex.setEventListener(elistener);
482                 }
483                 catch (final Exception e)
484                 {
485                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass)
486                     {
487                         {initCause(e);}
488                     };
489                 }
490             }
491         }
492 
493         // Security is supported by default and should be the first consulted
494         if (_client.hasRealms())
495         {
496             ex.setEventListener(new SecurityListener(this, ex));
497         }
498 
499         doSend(ex);
500     }
501 
502     public void resend(HttpExchange ex) throws IOException
503     {
504         ex.getEventListener().onRetry();
505         ex.reset();
506         doSend(ex);
507     }
508 
509     protected void doSend(HttpExchange ex) throws IOException
510     {
511         // add cookies
512         // TODO handle max-age etc.
513         if (_cookies != null)
514         {
515             StringBuilder buf = null;
516             for (HttpCookie cookie : _cookies)
517             {
518                 if (buf == null)
519                     buf = new StringBuilder();
520                 else
521                     buf.append("; ");
522                 buf.append(cookie.getName()); // TODO quotes
523                 buf.append("=");
524                 buf.append(cookie.getValue()); // TODO quotes
525             }
526             if (buf != null)
527                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
528         }
529 
530         // Add any known authorizations
531         if (_authorizations != null)
532         {
533             Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
534             if (auth != null)
535                 (auth).setCredentials(ex);
536         }
537 
538         // Schedule the timeout here, before we queue the exchange
539         // so that we count also the queue time in the timeout
540         ex.scheduleTimeout(this);
541 
542         AbstractHttpConnection connection = getIdleConnection();
543         if (connection != null)
544         {
545             send(connection, ex);
546         }
547         else
548         {
549             boolean startConnection = false;
550             synchronized (this)
551             {
552                 if (_queue.size() == _maxQueueSize)
553                     throw new RejectedExecutionException("Queue full for address " + _address);
554 
555                 _queue.add(ex);
556                 if (_connections.size() + _pendingConnections < _maxConnections)
557                     startConnection = true;
558             }
559 
560             if (startConnection)
561                 startNewConnection();
562         }
563     }
564 
565     protected void exchangeExpired(HttpExchange exchange)
566     {
567         // The exchange may expire while waiting in the
568         // destination queue, make sure it is removed
569         synchronized (this)
570         {
571             _queue.remove(exchange);
572         }
573     }
574 
575     protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
576     {
577         synchronized (this)
578         {
579             // If server closes the connection, put the exchange back
580             // to the exchange queue and recycle the connection
581             if (!connection.send(exchange))
582             {
583                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
584                     _queue.add(0, exchange);
585                 returnIdleConnection(connection);
586             }
587         }
588     }
589 
590     @Override
591     public synchronized String toString()
592     {
593         return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
594     }
595 
596     public synchronized String toDetailString()
597     {
598         StringBuilder b = new StringBuilder();
599         b.append(toString());
600         b.append('\n');
601         synchronized (this)
602         {
603             for (AbstractHttpConnection connection : _connections)
604             {
605                 b.append(connection.toDetailString());
606                 if (_idle.contains(connection))
607                     b.append(" IDLE");
608                 b.append('\n');
609             }
610         }
611         b.append("--");
612         b.append('\n');
613 
614         return b.toString();
615     }
616 
617     public void setProxy(Address proxy)
618     {
619         _proxy = proxy;
620     }
621 
622     public Address getProxy()
623     {
624         return _proxy;
625     }
626 
627     public Authentication getProxyAuthentication()
628     {
629         return _proxyAuthentication;
630     }
631 
632     public void setProxyAuthentication(Authentication authentication)
633     {
634         _proxyAuthentication = authentication;
635     }
636 
637     public boolean isProxied()
638     {
639         return _proxy != null;
640     }
641 
642     public void close() throws IOException
643     {
644         synchronized (this)
645         {
646             for (AbstractHttpConnection connection : _connections)
647             {
648                 connection.close();
649             }
650         }
651     }
652 
653     /* ------------------------------------------------------------ */
654     /**
655      * @see org.eclipse.jetty.util.component.Dumpable#dump()
656      */
657     public String dump()
658     {
659         return AggregateLifeCycle.dump(this);
660     }
661 
662     /* ------------------------------------------------------------ */
663     /**
664      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
665      */
666     public void dump(Appendable out, String indent) throws IOException
667     {
668         synchronized (this)
669         {
670             out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
671             AggregateLifeCycle.dump(out,indent,_connections);
672         }
673     }
674 
675     private class ConnectExchange extends ContentExchange
676     {
677         private final SelectConnector.UpgradableEndPoint proxyEndPoint;
678         private final HttpExchange exchange;
679 
680         public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange)
681         {
682             this.proxyEndPoint = proxyEndPoint;
683             this.exchange = exchange;
684             setMethod(HttpMethods.CONNECT);
685             setVersion(exchange.getVersion());
686             String serverHostAndPort = serverAddress.toString();
687             setRequestURI(serverHostAndPort);
688             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
689             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
690             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
691         }
692 
693         @Override
694         protected void onResponseComplete() throws IOException
695         {
696             int responseStatus = getResponseStatus();
697             if (responseStatus == HttpStatus.OK_200)
698             {
699                 proxyEndPoint.upgrade();
700             }
701             else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
702             {
703                 onExpire();
704             }
705             else
706             {
707                 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString()));
708             }
709         }
710 
711         @Override
712         protected void onConnectionFailed(Throwable x)
713         {
714             HttpDestination.this.onConnectionFailed(x);
715         }
716 
717         @Override
718         protected void onException(Throwable x)
719         {
720             _queue.remove(exchange);
721             if (exchange.setStatus(STATUS_EXCEPTED))
722                 exchange.getEventListener().onException(x);
723         }
724 
725         @Override
726         protected void onExpire()
727         {
728             _queue.remove(exchange);
729             if (exchange.setStatus(STATUS_EXPIRED))
730                 exchange.getEventListener().onExpire();
731         }
732 
733     }
734 }