1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.net.ConnectException;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.concurrent.ArrayBlockingQueue;
23
24 import org.eclipse.jetty.client.HttpClient.Connector;
25 import org.eclipse.jetty.client.security.Authentication;
26 import org.eclipse.jetty.client.security.SecurityListener;
27 import org.eclipse.jetty.http.HttpCookie;
28 import org.eclipse.jetty.http.HttpHeaders;
29 import org.eclipse.jetty.http.HttpMethods;
30 import org.eclipse.jetty.http.HttpStatus;
31 import org.eclipse.jetty.http.PathMap;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.ByteArrayBuffer;
34 import org.eclipse.jetty.io.Connection;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.util.log.Log;
37
38
39
40
41 public class HttpDestination
42 {
43 private final ByteArrayBuffer _hostHeader;
44 private final Address _address;
45 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
46 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
47 private final HttpClient _client;
48 private final boolean _ssl;
49 private final int _maxConnections;
50 private int _pendingConnections = 0;
51 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
52 private int _newConnection = 0;
53 private Address _proxy;
54 private Authentication _proxyAuthentication;
55 private PathMap _authorizations;
56 private List<HttpCookie> _cookies;
57
58 public void dump() throws IOException
59 {
60 synchronized (this)
61 {
62 Log.info(this.toString());
63 Log.info("connections=" + _connections.size());
64 Log.info("idle=" + _idle.size());
65 Log.info("pending=" + _pendingConnections);
66 for (HttpConnection c : _connections)
67 {
68 if (!c.isIdle())
69 c.dump();
70 }
71 }
72 }
73
74
75 private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
76
77 HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
78 {
79 _client = client;
80 _address = address;
81 _ssl = ssl;
82 _maxConnections = maxConnections;
83 String addressString = address.getHost();
84 if (address.getPort() != (_ssl ? 443 : 80))
85 addressString += ":" + address.getPort();
86 _hostHeader = new ByteArrayBuffer(addressString);
87 }
88
89 public Address getAddress()
90 {
91 return _address;
92 }
93
94 public Buffer getHostHeader()
95 {
96 return _hostHeader;
97 }
98
99 public HttpClient getHttpClient()
100 {
101 return _client;
102 }
103
104 public boolean isSecure()
105 {
106 return _ssl;
107 }
108
109 public int getConnections()
110 {
111 synchronized (this)
112 {
113 return _connections.size();
114 }
115 }
116
117 public int getIdleConnections()
118 {
119 synchronized (this)
120 {
121 return _idle.size();
122 }
123 }
124
125 public void addAuthorization(String pathSpec, Authentication authorization)
126 {
127 synchronized (this)
128 {
129 if (_authorizations == null)
130 _authorizations = new PathMap();
131 _authorizations.put(pathSpec, authorization);
132 }
133
134
135 }
136
137 public void addCookie(HttpCookie cookie)
138 {
139 synchronized (this)
140 {
141 if (_cookies == null)
142 _cookies = new ArrayList<HttpCookie>();
143 _cookies.add(cookie);
144 }
145
146
147 }
148
149
150
151
152
153
154
155
156
157
158 private HttpConnection getConnection(long timeout) throws IOException
159 {
160 HttpConnection connection = null;
161
162 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
163 {
164 boolean startConnection = false;
165 synchronized (this)
166 {
167 int totalConnections = _connections.size() + _pendingConnections;
168 if (totalConnections < _maxConnections)
169 {
170 _newConnection++;
171 startConnection = true;
172 }
173 }
174
175 if (startConnection)
176 {
177 startNewConnection();
178 try
179 {
180 Object o = _newQueue.take();
181 if (o instanceof HttpConnection)
182 {
183 connection = (HttpConnection)o;
184 }
185 else
186 throw (IOException)o;
187 }
188 catch (InterruptedException e)
189 {
190 Log.ignore(e);
191 }
192 }
193 else
194 {
195 try
196 {
197 Thread.currentThread();
198 Thread.sleep(200);
199 timeout -= 200;
200 }
201 catch (InterruptedException e)
202 {
203 Log.ignore(e);
204 }
205 }
206 }
207 return connection;
208 }
209
210 public HttpConnection reserveConnection(long timeout) throws IOException
211 {
212 HttpConnection connection = getConnection(timeout);
213 if (connection != null)
214 connection.setReserved(true);
215 return connection;
216 }
217
218 public HttpConnection getIdleConnection() throws IOException
219 {
220 HttpConnection connection = null;
221 while (true)
222 {
223 synchronized (this)
224 {
225 if (connection != null)
226 {
227 _connections.remove(connection);
228 connection.close();
229 connection = null;
230 }
231 if (_idle.size() > 0)
232 connection = _idle.remove(_idle.size() - 1);
233 }
234
235 if (connection == null)
236 return null;
237
238
239
240 if (connection.cancelIdleTimeout())
241 return connection;
242 }
243 }
244
245 protected void startNewConnection()
246 {
247 try
248 {
249 synchronized (this)
250 {
251 _pendingConnections++;
252 }
253 final Connector connector = _client._connector;
254 if (connector != null)
255 connector.startConnection(this);
256 }
257 catch (Exception e)
258 {
259 Log.debug(e);
260 onConnectionFailed(e);
261 }
262 }
263
264 public void onConnectionFailed(Throwable throwable)
265 {
266 Throwable connect_failure = null;
267
268 boolean startConnection = false;
269 synchronized (this)
270 {
271 _pendingConnections--;
272 if (_newConnection > 0)
273 {
274 connect_failure = throwable;
275 _newConnection--;
276 }
277 else if (_queue.size() > 0)
278 {
279 HttpExchange ex = _queue.removeFirst();
280 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
281 ex.getEventListener().onConnectionFailed(throwable);
282
283
284
285 if (!_queue.isEmpty() && _client.isStarted())
286 startConnection = true;
287 }
288 }
289
290 if (startConnection)
291 startNewConnection();
292
293 if (connect_failure != null)
294 {
295 try
296 {
297 _newQueue.put(connect_failure);
298 }
299 catch (InterruptedException e)
300 {
301 Log.ignore(e);
302 }
303 }
304 }
305
306 public void onException(Throwable throwable)
307 {
308 synchronized (this)
309 {
310 _pendingConnections--;
311 if (_queue.size() > 0)
312 {
313 HttpExchange ex = _queue.removeFirst();
314 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
315 ex.getEventListener().onException(throwable);
316 }
317 }
318 }
319
320 public void onNewConnection(final HttpConnection connection) throws IOException
321 {
322 Connection q_connection = null;
323
324 synchronized (this)
325 {
326 _pendingConnections--;
327 _connections.add(connection);
328
329 if (_newConnection > 0)
330 {
331 q_connection = connection;
332 _newConnection--;
333 }
334 else if (_queue.size() == 0)
335 {
336 connection.setIdleTimeout();
337 _idle.add(connection);
338 }
339 else
340 {
341 EndPoint endPoint = connection.getEndPoint();
342 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
343 {
344 SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
345 HttpExchange exchange = _queue.get(0);
346 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
347 connect.setAddress(getProxy());
348 send(connection, connect);
349 }
350 else
351 {
352 HttpExchange exchange = _queue.removeFirst();
353 send(connection, exchange);
354 }
355 }
356 }
357
358 if (q_connection != null)
359 {
360 try
361 {
362 _newQueue.put(q_connection);
363 }
364 catch (InterruptedException e)
365 {
366 Log.ignore(e);
367 }
368 }
369 }
370
371 public void returnConnection(HttpConnection connection, boolean close) throws IOException
372 {
373 if (connection.isReserved())
374 connection.setReserved(false);
375
376 if (close)
377 {
378 try
379 {
380 connection.close();
381 }
382 catch (IOException e)
383 {
384 Log.ignore(e);
385 }
386 }
387
388 if (!_client.isStarted())
389 return;
390
391 if (!close && connection.getEndPoint().isOpen())
392 {
393 synchronized (this)
394 {
395 if (_queue.size() == 0)
396 {
397 connection.setIdleTimeout();
398 _idle.add(connection);
399 }
400 else
401 {
402 HttpExchange ex = _queue.removeFirst();
403 send(connection, ex);
404 }
405 this.notifyAll();
406 }
407 }
408 else
409 {
410 boolean startConnection = false;
411 synchronized (this)
412 {
413 _connections.remove(connection);
414 if (!_queue.isEmpty())
415 startConnection = true;
416 }
417
418 if (startConnection)
419 startNewConnection();
420 }
421 }
422
423 public void returnIdleConnection(HttpConnection connection)
424 {
425 try
426 {
427 connection.close();
428 }
429 catch (IOException e)
430 {
431 Log.ignore(e);
432 }
433
434 boolean startConnection = false;
435 synchronized (this)
436 {
437 _idle.remove(connection);
438 _connections.remove(connection);
439
440 if (!_queue.isEmpty() && _client.isStarted())
441 startConnection = true;
442 }
443
444 if (startConnection)
445 startNewConnection();
446 }
447
448 public void send(HttpExchange ex) throws IOException
449 {
450 LinkedList<String> listeners = _client.getRegisteredListeners();
451
452 if (listeners != null)
453 {
454
455 for (int i = listeners.size(); i > 0; --i)
456 {
457 String listenerClass = listeners.get(i - 1);
458
459 try
460 {
461 Class listener = Class.forName(listenerClass);
462 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
463 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
464 ex.setEventListener(elistener);
465 }
466 catch (Exception e)
467 {
468 e.printStackTrace();
469 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
470 }
471 }
472 }
473
474
475 if (_client.hasRealms())
476 {
477 ex.setEventListener(new SecurityListener(this, ex));
478 }
479
480 doSend(ex);
481 }
482
483 public void resend(HttpExchange ex) throws IOException
484 {
485 ex.getEventListener().onRetry();
486 ex.reset();
487 doSend(ex);
488 }
489
490 protected void doSend(HttpExchange ex) throws IOException
491 {
492
493
494 if (_cookies != null)
495 {
496 StringBuilder buf = null;
497 for (HttpCookie cookie : _cookies)
498 {
499 if (buf == null)
500 buf = new StringBuilder();
501 else
502 buf.append("; ");
503 buf.append(cookie.getName());
504 buf.append("=");
505 buf.append(cookie.getValue());
506 }
507 if (buf != null)
508 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
509 }
510
511
512 if (_authorizations != null)
513 {
514 Authentication auth = (Authentication)_authorizations.match(ex.getURI());
515 if (auth != null)
516 (auth).setCredentials(ex);
517 }
518
519 HttpConnection connection = getIdleConnection();
520 if (connection != null)
521 {
522 send(connection, ex);
523 }
524 else
525 {
526 boolean startConnection = false;
527 synchronized (this)
528 {
529 _queue.add(ex);
530 if (_connections.size() + _pendingConnections < _maxConnections)
531 startConnection = true;
532 }
533
534 if (startConnection)
535 startNewConnection();
536 }
537 }
538
539 protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
540 {
541 synchronized (this)
542 {
543
544
545 if (!connection.send(exchange))
546 {
547 _queue.addFirst(exchange);
548 returnIdleConnection(connection);
549 }
550 }
551 }
552
553 @Override
554 public synchronized String toString()
555 {
556 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
557 }
558
559 public synchronized String toDetailString()
560 {
561 StringBuilder b = new StringBuilder();
562 b.append(toString());
563 b.append('\n');
564 synchronized (this)
565 {
566 for (HttpConnection connection : _connections)
567 {
568 b.append(connection.toDetailString());
569 if (_idle.contains(connection))
570 b.append(" IDLE");
571 b.append('\n');
572 }
573 }
574 b.append("--");
575 b.append('\n');
576
577 return b.toString();
578 }
579
580 public void setProxy(Address proxy)
581 {
582 _proxy = proxy;
583 }
584
585 public Address getProxy()
586 {
587 return _proxy;
588 }
589
590 public Authentication getProxyAuthentication()
591 {
592 return _proxyAuthentication;
593 }
594
595 public void setProxyAuthentication(Authentication authentication)
596 {
597 _proxyAuthentication = authentication;
598 }
599
600 public boolean isProxied()
601 {
602 return _proxy != null;
603 }
604
605 public void close() throws IOException
606 {
607 synchronized (this)
608 {
609 for (HttpConnection connection : _connections)
610 {
611 connection.close();
612 }
613 }
614 }
615
616 private class ConnectExchange extends ContentExchange
617 {
618 private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
619 private final HttpExchange exchange;
620
621 public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
622 {
623 this.proxyEndPoint = proxyEndPoint;
624 this.exchange = exchange;
625 setMethod(HttpMethods.CONNECT);
626 String serverHostAndPort = serverAddress.toString();
627 setURI(serverHostAndPort);
628 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
629 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
630 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
631 }
632
633 @Override
634 protected void onResponseComplete() throws IOException
635 {
636 if (getResponseStatus() == HttpStatus.OK_200)
637 {
638 proxyEndPoint.upgrade();
639 }
640 else
641 {
642 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
643 }
644 }
645
646 @Override
647 protected void onConnectionFailed(Throwable x)
648 {
649 HttpDestination.this.onConnectionFailed(x);
650 }
651 }
652 }