1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.net.ConnectException;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.client.HttpClient.Connector;
27 import org.eclipse.jetty.client.security.Authentication;
28 import org.eclipse.jetty.client.security.SecurityListener;
29 import org.eclipse.jetty.http.HttpCookie;
30 import org.eclipse.jetty.http.HttpHeaders;
31 import org.eclipse.jetty.http.HttpMethods;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.PathMap;
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.ByteArrayBuffer;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.util.component.AggregateLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41
42
43
44
45 public class HttpDestination implements Dumpable
46 {
47 private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
48 private final List<HttpConnection> _connections = new LinkedList<HttpConnection>();
49 private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
50 private final List<HttpConnection> _idle = new ArrayList<HttpConnection>();
51 private final HttpClient _client;
52 private final Address _address;
53 private final boolean _ssl;
54 private final ByteArrayBuffer _hostHeader;
55 private volatile int _maxConnections;
56 private volatile int _maxQueueSize;
57 private int _pendingConnections = 0;
58 private int _newConnection = 0;
59 private volatile Address _proxy;
60 private Authentication _proxyAuthentication;
61 private PathMap _authorizations;
62 private List<HttpCookie> _cookies;
63
64
65
66 HttpDestination(HttpClient client, Address address, boolean ssl)
67 {
68 _client = client;
69 _address = address;
70 _ssl = ssl;
71 _maxConnections = _client.getMaxConnectionsPerAddress();
72 _maxQueueSize = _client.getMaxQueueSizePerAddress();
73 String addressString = address.getHost();
74 if (address.getPort() != (_ssl ? 443 : 80))
75 addressString += ":" + address.getPort();
76 _hostHeader = new ByteArrayBuffer(addressString);
77 }
78
79 public HttpClient getHttpClient()
80 {
81 return _client;
82 }
83
84 public Address getAddress()
85 {
86 return _address;
87 }
88
89 public boolean isSecure()
90 {
91 return _ssl;
92 }
93
94 public Buffer getHostHeader()
95 {
96 return _hostHeader;
97 }
98
99 public int getMaxConnections()
100 {
101 return _maxConnections;
102 }
103
104 public void setMaxConnections(int maxConnections)
105 {
106 this._maxConnections = maxConnections;
107 }
108
109 public int getMaxQueueSize()
110 {
111 return _maxQueueSize;
112 }
113
114 public void setMaxQueueSize(int maxQueueSize)
115 {
116 this._maxQueueSize = maxQueueSize;
117 }
118
119 public int getConnections()
120 {
121 synchronized (this)
122 {
123 return _connections.size();
124 }
125 }
126
127 public int getIdleConnections()
128 {
129 synchronized (this)
130 {
131 return _idle.size();
132 }
133 }
134
135 public void addAuthorization(String pathSpec, Authentication authorization)
136 {
137 synchronized (this)
138 {
139 if (_authorizations == null)
140 _authorizations = new PathMap();
141 _authorizations.put(pathSpec, authorization);
142 }
143
144
145 }
146
147 public void addCookie(HttpCookie cookie)
148 {
149 synchronized (this)
150 {
151 if (_cookies == null)
152 _cookies = new ArrayList<HttpCookie>();
153 _cookies.add(cookie);
154 }
155
156
157 }
158
159
160
161
162
163
164
165
166
167
168 private HttpConnection getConnection(long timeout) throws IOException
169 {
170 HttpConnection connection = null;
171
172 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
173 {
174 boolean startConnection = false;
175 synchronized (this)
176 {
177 int totalConnections = _connections.size() + _pendingConnections;
178 if (totalConnections < _maxConnections)
179 {
180 _newConnection++;
181 startConnection = true;
182 }
183 }
184
185 if (startConnection)
186 {
187 startNewConnection();
188 try
189 {
190 Object o = _newQueue.take();
191 if (o instanceof HttpConnection)
192 {
193 connection = (HttpConnection)o;
194 }
195 else
196 throw (IOException)o;
197 }
198 catch (InterruptedException e)
199 {
200 Log.ignore(e);
201 }
202 }
203 else
204 {
205 try
206 {
207 Thread.currentThread();
208 Thread.sleep(200);
209 timeout -= 200;
210 }
211 catch (InterruptedException e)
212 {
213 Log.ignore(e);
214 }
215 }
216 }
217 return connection;
218 }
219
220 public HttpConnection reserveConnection(long timeout) throws IOException
221 {
222 HttpConnection connection = getConnection(timeout);
223 if (connection != null)
224 connection.setReserved(true);
225 return connection;
226 }
227
228 public HttpConnection getIdleConnection() throws IOException
229 {
230 HttpConnection connection = null;
231 while (true)
232 {
233 synchronized (this)
234 {
235 if (connection != null)
236 {
237 _connections.remove(connection);
238 connection.close();
239 connection = null;
240 }
241 if (_idle.size() > 0)
242 connection = _idle.remove(_idle.size() - 1);
243 }
244
245 if (connection == null)
246 return null;
247
248
249
250 if (connection.cancelIdleTimeout())
251 return connection;
252 }
253 }
254
255 protected void startNewConnection()
256 {
257 try
258 {
259 synchronized (this)
260 {
261 _pendingConnections++;
262 }
263 final Connector connector = _client._connector;
264 if (connector != null)
265 connector.startConnection(this);
266 }
267 catch (Exception e)
268 {
269 Log.debug(e);
270 onConnectionFailed(e);
271 }
272 }
273
274 public void onConnectionFailed(Throwable throwable)
275 {
276 Throwable connect_failure = null;
277
278 boolean startConnection = false;
279 synchronized (this)
280 {
281 _pendingConnections--;
282 if (_newConnection > 0)
283 {
284 connect_failure = throwable;
285 _newConnection--;
286 }
287 else if (_queue.size() > 0)
288 {
289 HttpExchange ex = _queue.remove(0);
290 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
291 ex.getEventListener().onConnectionFailed(throwable);
292
293
294
295 if (!_queue.isEmpty() && _client.isStarted())
296 startConnection = true;
297 }
298 }
299
300 if (startConnection)
301 startNewConnection();
302
303 if (connect_failure != null)
304 {
305 try
306 {
307 _newQueue.put(connect_failure);
308 }
309 catch (InterruptedException e)
310 {
311 Log.ignore(e);
312 }
313 }
314 }
315
316 public void onException(Throwable throwable)
317 {
318 synchronized (this)
319 {
320 _pendingConnections--;
321 if (_queue.size() > 0)
322 {
323 HttpExchange ex = _queue.remove(0);
324 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
325 ex.getEventListener().onException(throwable);
326 }
327 }
328 }
329
330 public void onNewConnection(final HttpConnection connection) throws IOException
331 {
332 Connection q_connection = null;
333
334 synchronized (this)
335 {
336 _pendingConnections--;
337 _connections.add(connection);
338
339 if (_newConnection > 0)
340 {
341 q_connection = connection;
342 _newConnection--;
343 }
344 else if (_queue.size() == 0)
345 {
346 connection.setIdleTimeout();
347 _idle.add(connection);
348 }
349 else
350 {
351 EndPoint endPoint = connection.getEndPoint();
352 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
353 {
354 SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
355 HttpExchange exchange = _queue.get(0);
356 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
357 connect.setAddress(getProxy());
358 send(connection, connect);
359 }
360 else
361 {
362 HttpExchange exchange = _queue.remove(0);
363 send(connection, exchange);
364 }
365 }
366 }
367
368 if (q_connection != null)
369 {
370 try
371 {
372 _newQueue.put(q_connection);
373 }
374 catch (InterruptedException e)
375 {
376 Log.ignore(e);
377 }
378 }
379 }
380
381 public void returnConnection(HttpConnection connection, boolean close) throws IOException
382 {
383 if (connection.isReserved())
384 connection.setReserved(false);
385
386 if (close)
387 {
388 try
389 {
390 connection.close();
391 }
392 catch (IOException e)
393 {
394 Log.ignore(e);
395 }
396 }
397
398 if (!_client.isStarted())
399 return;
400
401 if (!close && connection.getEndPoint().isOpen())
402 {
403 synchronized (this)
404 {
405 if (_queue.size() == 0)
406 {
407 connection.setIdleTimeout();
408 _idle.add(connection);
409 }
410 else
411 {
412 HttpExchange ex = _queue.remove(0);
413 send(connection, ex);
414 }
415 this.notifyAll();
416 }
417 }
418 else
419 {
420 boolean startConnection = false;
421 synchronized (this)
422 {
423 _connections.remove(connection);
424 if (!_queue.isEmpty())
425 startConnection = true;
426 }
427
428 if (startConnection)
429 startNewConnection();
430 }
431 }
432
433 public void returnIdleConnection(HttpConnection connection)
434 {
435 try
436 {
437 connection.close();
438 }
439 catch (IOException e)
440 {
441 Log.ignore(e);
442 }
443
444 boolean startConnection = false;
445 synchronized (this)
446 {
447 _idle.remove(connection);
448 _connections.remove(connection);
449
450 if (!_queue.isEmpty() && _client.isStarted())
451 startConnection = true;
452 }
453
454 if (startConnection)
455 startNewConnection();
456 }
457
458 public void send(HttpExchange ex) throws IOException
459 {
460 LinkedList<String> listeners = _client.getRegisteredListeners();
461
462 if (listeners != null)
463 {
464
465 for (int i = listeners.size(); i > 0; --i)
466 {
467 String listenerClass = listeners.get(i - 1);
468
469 try
470 {
471 Class listener = Class.forName(listenerClass);
472 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
473 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
474 ex.setEventListener(elistener);
475 }
476 catch (Exception e)
477 {
478 e.printStackTrace();
479 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
480 }
481 }
482 }
483
484
485 if (_client.hasRealms())
486 {
487 ex.setEventListener(new SecurityListener(this, ex));
488 }
489
490 doSend(ex);
491 }
492
493 public void resend(HttpExchange ex) throws IOException
494 {
495 ex.getEventListener().onRetry();
496 ex.reset();
497 doSend(ex);
498 }
499
500 protected void doSend(HttpExchange ex) throws IOException
501 {
502
503
504 if (_cookies != null)
505 {
506 StringBuilder buf = null;
507 for (HttpCookie cookie : _cookies)
508 {
509 if (buf == null)
510 buf = new StringBuilder();
511 else
512 buf.append("; ");
513 buf.append(cookie.getName());
514 buf.append("=");
515 buf.append(cookie.getValue());
516 }
517 if (buf != null)
518 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
519 }
520
521
522 if (_authorizations != null)
523 {
524 Authentication auth = (Authentication)_authorizations.match(ex.getURI());
525 if (auth != null)
526 (auth).setCredentials(ex);
527 }
528
529
530
531 ex.scheduleTimeout(this);
532
533 HttpConnection connection = getIdleConnection();
534 if (connection != null)
535 {
536 send(connection, ex);
537 }
538 else
539 {
540 boolean startConnection = false;
541 synchronized (this)
542 {
543 if (_queue.size() == _maxQueueSize)
544 throw new RejectedExecutionException("Queue full for address " + _address);
545
546 _queue.add(ex);
547 if (_connections.size() + _pendingConnections < _maxConnections)
548 startConnection = true;
549 }
550
551 if (startConnection)
552 startNewConnection();
553 }
554 }
555
556 protected void exchangeExpired(HttpExchange exchange)
557 {
558
559
560 synchronized (this)
561 {
562 _queue.remove(exchange);
563 }
564 }
565
566 protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
567 {
568 synchronized (this)
569 {
570
571
572 if (!connection.send(exchange))
573 {
574 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
575 _queue.add(0, exchange);
576 returnIdleConnection(connection);
577 }
578 }
579 }
580
581 @Override
582 public synchronized String toString()
583 {
584 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
585 }
586
587 public synchronized String toDetailString()
588 {
589 StringBuilder b = new StringBuilder();
590 b.append(toString());
591 b.append('\n');
592 synchronized (this)
593 {
594 for (HttpConnection connection : _connections)
595 {
596 b.append(connection.toDetailString());
597 if (_idle.contains(connection))
598 b.append(" IDLE");
599 b.append('\n');
600 }
601 }
602 b.append("--");
603 b.append('\n');
604
605 return b.toString();
606 }
607
608 public void setProxy(Address proxy)
609 {
610 _proxy = proxy;
611 }
612
613 public Address getProxy()
614 {
615 return _proxy;
616 }
617
618 public Authentication getProxyAuthentication()
619 {
620 return _proxyAuthentication;
621 }
622
623 public void setProxyAuthentication(Authentication authentication)
624 {
625 _proxyAuthentication = authentication;
626 }
627
628 public boolean isProxied()
629 {
630 return _proxy != null;
631 }
632
633 public void close() throws IOException
634 {
635 synchronized (this)
636 {
637 for (HttpConnection connection : _connections)
638 {
639 connection.close();
640 }
641 }
642 }
643
644
645
646
647
648 public String dump()
649 {
650 return AggregateLifeCycle.dump(this);
651 }
652
653
654
655
656
657 public void dump(Appendable out, String indent) throws IOException
658 {
659 synchronized (this)
660 {
661 out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
662 AggregateLifeCycle.dump(out,indent,_connections);
663 }
664 }
665
666 private class ConnectExchange extends ContentExchange
667 {
668 private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
669 private final HttpExchange exchange;
670
671 public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
672 {
673 this.proxyEndPoint = proxyEndPoint;
674 this.exchange = exchange;
675 setMethod(HttpMethods.CONNECT);
676 setVersion(exchange.getVersion());
677 String serverHostAndPort = serverAddress.toString();
678 setURI(serverHostAndPort);
679 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
680 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
681 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
682 }
683
684 @Override
685 protected void onResponseComplete() throws IOException
686 {
687 if (getResponseStatus() == HttpStatus.OK_200)
688 {
689 proxyEndPoint.upgrade();
690 }
691 else
692 {
693 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
694 }
695 }
696
697 @Override
698 protected void onConnectionFailed(Throwable x)
699 {
700 HttpDestination.this.onConnectionFailed(x);
701 }
702 }
703 }