1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.net.ConnectException;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.client.HttpClient.Connector;
27 import org.eclipse.jetty.client.security.Authentication;
28 import org.eclipse.jetty.client.security.SecurityListener;
29 import org.eclipse.jetty.http.HttpCookie;
30 import org.eclipse.jetty.http.HttpHeaders;
31 import org.eclipse.jetty.http.HttpMethods;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.PathMap;
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.ByteArrayBuffer;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.util.component.AggregateLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44
45
46 public class HttpDestination implements Dumpable
47 {
48 private static final Logger LOG = Log.getLogger(HttpDestination.class);
49
50 private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
51 private final List<HttpConnection> _connections = new LinkedList<HttpConnection>();
52 private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
53 private final List<HttpConnection> _idle = new ArrayList<HttpConnection>();
54 private final HttpClient _client;
55 private final Address _address;
56 private final boolean _ssl;
57 private final ByteArrayBuffer _hostHeader;
58 private volatile int _maxConnections;
59 private volatile int _maxQueueSize;
60 private int _pendingConnections = 0;
61 private int _newConnection = 0;
62 private volatile Address _proxy;
63 private Authentication _proxyAuthentication;
64 private PathMap _authorizations;
65 private List<HttpCookie> _cookies;
66
67
68
69 HttpDestination(HttpClient client, Address address, boolean ssl)
70 {
71 _client = client;
72 _address = address;
73 _ssl = ssl;
74 _maxConnections = _client.getMaxConnectionsPerAddress();
75 _maxQueueSize = _client.getMaxQueueSizePerAddress();
76 String addressString = address.getHost();
77 if (address.getPort() != (_ssl ? 443 : 80))
78 addressString += ":" + address.getPort();
79 _hostHeader = new ByteArrayBuffer(addressString);
80 }
81
82 public HttpClient getHttpClient()
83 {
84 return _client;
85 }
86
87 public Address getAddress()
88 {
89 return _address;
90 }
91
92 public boolean isSecure()
93 {
94 return _ssl;
95 }
96
97 public Buffer getHostHeader()
98 {
99 return _hostHeader;
100 }
101
102 public int getMaxConnections()
103 {
104 return _maxConnections;
105 }
106
107 public void setMaxConnections(int maxConnections)
108 {
109 this._maxConnections = maxConnections;
110 }
111
112 public int getMaxQueueSize()
113 {
114 return _maxQueueSize;
115 }
116
117 public void setMaxQueueSize(int maxQueueSize)
118 {
119 this._maxQueueSize = maxQueueSize;
120 }
121
122 public int getConnections()
123 {
124 synchronized (this)
125 {
126 return _connections.size();
127 }
128 }
129
130 public int getIdleConnections()
131 {
132 synchronized (this)
133 {
134 return _idle.size();
135 }
136 }
137
138 public void addAuthorization(String pathSpec, Authentication authorization)
139 {
140 synchronized (this)
141 {
142 if (_authorizations == null)
143 _authorizations = new PathMap();
144 _authorizations.put(pathSpec, authorization);
145 }
146
147
148 }
149
150 public void addCookie(HttpCookie cookie)
151 {
152 synchronized (this)
153 {
154 if (_cookies == null)
155 _cookies = new ArrayList<HttpCookie>();
156 _cookies.add(cookie);
157 }
158
159
160 }
161
162
163
164
165
166
167
168
169
170
171 private HttpConnection getConnection(long timeout) throws IOException
172 {
173 HttpConnection connection = null;
174
175 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
176 {
177 boolean startConnection = false;
178 synchronized (this)
179 {
180 int totalConnections = _connections.size() + _pendingConnections;
181 if (totalConnections < _maxConnections)
182 {
183 _newConnection++;
184 startConnection = true;
185 }
186 }
187
188 if (startConnection)
189 {
190 startNewConnection();
191 try
192 {
193 Object o = _newQueue.take();
194 if (o instanceof HttpConnection)
195 {
196 connection = (HttpConnection)o;
197 }
198 else
199 throw (IOException)o;
200 }
201 catch (InterruptedException e)
202 {
203 LOG.ignore(e);
204 }
205 }
206 else
207 {
208 try
209 {
210 Thread.currentThread();
211 Thread.sleep(200);
212 timeout -= 200;
213 }
214 catch (InterruptedException e)
215 {
216 LOG.ignore(e);
217 }
218 }
219 }
220 return connection;
221 }
222
223 public HttpConnection reserveConnection(long timeout) throws IOException
224 {
225 HttpConnection connection = getConnection(timeout);
226 if (connection != null)
227 connection.setReserved(true);
228 return connection;
229 }
230
231 public HttpConnection getIdleConnection() throws IOException
232 {
233 HttpConnection connection = null;
234 while (true)
235 {
236 synchronized (this)
237 {
238 if (connection != null)
239 {
240 _connections.remove(connection);
241 connection.close();
242 connection = null;
243 }
244 if (_idle.size() > 0)
245 connection = _idle.remove(_idle.size() - 1);
246 }
247
248 if (connection == null)
249 return null;
250
251
252
253 if (connection.cancelIdleTimeout())
254 return connection;
255 }
256 }
257
258 protected void startNewConnection()
259 {
260 try
261 {
262 synchronized (this)
263 {
264 _pendingConnections++;
265 }
266 final Connector connector = _client._connector;
267 if (connector != null)
268 connector.startConnection(this);
269 }
270 catch (Exception e)
271 {
272 LOG.debug(e);
273 onConnectionFailed(e);
274 }
275 }
276
277 public void onConnectionFailed(Throwable throwable)
278 {
279 Throwable connect_failure = null;
280
281 boolean startConnection = false;
282 synchronized (this)
283 {
284 _pendingConnections--;
285 if (_newConnection > 0)
286 {
287 connect_failure = throwable;
288 _newConnection--;
289 }
290 else if (_queue.size() > 0)
291 {
292 HttpExchange ex = _queue.remove(0);
293 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
294 ex.getEventListener().onConnectionFailed(throwable);
295
296
297
298 if (!_queue.isEmpty() && _client.isStarted())
299 startConnection = true;
300 }
301 }
302
303 if (startConnection)
304 startNewConnection();
305
306 if (connect_failure != null)
307 {
308 try
309 {
310 _newQueue.put(connect_failure);
311 }
312 catch (InterruptedException e)
313 {
314 LOG.ignore(e);
315 }
316 }
317 }
318
319 public void onException(Throwable throwable)
320 {
321 synchronized (this)
322 {
323 _pendingConnections--;
324 if (_queue.size() > 0)
325 {
326 HttpExchange ex = _queue.remove(0);
327 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
328 ex.getEventListener().onException(throwable);
329 }
330 }
331 }
332
333 public void onNewConnection(final HttpConnection connection) throws IOException
334 {
335 Connection q_connection = null;
336
337 synchronized (this)
338 {
339 _pendingConnections--;
340 _connections.add(connection);
341
342 if (_newConnection > 0)
343 {
344 q_connection = connection;
345 _newConnection--;
346 }
347 else if (_queue.size() == 0)
348 {
349 connection.setIdleTimeout();
350 _idle.add(connection);
351 }
352 else
353 {
354 EndPoint endPoint = connection.getEndPoint();
355 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
356 {
357 SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
358 HttpExchange exchange = _queue.get(0);
359 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
360 connect.setAddress(getProxy());
361 send(connection, connect);
362 }
363 else
364 {
365 HttpExchange exchange = _queue.remove(0);
366 send(connection, exchange);
367 }
368 }
369 }
370
371 if (q_connection != null)
372 {
373 try
374 {
375 _newQueue.put(q_connection);
376 }
377 catch (InterruptedException e)
378 {
379 LOG.ignore(e);
380 }
381 }
382 }
383
384 public void returnConnection(HttpConnection connection, boolean close) throws IOException
385 {
386 if (connection.isReserved())
387 connection.setReserved(false);
388
389 if (close)
390 {
391 try
392 {
393 connection.close();
394 }
395 catch (IOException e)
396 {
397 LOG.ignore(e);
398 }
399 }
400
401 if (!_client.isStarted())
402 return;
403
404 if (!close && connection.getEndPoint().isOpen())
405 {
406 synchronized (this)
407 {
408 if (_queue.size() == 0)
409 {
410 connection.setIdleTimeout();
411 _idle.add(connection);
412 }
413 else
414 {
415 HttpExchange ex = _queue.remove(0);
416 send(connection, ex);
417 }
418 this.notifyAll();
419 }
420 }
421 else
422 {
423 boolean startConnection = false;
424 synchronized (this)
425 {
426 _connections.remove(connection);
427 if (!_queue.isEmpty())
428 startConnection = true;
429 }
430
431 if (startConnection)
432 startNewConnection();
433 }
434 }
435
436 public void returnIdleConnection(HttpConnection connection)
437 {
438 try
439 {
440 connection.close();
441 }
442 catch (IOException e)
443 {
444 LOG.ignore(e);
445 }
446
447 boolean startConnection = false;
448 synchronized (this)
449 {
450 _idle.remove(connection);
451 _connections.remove(connection);
452
453 if (!_queue.isEmpty() && _client.isStarted())
454 startConnection = true;
455 }
456
457 if (startConnection)
458 startNewConnection();
459 }
460
461 public void send(HttpExchange ex) throws IOException
462 {
463 LinkedList<String> listeners = _client.getRegisteredListeners();
464
465 if (listeners != null)
466 {
467
468 for (int i = listeners.size(); i > 0; --i)
469 {
470 String listenerClass = listeners.get(i - 1);
471
472 try
473 {
474 Class listener = Class.forName(listenerClass);
475 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
476 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
477 ex.setEventListener(elistener);
478 }
479 catch (Exception e)
480 {
481 e.printStackTrace();
482 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
483 }
484 }
485 }
486
487
488 if (_client.hasRealms())
489 {
490 ex.setEventListener(new SecurityListener(this, ex));
491 }
492
493 doSend(ex);
494 }
495
496 public void resend(HttpExchange ex) throws IOException
497 {
498 ex.getEventListener().onRetry();
499 ex.reset();
500 doSend(ex);
501 }
502
503 protected void doSend(HttpExchange ex) throws IOException
504 {
505
506
507 if (_cookies != null)
508 {
509 StringBuilder buf = null;
510 for (HttpCookie cookie : _cookies)
511 {
512 if (buf == null)
513 buf = new StringBuilder();
514 else
515 buf.append("; ");
516 buf.append(cookie.getName());
517 buf.append("=");
518 buf.append(cookie.getValue());
519 }
520 if (buf != null)
521 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
522 }
523
524
525 if (_authorizations != null)
526 {
527 Authentication auth = (Authentication)_authorizations.match(ex.getURI());
528 if (auth != null)
529 (auth).setCredentials(ex);
530 }
531
532
533
534 ex.scheduleTimeout(this);
535
536 HttpConnection connection = getIdleConnection();
537 if (connection != null)
538 {
539 send(connection, ex);
540 }
541 else
542 {
543 boolean startConnection = false;
544 synchronized (this)
545 {
546 if (_queue.size() == _maxQueueSize)
547 throw new RejectedExecutionException("Queue full for address " + _address);
548
549 _queue.add(ex);
550 if (_connections.size() + _pendingConnections < _maxConnections)
551 startConnection = true;
552 }
553
554 if (startConnection)
555 startNewConnection();
556 }
557 }
558
559 protected void exchangeExpired(HttpExchange exchange)
560 {
561
562
563 synchronized (this)
564 {
565 _queue.remove(exchange);
566 }
567 }
568
569 protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
570 {
571 synchronized (this)
572 {
573
574
575 if (!connection.send(exchange))
576 {
577 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
578 _queue.add(0, exchange);
579 returnIdleConnection(connection);
580 }
581 }
582 }
583
584 @Override
585 public synchronized String toString()
586 {
587 return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
588 }
589
590 public synchronized String toDetailString()
591 {
592 StringBuilder b = new StringBuilder();
593 b.append(toString());
594 b.append('\n');
595 synchronized (this)
596 {
597 for (HttpConnection connection : _connections)
598 {
599 b.append(connection.toDetailString());
600 if (_idle.contains(connection))
601 b.append(" IDLE");
602 b.append('\n');
603 }
604 }
605 b.append("--");
606 b.append('\n');
607
608 return b.toString();
609 }
610
611 public void setProxy(Address proxy)
612 {
613 _proxy = proxy;
614 }
615
616 public Address getProxy()
617 {
618 return _proxy;
619 }
620
621 public Authentication getProxyAuthentication()
622 {
623 return _proxyAuthentication;
624 }
625
626 public void setProxyAuthentication(Authentication authentication)
627 {
628 _proxyAuthentication = authentication;
629 }
630
631 public boolean isProxied()
632 {
633 return _proxy != null;
634 }
635
636 public void close() throws IOException
637 {
638 synchronized (this)
639 {
640 for (HttpConnection connection : _connections)
641 {
642 connection.close();
643 }
644 }
645 }
646
647
648
649
650
651 public String dump()
652 {
653 return AggregateLifeCycle.dump(this);
654 }
655
656
657
658
659
660 public void dump(Appendable out, String indent) throws IOException
661 {
662 synchronized (this)
663 {
664 out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
665 AggregateLifeCycle.dump(out,indent,_connections);
666 }
667 }
668
669 private class ConnectExchange extends ContentExchange
670 {
671 private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
672 private final HttpExchange exchange;
673
674 public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
675 {
676 this.proxyEndPoint = proxyEndPoint;
677 this.exchange = exchange;
678 setMethod(HttpMethods.CONNECT);
679 setVersion(exchange.getVersion());
680 String serverHostAndPort = serverAddress.toString();
681 setRequestURI(serverHostAndPort);
682 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
683 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
684 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
685 }
686
687 @Override
688 protected void onResponseComplete() throws IOException
689 {
690 if (getResponseStatus() == HttpStatus.OK_200)
691 {
692 proxyEndPoint.upgrade();
693 }
694 else
695 {
696 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
697 }
698 }
699
700 @Override
701 protected void onConnectionFailed(Throwable x)
702 {
703 HttpDestination.this.onConnectionFailed(x);
704 }
705 }
706 }