1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.net.ProtocolException;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.client.HttpClient.Connector;
27 import org.eclipse.jetty.client.security.Authentication;
28 import org.eclipse.jetty.client.security.SecurityListener;
29 import org.eclipse.jetty.http.HttpCookie;
30 import org.eclipse.jetty.http.HttpHeaders;
31 import org.eclipse.jetty.http.HttpMethods;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.PathMap;
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.ByteArrayBuffer;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.util.component.AggregateLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44
45
46 public class HttpDestination implements Dumpable
47 {
48 private static final Logger LOG = Log.getLogger(HttpDestination.class);
49
50 private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
51 private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
52 private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
53 private final List<AbstractHttpConnection> _idle = new ArrayList<AbstractHttpConnection>();
54 private final HttpClient _client;
55 private final Address _address;
56 private final boolean _ssl;
57 private final ByteArrayBuffer _hostHeader;
58 private volatile int _maxConnections;
59 private volatile int _maxQueueSize;
60 private int _pendingConnections = 0;
61 private int _newConnection = 0;
62 private volatile Address _proxy;
63 private Authentication _proxyAuthentication;
64 private PathMap _authorizations;
65 private List<HttpCookie> _cookies;
66
67
68
69 HttpDestination(HttpClient client, Address address, boolean ssl)
70 {
71 _client = client;
72 _address = address;
73 _ssl = ssl;
74 _maxConnections = _client.getMaxConnectionsPerAddress();
75 _maxQueueSize = _client.getMaxQueueSizePerAddress();
76 String addressString = address.getHost();
77 if (address.getPort() != (_ssl ? 443 : 80))
78 addressString += ":" + address.getPort();
79 _hostHeader = new ByteArrayBuffer(addressString);
80 }
81
82 public HttpClient getHttpClient()
83 {
84 return _client;
85 }
86
87 public Address getAddress()
88 {
89 return _address;
90 }
91
92 public boolean isSecure()
93 {
94 return _ssl;
95 }
96
97 public Buffer getHostHeader()
98 {
99 return _hostHeader;
100 }
101
102 public int getMaxConnections()
103 {
104 return _maxConnections;
105 }
106
107 public void setMaxConnections(int maxConnections)
108 {
109 this._maxConnections = maxConnections;
110 }
111
112 public int getMaxQueueSize()
113 {
114 return _maxQueueSize;
115 }
116
117 public void setMaxQueueSize(int maxQueueSize)
118 {
119 this._maxQueueSize = maxQueueSize;
120 }
121
122 public int getConnections()
123 {
124 synchronized (this)
125 {
126 return _connections.size();
127 }
128 }
129
130 public int getIdleConnections()
131 {
132 synchronized (this)
133 {
134 return _idle.size();
135 }
136 }
137
138 public void addAuthorization(String pathSpec, Authentication authorization)
139 {
140 synchronized (this)
141 {
142 if (_authorizations == null)
143 _authorizations = new PathMap();
144 _authorizations.put(pathSpec, authorization);
145 }
146
147
148 }
149
150 public void addCookie(HttpCookie cookie)
151 {
152 synchronized (this)
153 {
154 if (_cookies == null)
155 _cookies = new ArrayList<HttpCookie>();
156 _cookies.add(cookie);
157 }
158
159
160 }
161
162
163
164
165
166
167
168
169
170
171 private AbstractHttpConnection getConnection(long timeout) throws IOException
172 {
173 AbstractHttpConnection connection = null;
174
175 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
176 {
177 boolean startConnection = false;
178 synchronized (this)
179 {
180 int totalConnections = _connections.size() + _pendingConnections;
181 if (totalConnections < _maxConnections)
182 {
183 _newConnection++;
184 startConnection = true;
185 }
186 }
187
188 if (startConnection)
189 {
190 startNewConnection();
191 try
192 {
193 Object o = _newQueue.take();
194 if (o instanceof AbstractHttpConnection)
195 {
196 connection = (AbstractHttpConnection)o;
197 }
198 else
199 throw (IOException)o;
200 }
201 catch (InterruptedException e)
202 {
203 LOG.ignore(e);
204 }
205 }
206 else
207 {
208 try
209 {
210 Thread.currentThread();
211 Thread.sleep(200);
212 timeout -= 200;
213 }
214 catch (InterruptedException e)
215 {
216 LOG.ignore(e);
217 }
218 }
219 }
220 return connection;
221 }
222
223 public AbstractHttpConnection reserveConnection(long timeout) throws IOException
224 {
225 AbstractHttpConnection connection = getConnection(timeout);
226 if (connection != null)
227 connection.setReserved(true);
228 return connection;
229 }
230
231 public AbstractHttpConnection getIdleConnection() throws IOException
232 {
233 AbstractHttpConnection connection = null;
234 while (true)
235 {
236 synchronized (this)
237 {
238 if (connection != null)
239 {
240 _connections.remove(connection);
241 connection.close();
242 connection = null;
243 }
244 if (_idle.size() > 0)
245 connection = _idle.remove(_idle.size() - 1);
246 }
247
248 if (connection == null)
249 {
250 return null;
251 }
252
253
254
255 if (connection.cancelIdleTimeout())
256 {
257 return connection;
258 }
259 }
260 }
261
262 protected void startNewConnection()
263 {
264 try
265 {
266 synchronized (this)
267 {
268 _pendingConnections++;
269 }
270 final Connector connector = _client._connector;
271 if (connector != null)
272 connector.startConnection(this);
273 }
274 catch (Exception e)
275 {
276 LOG.debug(e);
277 onConnectionFailed(e);
278 }
279 }
280
281 public void onConnectionFailed(Throwable throwable)
282 {
283 Throwable connect_failure = null;
284
285 boolean startConnection = false;
286 synchronized (this)
287 {
288 _pendingConnections--;
289 if (_newConnection > 0)
290 {
291 connect_failure = throwable;
292 _newConnection--;
293 }
294 else if (_queue.size() > 0)
295 {
296 HttpExchange ex = _queue.remove(0);
297 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
298 ex.getEventListener().onConnectionFailed(throwable);
299
300
301
302 if (!_queue.isEmpty() && _client.isStarted())
303 startConnection = true;
304 }
305 }
306
307 if (startConnection)
308 startNewConnection();
309
310 if (connect_failure != null)
311 {
312 try
313 {
314 _newQueue.put(connect_failure);
315 }
316 catch (InterruptedException e)
317 {
318 LOG.ignore(e);
319 }
320 }
321 }
322
323 public void onException(Throwable throwable)
324 {
325 synchronized (this)
326 {
327 _pendingConnections--;
328 if (_queue.size() > 0)
329 {
330 HttpExchange ex = _queue.remove(0);
331 if(ex.setStatus(HttpExchange.STATUS_EXCEPTED))
332 ex.getEventListener().onException(throwable);
333 }
334 }
335 }
336
337 public void onNewConnection(final AbstractHttpConnection connection) throws IOException
338 {
339 Connection q_connection = null;
340
341 synchronized (this)
342 {
343 _pendingConnections--;
344 _connections.add(connection);
345
346 if (_newConnection > 0)
347 {
348 q_connection = connection;
349 _newConnection--;
350 }
351 else if (_queue.size() == 0)
352 {
353 connection.setIdleTimeout();
354 _idle.add(connection);
355 }
356 else
357 {
358 EndPoint endPoint = connection.getEndPoint();
359 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
360 {
361 SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
362 HttpExchange exchange = _queue.get(0);
363 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
364 connect.setAddress(getProxy());
365 send(connection, connect);
366 }
367 else
368 {
369 HttpExchange exchange = _queue.remove(0);
370 send(connection, exchange);
371 }
372 }
373 }
374
375 if (q_connection != null)
376 {
377 try
378 {
379 _newQueue.put(q_connection);
380 }
381 catch (InterruptedException e)
382 {
383 LOG.ignore(e);
384 }
385 }
386 }
387
388 public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
389 {
390 if (connection.isReserved())
391 connection.setReserved(false);
392
393 if (close)
394 {
395 try
396 {
397 connection.close();
398 }
399 catch (IOException e)
400 {
401 LOG.ignore(e);
402 }
403 }
404
405 if (!_client.isStarted())
406 return;
407
408 if (!close && connection.getEndPoint().isOpen())
409 {
410 synchronized (this)
411 {
412 if (_queue.size() == 0)
413 {
414 connection.setIdleTimeout();
415 _idle.add(connection);
416 }
417 else
418 {
419 HttpExchange ex = _queue.remove(0);
420 send(connection, ex);
421 }
422 this.notifyAll();
423 }
424 }
425 else
426 {
427 boolean startConnection = false;
428 synchronized (this)
429 {
430 _connections.remove(connection);
431 if (!_queue.isEmpty())
432 startConnection = true;
433 }
434
435 if (startConnection)
436 startNewConnection();
437 }
438 }
439
440 public void returnIdleConnection(AbstractHttpConnection connection)
441 {
442 connection.onIdleExpired();
443
444 boolean startConnection = false;
445 synchronized (this)
446 {
447 _idle.remove(connection);
448 _connections.remove(connection);
449
450 if (!_queue.isEmpty() && _client.isStarted())
451 startConnection = true;
452 }
453
454 if (startConnection)
455 startNewConnection();
456 }
457
458 public void send(HttpExchange ex) throws IOException
459 {
460 LinkedList<String> listeners = _client.getRegisteredListeners();
461
462 if (listeners != null)
463 {
464
465 for (int i = listeners.size(); i > 0; --i)
466 {
467 String listenerClass = listeners.get(i - 1);
468
469 try
470 {
471 Class listener = Class.forName(listenerClass);
472 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
473 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
474 ex.setEventListener(elistener);
475 }
476 catch (Exception e)
477 {
478 e.printStackTrace();
479 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
480 }
481 }
482 }
483
484
485 if (_client.hasRealms())
486 {
487 ex.setEventListener(new SecurityListener(this, ex));
488 }
489
490 doSend(ex);
491 }
492
493 public void resend(HttpExchange ex) throws IOException
494 {
495 ex.getEventListener().onRetry();
496 ex.reset();
497 doSend(ex);
498 }
499
500 protected void doSend(HttpExchange ex) throws IOException
501 {
502
503
504 if (_cookies != null)
505 {
506 StringBuilder buf = null;
507 for (HttpCookie cookie : _cookies)
508 {
509 if (buf == null)
510 buf = new StringBuilder();
511 else
512 buf.append("; ");
513 buf.append(cookie.getName());
514 buf.append("=");
515 buf.append(cookie.getValue());
516 }
517 if (buf != null)
518 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
519 }
520
521
522 if (_authorizations != null)
523 {
524 Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
525 if (auth != null)
526 (auth).setCredentials(ex);
527 }
528
529
530
531 ex.scheduleTimeout(this);
532
533 AbstractHttpConnection connection = getIdleConnection();
534 if (connection != null)
535 {
536 send(connection, ex);
537 }
538 else
539 {
540 boolean startConnection = false;
541 synchronized (this)
542 {
543 if (_queue.size() == _maxQueueSize)
544 throw new RejectedExecutionException("Queue full for address " + _address);
545
546 _queue.add(ex);
547 if (_connections.size() + _pendingConnections < _maxConnections)
548 startConnection = true;
549 }
550
551 if (startConnection)
552 startNewConnection();
553 }
554 }
555
556 protected void exchangeExpired(HttpExchange exchange)
557 {
558
559
560 synchronized (this)
561 {
562 _queue.remove(exchange);
563 }
564 }
565
566 protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
567 {
568 synchronized (this)
569 {
570
571
572 if (!connection.send(exchange))
573 {
574 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
575 _queue.add(0, exchange);
576 returnIdleConnection(connection);
577 }
578 }
579 }
580
581 @Override
582 public synchronized String toString()
583 {
584 return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
585 }
586
587 public synchronized String toDetailString()
588 {
589 StringBuilder b = new StringBuilder();
590 b.append(toString());
591 b.append('\n');
592 synchronized (this)
593 {
594 for (AbstractHttpConnection connection : _connections)
595 {
596 b.append(connection.toDetailString());
597 if (_idle.contains(connection))
598 b.append(" IDLE");
599 b.append('\n');
600 }
601 }
602 b.append("--");
603 b.append('\n');
604
605 return b.toString();
606 }
607
608 public void setProxy(Address proxy)
609 {
610 _proxy = proxy;
611 }
612
613 public Address getProxy()
614 {
615 return _proxy;
616 }
617
618 public Authentication getProxyAuthentication()
619 {
620 return _proxyAuthentication;
621 }
622
623 public void setProxyAuthentication(Authentication authentication)
624 {
625 _proxyAuthentication = authentication;
626 }
627
628 public boolean isProxied()
629 {
630 return _proxy != null;
631 }
632
633 public void close() throws IOException
634 {
635 synchronized (this)
636 {
637 for (AbstractHttpConnection connection : _connections)
638 {
639 connection.close();
640 }
641 }
642 }
643
644
645
646
647
648 public String dump()
649 {
650 return AggregateLifeCycle.dump(this);
651 }
652
653
654
655
656
657 public void dump(Appendable out, String indent) throws IOException
658 {
659 synchronized (this)
660 {
661 out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
662 AggregateLifeCycle.dump(out,indent,_connections);
663 }
664 }
665
666 private class ConnectExchange extends ContentExchange
667 {
668 private final SelectConnector.UpgradableEndPoint proxyEndPoint;
669 private final HttpExchange exchange;
670
671 public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange)
672 {
673 this.proxyEndPoint = proxyEndPoint;
674 this.exchange = exchange;
675 setMethod(HttpMethods.CONNECT);
676 setVersion(exchange.getVersion());
677 String serverHostAndPort = serverAddress.toString();
678 setRequestURI(serverHostAndPort);
679 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
680 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
681 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
682 }
683
684 @Override
685 protected void onResponseComplete() throws IOException
686 {
687 int responseStatus = getResponseStatus();
688 if (responseStatus == HttpStatus.OK_200)
689 {
690 proxyEndPoint.upgrade();
691 }
692 else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
693 {
694 onExpire();
695 }
696 else
697 {
698 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString()));
699 }
700 }
701
702 @Override
703 protected void onConnectionFailed(Throwable x)
704 {
705 HttpDestination.this.onConnectionFailed(x);
706 }
707
708 @Override
709 protected void onException(Throwable x)
710 {
711 _queue.remove(exchange);
712 if (exchange.setStatus(STATUS_EXCEPTED))
713 exchange.getEventListener().onException(x);
714 }
715
716 @Override
717 protected void onExpire()
718 {
719 _queue.remove(exchange);
720 if (exchange.setStatus(STATUS_EXPIRED))
721 exchange.getEventListener().onExpire();
722 }
723
724 }
725 }