1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.lang.reflect.Constructor;
23 import java.net.ProtocolException;
24 import java.util.ArrayList;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.RejectedExecutionException;
30
31 import org.eclipse.jetty.client.HttpClient.Connector;
32 import org.eclipse.jetty.client.security.Authentication;
33 import org.eclipse.jetty.client.security.SecurityListener;
34 import org.eclipse.jetty.http.HttpCookie;
35 import org.eclipse.jetty.http.HttpHeaders;
36 import org.eclipse.jetty.http.HttpMethods;
37 import org.eclipse.jetty.http.HttpStatus;
38 import org.eclipse.jetty.http.PathMap;
39 import org.eclipse.jetty.io.Buffer;
40 import org.eclipse.jetty.io.ByteArrayBuffer;
41 import org.eclipse.jetty.io.Connection;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.util.component.AggregateLifeCycle;
44 import org.eclipse.jetty.util.component.Dumpable;
45 import org.eclipse.jetty.util.log.Log;
46 import org.eclipse.jetty.util.log.Logger;
47
48
49
50
51 public class HttpDestination implements Dumpable
52 {
53 private static final Logger LOG = Log.getLogger(HttpDestination.class);
54
55 private final List<HttpExchange> _exchanges = new LinkedList<HttpExchange>();
56 private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
57 private final BlockingQueue<Object> _reservedConnections = new ArrayBlockingQueue<Object>(10, true);
58 private final List<AbstractHttpConnection> _idleConnections = new ArrayList<AbstractHttpConnection>();
59 private final HttpClient _client;
60 private final Address _address;
61 private final boolean _ssl;
62 private final ByteArrayBuffer _hostHeader;
63 private volatile int _maxConnections;
64 private volatile int _maxQueueSize;
65 private int _pendingConnections = 0;
66 private int _pendingReservedConnections = 0;
67 private volatile Address _proxy;
68 private Authentication _proxyAuthentication;
69 private PathMap _authorizations;
70 private List<HttpCookie> _cookies;
71
72 HttpDestination(HttpClient client, Address address, boolean ssl)
73 {
74 _client = client;
75 _address = address;
76 _ssl = ssl;
77 _maxConnections = _client.getMaxConnectionsPerAddress();
78 _maxQueueSize = _client.getMaxQueueSizePerAddress();
79 String addressString = address.getHost();
80 if (address.getPort() != (_ssl ? 443 : 80))
81 addressString += ":" + address.getPort();
82 _hostHeader = new ByteArrayBuffer(addressString);
83 }
84
85 public HttpClient getHttpClient()
86 {
87 return _client;
88 }
89
90 public Address getAddress()
91 {
92 return _address;
93 }
94
95 public boolean isSecure()
96 {
97 return _ssl;
98 }
99
100 public Buffer getHostHeader()
101 {
102 return _hostHeader;
103 }
104
105 public int getMaxConnections()
106 {
107 return _maxConnections;
108 }
109
110 public void setMaxConnections(int maxConnections)
111 {
112 this._maxConnections = maxConnections;
113 }
114
115 public int getMaxQueueSize()
116 {
117 return _maxQueueSize;
118 }
119
120 public void setMaxQueueSize(int maxQueueSize)
121 {
122 this._maxQueueSize = maxQueueSize;
123 }
124
125 public int getConnections()
126 {
127 synchronized (this)
128 {
129 return _connections.size();
130 }
131 }
132
133 public int getIdleConnections()
134 {
135 synchronized (this)
136 {
137 return _idleConnections.size();
138 }
139 }
140
141 public void addAuthorization(String pathSpec, Authentication authorization)
142 {
143 synchronized (this)
144 {
145 if (_authorizations == null)
146 _authorizations = new PathMap();
147 _authorizations.put(pathSpec, authorization);
148 }
149
150
151 }
152
153 public void addCookie(HttpCookie cookie)
154 {
155 synchronized (this)
156 {
157 if (_cookies == null)
158 _cookies = new ArrayList<HttpCookie>();
159 _cookies.add(cookie);
160 }
161
162
163 }
164
165
166
167
168
169
170
171
172
173
174 private AbstractHttpConnection getConnection(long timeout) throws IOException
175 {
176 AbstractHttpConnection connection = null;
177
178 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
179 {
180 boolean startConnection = false;
181 synchronized (this)
182 {
183 int totalConnections = _connections.size() + _pendingConnections;
184 if (totalConnections < _maxConnections)
185 {
186 _pendingReservedConnections++;
187 startConnection = true;
188 }
189 }
190
191 if (startConnection)
192 {
193 startNewConnection();
194 try
195 {
196 Object o = _reservedConnections.take();
197 if (o instanceof AbstractHttpConnection)
198 {
199 connection = (AbstractHttpConnection)o;
200 }
201 else
202 throw (IOException)o;
203 }
204 catch (InterruptedException e)
205 {
206 LOG.ignore(e);
207 }
208 }
209 else
210 {
211 try
212 {
213 Thread.currentThread();
214 Thread.sleep(200);
215 timeout -= 200;
216 }
217 catch (InterruptedException e)
218 {
219 LOG.ignore(e);
220 }
221 }
222 }
223 return connection;
224 }
225
226 public AbstractHttpConnection reserveConnection(long timeout) throws IOException
227 {
228 AbstractHttpConnection connection = getConnection(timeout);
229 if (connection != null)
230 connection.setReserved(true);
231 return connection;
232 }
233
234 public AbstractHttpConnection getIdleConnection() throws IOException
235 {
236 AbstractHttpConnection connection = null;
237 while (true)
238 {
239 synchronized (this)
240 {
241 if (connection != null)
242 {
243 _connections.remove(connection);
244 connection.close();
245 connection = null;
246 }
247 if (_idleConnections.size() > 0)
248 connection = _idleConnections.remove(_idleConnections.size() - 1);
249 }
250
251 if (connection == null)
252 {
253 return null;
254 }
255
256
257
258 if (connection.cancelIdleTimeout())
259 {
260 return connection;
261 }
262 }
263 }
264
265 protected void startNewConnection()
266 {
267 try
268 {
269 synchronized (this)
270 {
271 _pendingConnections++;
272 }
273 final Connector connector = _client._connector;
274 if (connector != null)
275 connector.startConnection(this);
276 }
277 catch (Exception e)
278 {
279 LOG.debug(e);
280 onConnectionFailed(e);
281 }
282 }
283
284 public void onConnectionFailed(Throwable throwable)
285 {
286 Throwable connect_failure = null;
287
288 boolean startConnection = false;
289 synchronized (this)
290 {
291 _pendingConnections--;
292 if (_pendingReservedConnections > 0)
293 {
294 connect_failure = throwable;
295 _pendingReservedConnections--;
296 }
297 else if (_exchanges.size() > 0)
298 {
299 HttpExchange ex = _exchanges.remove(0);
300 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
301 ex.getEventListener().onConnectionFailed(throwable);
302
303
304
305 if (!_exchanges.isEmpty() && _client.isStarted())
306 startConnection = true;
307 }
308 }
309
310 if (startConnection)
311 startNewConnection();
312
313 if (connect_failure != null)
314 {
315 try
316 {
317 _reservedConnections.put(connect_failure);
318 }
319 catch (InterruptedException e)
320 {
321 LOG.ignore(e);
322 }
323 }
324 }
325
326 public void onException(Throwable throwable)
327 {
328 synchronized (this)
329 {
330 _pendingConnections--;
331 if (_exchanges.size() > 0)
332 {
333 HttpExchange ex = _exchanges.remove(0);
334 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
335 ex.getEventListener().onException(throwable);
336 }
337 }
338 }
339
340 public void onNewConnection(final AbstractHttpConnection connection) throws IOException
341 {
342 Connection reservedConnection = null;
343
344 synchronized (this)
345 {
346 _pendingConnections--;
347 _connections.add(connection);
348
349 if (_pendingReservedConnections > 0)
350 {
351 reservedConnection = connection;
352 _pendingReservedConnections--;
353 }
354 else
355 {
356
357 EndPoint endPoint = connection.getEndPoint();
358 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
359 {
360 SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
361 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint);
362 connect.setAddress(getProxy());
363 LOG.debug("Establishing tunnel to {} via {}", getAddress(), getProxy());
364 send(connection, connect);
365 }
366 else
367 {
368
369 if (_exchanges.size() == 0)
370 {
371 LOG.debug("No exchanges for new connection {}", connection);
372 connection.setIdleTimeout();
373 _idleConnections.add(connection);
374 }
375 else
376 {
377 HttpExchange exchange = _exchanges.remove(0);
378 send(connection, exchange);
379 }
380 }
381 }
382 }
383
384 if (reservedConnection != null)
385 {
386 try
387 {
388 _reservedConnections.put(reservedConnection);
389 }
390 catch (InterruptedException e)
391 {
392 LOG.ignore(e);
393 }
394 }
395 }
396
397 public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
398 {
399 if (connection.isReserved())
400 connection.setReserved(false);
401
402 if (close)
403 {
404 try
405 {
406 connection.close();
407 }
408 catch (IOException e)
409 {
410 LOG.ignore(e);
411 }
412 }
413
414 if (!_client.isStarted())
415 return;
416
417 if (!close && connection.getEndPoint().isOpen())
418 {
419 synchronized (this)
420 {
421 if (_exchanges.size() == 0)
422 {
423 connection.setIdleTimeout();
424 _idleConnections.add(connection);
425 }
426 else
427 {
428 HttpExchange ex = _exchanges.remove(0);
429 send(connection, ex);
430 }
431 this.notifyAll();
432 }
433 }
434 else
435 {
436 boolean startConnection = false;
437 synchronized (this)
438 {
439 _connections.remove(connection);
440 if (!_exchanges.isEmpty())
441 startConnection = true;
442 }
443
444 if (startConnection)
445 startNewConnection();
446 }
447 }
448
449 public void returnIdleConnection(AbstractHttpConnection connection)
450 {
451
452 long idleForMs = connection.getEndPoint() != null ? connection.getEndPoint().getMaxIdleTime() : -1;
453 connection.onIdleExpired(idleForMs);
454
455 boolean startConnection = false;
456 synchronized (this)
457 {
458 _idleConnections.remove(connection);
459 _connections.remove(connection);
460
461 if (!_exchanges.isEmpty() && _client.isStarted())
462 startConnection = true;
463 }
464
465 if (startConnection)
466 startNewConnection();
467 }
468
469 public void send(HttpExchange ex) throws IOException
470 {
471 LinkedList<String> listeners = _client.getRegisteredListeners();
472
473 if (listeners != null)
474 {
475
476 for (int i = listeners.size(); i > 0; --i)
477 {
478 String listenerClass = listeners.get(i - 1);
479 try
480 {
481 Class<?> listener = Class.forName(listenerClass);
482 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
483 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
484 ex.setEventListener(elistener);
485 }
486 catch (final Exception e)
487 {
488 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass)
489 {
490 {
491 initCause(e);
492 }
493 };
494 }
495 }
496 }
497
498
499 if (_client.hasRealms())
500 {
501 ex.setEventListener(new SecurityListener(this, ex));
502 }
503
504 doSend(ex);
505 }
506
507 public void resend(HttpExchange ex) throws IOException
508 {
509 ex.getEventListener().onRetry();
510 ex.reset();
511 doSend(ex);
512 }
513
514 protected void doSend(HttpExchange ex) throws IOException
515 {
516
517
518 if (_cookies != null)
519 {
520 StringBuilder buf = null;
521 for (HttpCookie cookie : _cookies)
522 {
523 if (buf == null)
524 buf = new StringBuilder();
525 else
526 buf.append("; ");
527 buf.append(cookie.getName());
528 buf.append("=");
529 buf.append(cookie.getValue());
530 }
531 if (buf != null)
532 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
533 }
534
535
536 if (_authorizations != null)
537 {
538 Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
539 if (auth != null)
540 (auth).setCredentials(ex);
541 }
542
543
544
545 ex.scheduleTimeout(this);
546
547 AbstractHttpConnection connection = getIdleConnection();
548 if (connection != null)
549 {
550 send(connection, ex);
551 }
552 else
553 {
554 boolean startConnection = false;
555 synchronized (this)
556 {
557 if (_exchanges.size() == _maxQueueSize)
558 throw new RejectedExecutionException("Queue full for address " + _address);
559
560 _exchanges.add(ex);
561 if (_connections.size() + _pendingConnections < _maxConnections)
562 startConnection = true;
563 }
564
565 if (startConnection)
566 startNewConnection();
567 }
568 }
569
570 protected void exchangeExpired(HttpExchange exchange)
571 {
572
573
574 synchronized (this)
575 {
576 _exchanges.remove(exchange);
577 }
578 }
579
580 protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
581 {
582 synchronized (this)
583 {
584
585
586 if (!connection.send(exchange))
587 {
588 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
589 _exchanges.add(0, exchange);
590 returnIdleConnection(connection);
591 }
592 }
593 }
594
595 @Override
596 public synchronized String toString()
597 {
598 return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n", hashCode(), _address.getHost(), _address.getPort(), _connections.size(), _maxConnections, _idleConnections.size(), _exchanges.size(), _maxQueueSize);
599 }
600
601 public synchronized String toDetailString()
602 {
603 StringBuilder b = new StringBuilder();
604 b.append(toString());
605 b.append('\n');
606 synchronized (this)
607 {
608 for (AbstractHttpConnection connection : _connections)
609 {
610 b.append(connection.toDetailString());
611 if (_idleConnections.contains(connection))
612 b.append(" IDLE");
613 b.append('\n');
614 }
615 }
616 b.append("--");
617 b.append('\n');
618
619 return b.toString();
620 }
621
622 public void setProxy(Address proxy)
623 {
624 _proxy = proxy;
625 }
626
627 public Address getProxy()
628 {
629 return _proxy;
630 }
631
632 public Authentication getProxyAuthentication()
633 {
634 return _proxyAuthentication;
635 }
636
637 public void setProxyAuthentication(Authentication authentication)
638 {
639 _proxyAuthentication = authentication;
640 }
641
642 public boolean isProxied()
643 {
644 return _proxy != null;
645 }
646
647 public void close() throws IOException
648 {
649 synchronized (this)
650 {
651 for (AbstractHttpConnection connection : _connections)
652 {
653 connection.close();
654 }
655 }
656 }
657
658 public String dump()
659 {
660 return AggregateLifeCycle.dump(this);
661 }
662
663 public void dump(Appendable out, String indent) throws IOException
664 {
665 synchronized (this)
666 {
667 out.append(String.valueOf(this));
668 out.append("idle=");
669 out.append(String.valueOf(_idleConnections.size()));
670 out.append(" pending=");
671 out.append(String.valueOf(_pendingConnections));
672 out.append("\n");
673 AggregateLifeCycle.dump(out, indent, _connections);
674 }
675 }
676
677 private class ConnectExchange extends ContentExchange
678 {
679 private final SelectConnector.UpgradableEndPoint proxyEndPoint;
680
681 public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint)
682 {
683 this.proxyEndPoint = proxyEndPoint;
684 setMethod(HttpMethods.CONNECT);
685 String serverHostAndPort = serverAddress.toString();
686 setRequestURI(serverHostAndPort);
687 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
688 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
689 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
690 }
691
692 @Override
693 protected void onResponseComplete() throws IOException
694 {
695 int responseStatus = getResponseStatus();
696 if (responseStatus == HttpStatus.OK_200)
697 {
698 proxyEndPoint.upgrade();
699 }
700 else if (responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
701 {
702 onExpire();
703 }
704 else
705 {
706 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() + ":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus));
707 }
708 }
709
710 @Override
711 protected void onConnectionFailed(Throwable x)
712 {
713 HttpDestination.this.onConnectionFailed(x);
714 }
715
716 @Override
717 protected void onException(Throwable x)
718 {
719 HttpExchange exchange = null;
720 synchronized (HttpDestination.this)
721 {
722 if (!_exchanges.isEmpty())
723 exchange = _exchanges.remove(0);
724 }
725 if (exchange != null && exchange.setStatus(STATUS_EXCEPTED))
726 exchange.getEventListener().onException(x);
727 }
728
729 @Override
730 protected void onExpire()
731 {
732 HttpExchange exchange = null;
733 synchronized (HttpDestination.this)
734 {
735 if (!_exchanges.isEmpty())
736 exchange = _exchanges.remove(0);
737 }
738 if (exchange != null && exchange.setStatus(STATUS_EXPIRED))
739 exchange.getEventListener().onExpire();
740 }
741 }
742 }