1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.lang.reflect.Constructor;
23 import java.net.ProtocolException;
24 import java.util.ArrayList;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.RejectedExecutionException;
30
31 import org.eclipse.jetty.client.HttpClient.Connector;
32 import org.eclipse.jetty.client.security.Authentication;
33 import org.eclipse.jetty.client.security.SecurityListener;
34 import org.eclipse.jetty.http.HttpCookie;
35 import org.eclipse.jetty.http.HttpHeaders;
36 import org.eclipse.jetty.http.HttpMethods;
37 import org.eclipse.jetty.http.HttpStatus;
38 import org.eclipse.jetty.http.PathMap;
39 import org.eclipse.jetty.io.Buffer;
40 import org.eclipse.jetty.io.ByteArrayBuffer;
41 import org.eclipse.jetty.io.Connection;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.util.component.AggregateLifeCycle;
44 import org.eclipse.jetty.util.component.Dumpable;
45 import org.eclipse.jetty.util.log.Log;
46 import org.eclipse.jetty.util.log.Logger;
47
48
49
50
51 public class HttpDestination implements Dumpable
52 {
53 private static final Logger LOG = Log.getLogger(HttpDestination.class);
54
55 private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
56 private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
57 private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
58 private final List<AbstractHttpConnection> _idle = new ArrayList<AbstractHttpConnection>();
59 private final HttpClient _client;
60 private final Address _address;
61 private final boolean _ssl;
62 private final ByteArrayBuffer _hostHeader;
63 private volatile int _maxConnections;
64 private volatile int _maxQueueSize;
65 private int _pendingConnections = 0;
66 private int _newConnection = 0;
67 private volatile Address _proxy;
68 private Authentication _proxyAuthentication;
69 private PathMap _authorizations;
70 private List<HttpCookie> _cookies;
71
72
73
74 HttpDestination(HttpClient client, Address address, boolean ssl)
75 {
76 _client = client;
77 _address = address;
78 _ssl = ssl;
79 _maxConnections = _client.getMaxConnectionsPerAddress();
80 _maxQueueSize = _client.getMaxQueueSizePerAddress();
81 String addressString = address.getHost();
82 if (address.getPort() != (_ssl ? 443 : 80))
83 addressString += ":" + address.getPort();
84 _hostHeader = new ByteArrayBuffer(addressString);
85 }
86
87 public HttpClient getHttpClient()
88 {
89 return _client;
90 }
91
92 public Address getAddress()
93 {
94 return _address;
95 }
96
97 public boolean isSecure()
98 {
99 return _ssl;
100 }
101
102 public Buffer getHostHeader()
103 {
104 return _hostHeader;
105 }
106
107 public int getMaxConnections()
108 {
109 return _maxConnections;
110 }
111
112 public void setMaxConnections(int maxConnections)
113 {
114 this._maxConnections = maxConnections;
115 }
116
117 public int getMaxQueueSize()
118 {
119 return _maxQueueSize;
120 }
121
122 public void setMaxQueueSize(int maxQueueSize)
123 {
124 this._maxQueueSize = maxQueueSize;
125 }
126
127 public int getConnections()
128 {
129 synchronized (this)
130 {
131 return _connections.size();
132 }
133 }
134
135 public int getIdleConnections()
136 {
137 synchronized (this)
138 {
139 return _idle.size();
140 }
141 }
142
143 public void addAuthorization(String pathSpec, Authentication authorization)
144 {
145 synchronized (this)
146 {
147 if (_authorizations == null)
148 _authorizations = new PathMap();
149 _authorizations.put(pathSpec, authorization);
150 }
151
152
153 }
154
155 public void addCookie(HttpCookie cookie)
156 {
157 synchronized (this)
158 {
159 if (_cookies == null)
160 _cookies = new ArrayList<HttpCookie>();
161 _cookies.add(cookie);
162 }
163
164
165 }
166
167
168
169
170
171
172
173
174
175
176 private AbstractHttpConnection getConnection(long timeout) throws IOException
177 {
178 AbstractHttpConnection connection = null;
179
180 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
181 {
182 boolean startConnection = false;
183 synchronized (this)
184 {
185 int totalConnections = _connections.size() + _pendingConnections;
186 if (totalConnections < _maxConnections)
187 {
188 _newConnection++;
189 startConnection = true;
190 }
191 }
192
193 if (startConnection)
194 {
195 startNewConnection();
196 try
197 {
198 Object o = _newQueue.take();
199 if (o instanceof AbstractHttpConnection)
200 {
201 connection = (AbstractHttpConnection)o;
202 }
203 else
204 throw (IOException)o;
205 }
206 catch (InterruptedException e)
207 {
208 LOG.ignore(e);
209 }
210 }
211 else
212 {
213 try
214 {
215 Thread.currentThread();
216 Thread.sleep(200);
217 timeout -= 200;
218 }
219 catch (InterruptedException e)
220 {
221 LOG.ignore(e);
222 }
223 }
224 }
225 return connection;
226 }
227
228 public AbstractHttpConnection reserveConnection(long timeout) throws IOException
229 {
230 AbstractHttpConnection connection = getConnection(timeout);
231 if (connection != null)
232 connection.setReserved(true);
233 return connection;
234 }
235
236 public AbstractHttpConnection getIdleConnection() throws IOException
237 {
238 AbstractHttpConnection connection = null;
239 while (true)
240 {
241 synchronized (this)
242 {
243 if (connection != null)
244 {
245 _connections.remove(connection);
246 connection.close();
247 connection = null;
248 }
249 if (_idle.size() > 0)
250 connection = _idle.remove(_idle.size() - 1);
251 }
252
253 if (connection == null)
254 {
255 return null;
256 }
257
258
259
260 if (connection.cancelIdleTimeout())
261 {
262 return connection;
263 }
264 }
265 }
266
267 protected void startNewConnection()
268 {
269 try
270 {
271 synchronized (this)
272 {
273 _pendingConnections++;
274 }
275 final Connector connector = _client._connector;
276 if (connector != null)
277 connector.startConnection(this);
278 }
279 catch (Exception e)
280 {
281 LOG.debug(e);
282 onConnectionFailed(e);
283 }
284 }
285
286 public void onConnectionFailed(Throwable throwable)
287 {
288 Throwable connect_failure = null;
289
290 boolean startConnection = false;
291 synchronized (this)
292 {
293 _pendingConnections--;
294 if (_newConnection > 0)
295 {
296 connect_failure = throwable;
297 _newConnection--;
298 }
299 else if (_queue.size() > 0)
300 {
301 HttpExchange ex = _queue.remove(0);
302 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
303 ex.getEventListener().onConnectionFailed(throwable);
304
305
306
307 if (!_queue.isEmpty() && _client.isStarted())
308 startConnection = true;
309 }
310 }
311
312 if (startConnection)
313 startNewConnection();
314
315 if (connect_failure != null)
316 {
317 try
318 {
319 _newQueue.put(connect_failure);
320 }
321 catch (InterruptedException e)
322 {
323 LOG.ignore(e);
324 }
325 }
326 }
327
328 public void onException(Throwable throwable)
329 {
330 synchronized (this)
331 {
332 _pendingConnections--;
333 if (_queue.size() > 0)
334 {
335 HttpExchange ex = _queue.remove(0);
336 if(ex.setStatus(HttpExchange.STATUS_EXCEPTED))
337 ex.getEventListener().onException(throwable);
338 }
339 }
340 }
341
342 public void onNewConnection(final AbstractHttpConnection connection) throws IOException
343 {
344 Connection q_connection = null;
345
346 synchronized (this)
347 {
348 _pendingConnections--;
349 _connections.add(connection);
350
351 if (_newConnection > 0)
352 {
353 q_connection = connection;
354 _newConnection--;
355 }
356 else if (_queue.size() == 0)
357 {
358 connection.setIdleTimeout();
359 _idle.add(connection);
360 }
361 else
362 {
363 EndPoint endPoint = connection.getEndPoint();
364 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
365 {
366 SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
367 HttpExchange exchange = _queue.get(0);
368 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
369 connect.setAddress(getProxy());
370 send(connection, connect);
371 }
372 else
373 {
374 HttpExchange exchange = _queue.remove(0);
375 send(connection, exchange);
376 }
377 }
378 }
379
380 if (q_connection != null)
381 {
382 try
383 {
384 _newQueue.put(q_connection);
385 }
386 catch (InterruptedException e)
387 {
388 LOG.ignore(e);
389 }
390 }
391 }
392
393 public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
394 {
395 if (connection.isReserved())
396 connection.setReserved(false);
397
398 if (close)
399 {
400 try
401 {
402 connection.close();
403 }
404 catch (IOException e)
405 {
406 LOG.ignore(e);
407 }
408 }
409
410 if (!_client.isStarted())
411 return;
412
413 if (!close && connection.getEndPoint().isOpen())
414 {
415 synchronized (this)
416 {
417 if (_queue.size() == 0)
418 {
419 connection.setIdleTimeout();
420 _idle.add(connection);
421 }
422 else
423 {
424 HttpExchange ex = _queue.remove(0);
425 send(connection, ex);
426 }
427 this.notifyAll();
428 }
429 }
430 else
431 {
432 boolean startConnection = false;
433 synchronized (this)
434 {
435 _connections.remove(connection);
436 if (!_queue.isEmpty())
437 startConnection = true;
438 }
439
440 if (startConnection)
441 startNewConnection();
442 }
443 }
444
445 public void returnIdleConnection(AbstractHttpConnection connection)
446 {
447
448 long idleForMs=connection!=null&&connection.getEndPoint()!=null?connection.getEndPoint().getMaxIdleTime():-1;
449 connection.onIdleExpired(idleForMs);
450
451 boolean startConnection = false;
452 synchronized (this)
453 {
454 _idle.remove(connection);
455 _connections.remove(connection);
456
457 if (!_queue.isEmpty() && _client.isStarted())
458 startConnection = true;
459 }
460
461 if (startConnection)
462 startNewConnection();
463 }
464
465 public void send(HttpExchange ex) throws IOException
466 {
467 LinkedList<String> listeners = _client.getRegisteredListeners();
468
469 if (listeners != null)
470 {
471
472 for (int i = listeners.size(); i > 0; --i)
473 {
474 String listenerClass = listeners.get(i - 1);
475
476 try
477 {
478 Class listener = Class.forName(listenerClass);
479 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
480 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
481 ex.setEventListener(elistener);
482 }
483 catch (final Exception e)
484 {
485 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass)
486 {
487 {initCause(e);}
488 };
489 }
490 }
491 }
492
493
494 if (_client.hasRealms())
495 {
496 ex.setEventListener(new SecurityListener(this, ex));
497 }
498
499 doSend(ex);
500 }
501
502 public void resend(HttpExchange ex) throws IOException
503 {
504 ex.getEventListener().onRetry();
505 ex.reset();
506 doSend(ex);
507 }
508
509 protected void doSend(HttpExchange ex) throws IOException
510 {
511
512
513 if (_cookies != null)
514 {
515 StringBuilder buf = null;
516 for (HttpCookie cookie : _cookies)
517 {
518 if (buf == null)
519 buf = new StringBuilder();
520 else
521 buf.append("; ");
522 buf.append(cookie.getName());
523 buf.append("=");
524 buf.append(cookie.getValue());
525 }
526 if (buf != null)
527 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
528 }
529
530
531 if (_authorizations != null)
532 {
533 Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
534 if (auth != null)
535 (auth).setCredentials(ex);
536 }
537
538
539
540 ex.scheduleTimeout(this);
541
542 AbstractHttpConnection connection = getIdleConnection();
543 if (connection != null)
544 {
545 send(connection, ex);
546 }
547 else
548 {
549 boolean startConnection = false;
550 synchronized (this)
551 {
552 if (_queue.size() == _maxQueueSize)
553 throw new RejectedExecutionException("Queue full for address " + _address);
554
555 _queue.add(ex);
556 if (_connections.size() + _pendingConnections < _maxConnections)
557 startConnection = true;
558 }
559
560 if (startConnection)
561 startNewConnection();
562 }
563 }
564
565 protected void exchangeExpired(HttpExchange exchange)
566 {
567
568
569 synchronized (this)
570 {
571 _queue.remove(exchange);
572 }
573 }
574
575 protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
576 {
577 synchronized (this)
578 {
579
580
581 if (!connection.send(exchange))
582 {
583 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
584 _queue.add(0, exchange);
585 returnIdleConnection(connection);
586 }
587 }
588 }
589
590 @Override
591 public synchronized String toString()
592 {
593 return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
594 }
595
596 public synchronized String toDetailString()
597 {
598 StringBuilder b = new StringBuilder();
599 b.append(toString());
600 b.append('\n');
601 synchronized (this)
602 {
603 for (AbstractHttpConnection connection : _connections)
604 {
605 b.append(connection.toDetailString());
606 if (_idle.contains(connection))
607 b.append(" IDLE");
608 b.append('\n');
609 }
610 }
611 b.append("--");
612 b.append('\n');
613
614 return b.toString();
615 }
616
617 public void setProxy(Address proxy)
618 {
619 _proxy = proxy;
620 }
621
622 public Address getProxy()
623 {
624 return _proxy;
625 }
626
627 public Authentication getProxyAuthentication()
628 {
629 return _proxyAuthentication;
630 }
631
632 public void setProxyAuthentication(Authentication authentication)
633 {
634 _proxyAuthentication = authentication;
635 }
636
637 public boolean isProxied()
638 {
639 return _proxy != null;
640 }
641
642 public void close() throws IOException
643 {
644 synchronized (this)
645 {
646 for (AbstractHttpConnection connection : _connections)
647 {
648 connection.close();
649 }
650 }
651 }
652
653
654
655
656
657 public String dump()
658 {
659 return AggregateLifeCycle.dump(this);
660 }
661
662
663
664
665
666 public void dump(Appendable out, String indent) throws IOException
667 {
668 synchronized (this)
669 {
670 out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
671 AggregateLifeCycle.dump(out,indent,_connections);
672 }
673 }
674
675 private class ConnectExchange extends ContentExchange
676 {
677 private final SelectConnector.UpgradableEndPoint proxyEndPoint;
678 private final HttpExchange exchange;
679
680 public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange)
681 {
682 this.proxyEndPoint = proxyEndPoint;
683 this.exchange = exchange;
684 setMethod(HttpMethods.CONNECT);
685 setVersion(exchange.getVersion());
686 String serverHostAndPort = serverAddress.toString();
687 setRequestURI(serverHostAndPort);
688 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
689 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
690 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
691 }
692
693 @Override
694 protected void onResponseComplete() throws IOException
695 {
696 int responseStatus = getResponseStatus();
697 if (responseStatus == HttpStatus.OK_200)
698 {
699 proxyEndPoint.upgrade();
700 }
701 else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
702 {
703 onExpire();
704 }
705 else
706 {
707 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString()));
708 }
709 }
710
711 @Override
712 protected void onConnectionFailed(Throwable x)
713 {
714 HttpDestination.this.onConnectionFailed(x);
715 }
716
717 @Override
718 protected void onException(Throwable x)
719 {
720 _queue.remove(exchange);
721 if (exchange.setStatus(STATUS_EXCEPTED))
722 exchange.getEventListener().onException(x);
723 }
724
725 @Override
726 protected void onExpire()
727 {
728 _queue.remove(exchange);
729 if (exchange.setStatus(STATUS_EXPIRED))
730 exchange.getEventListener().onExpire();
731 }
732
733 }
734 }