1
2
3
4
5
6
7
8
9
10
11
12
13 package org.eclipse.jetty.client;
14
15 import java.io.IOException;
16 import java.lang.reflect.Constructor;
17 import java.net.ConnectException;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22
23 import org.eclipse.jetty.client.HttpClient.Connector;
24 import org.eclipse.jetty.client.security.Authentication;
25 import org.eclipse.jetty.client.security.SecurityListener;
26 import org.eclipse.jetty.http.HttpCookie;
27 import org.eclipse.jetty.http.HttpHeaders;
28 import org.eclipse.jetty.http.HttpMethods;
29 import org.eclipse.jetty.http.HttpStatus;
30 import org.eclipse.jetty.http.PathMap;
31 import org.eclipse.jetty.io.Buffer;
32 import org.eclipse.jetty.io.ByteArrayBuffer;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.util.log.Log;
35
36
37
38
39
40 public class HttpDestination
41 {
42 private final ByteArrayBuffer _hostHeader;
43 private final Address _address;
44 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
45 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
46 private final HttpClient _client;
47 private final boolean _ssl;
48 private final int _maxConnections;
49 private int _pendingConnections = 0;
50 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
51 private int _newConnection = 0;
52 private Address _proxy;
53 private Authentication _proxyAuthentication;
54 private PathMap _authorizations;
55 private List<HttpCookie> _cookies;
56
57 public void dump() throws IOException
58 {
59 synchronized (this)
60 {
61 Log.info(this.toString());
62 Log.info("connections=" + _connections.size());
63 Log.info("idle=" + _idle.size());
64 Log.info("pending=" + _pendingConnections);
65 for (HttpConnection c : _connections)
66 {
67 if (!c.isIdle())
68 c.dump();
69 }
70 }
71 }
72
73
74 private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
75
76 HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
77 {
78 _client = client;
79 _address = address;
80 _ssl = ssl;
81 _maxConnections = maxConnections;
82 String addressString = address.getHost();
83 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
84 _hostHeader = new ByteArrayBuffer(addressString);
85 }
86
87 public Address getAddress()
88 {
89 return _address;
90 }
91
92 public Buffer getHostHeader()
93 {
94 return _hostHeader;
95 }
96
97 public HttpClient getHttpClient()
98 {
99 return _client;
100 }
101
102 public boolean isSecure()
103 {
104 return _ssl;
105 }
106
107 public int getConnections()
108 {
109 synchronized (this)
110 {
111 return _connections.size();
112 }
113 }
114
115 public int getIdleConnections()
116 {
117 synchronized (this)
118 {
119 return _idle.size();
120 }
121 }
122
123 public void addAuthorization(String pathSpec, Authentication authorization)
124 {
125 synchronized (this)
126 {
127 if (_authorizations == null)
128 _authorizations = new PathMap();
129 _authorizations.put(pathSpec, authorization);
130 }
131
132
133 }
134
135 public void addCookie(HttpCookie cookie)
136 {
137 synchronized (this)
138 {
139 if (_cookies == null)
140 _cookies = new ArrayList<HttpCookie>();
141 _cookies.add(cookie);
142 }
143
144
145 }
146
147
148
149
150
151
152
153
154
155 private HttpConnection getConnection(long timeout) throws IOException
156 {
157 HttpConnection connection = null;
158
159 while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
160 {
161 boolean starting = false;
162 synchronized (this)
163 {
164 int totalConnections = _connections.size() + _pendingConnections;
165 if (totalConnections < _maxConnections)
166 {
167 _newConnection++;
168 startNewConnection();
169 starting = true;
170 }
171 }
172
173 if (!starting)
174 {
175 try
176 {
177 Thread.currentThread();
178 Thread.sleep(200);
179 timeout-=200;
180 }
181 catch (InterruptedException e)
182 {
183 Log.ignore(e);
184 }
185 }
186 else
187 {
188 try
189 {
190 Object o = _newQueue.take();
191 if (o instanceof HttpConnection)
192 {
193 connection = (HttpConnection)o;
194 }
195 else
196 throw (IOException)o;
197 }
198 catch (InterruptedException e)
199 {
200 Log.ignore(e);
201 }
202 }
203 }
204 return connection;
205 }
206
207 public HttpConnection reserveConnection(long timeout) throws IOException
208 {
209 HttpConnection connection = getConnection(timeout);
210 if (connection != null)
211 connection.setReserved(true);
212 return connection;
213 }
214
215 public HttpConnection getIdleConnection() throws IOException
216 {
217 HttpConnection connection = null;
218 while (true)
219 {
220 synchronized (this)
221 {
222 if (connection!=null)
223 {
224 _connections.remove(connection);
225 connection.close();
226 connection=null;
227 }
228 if (_idle.size() > 0)
229 connection = _idle.remove(_idle.size()-1);
230 }
231
232 if (connection==null)
233 return null;
234
235
236
237 if (connection.cancelIdleTimeout())
238 return connection;
239 }
240 }
241
242 protected void startNewConnection()
243 {
244 try
245 {
246 synchronized (this)
247 {
248 _pendingConnections++;
249 }
250 final Connector connector=_client._connector;
251 if (connector!=null)
252 connector.startConnection(this);
253 }
254 catch (Exception e)
255 {
256 Log.debug(e);
257 onConnectionFailed(e);
258 }
259 }
260
261 public void onConnectionFailed(Throwable throwable)
262 {
263 Throwable connect_failure = null;
264
265 synchronized (this)
266 {
267 _pendingConnections--;
268 if (_newConnection > 0)
269 {
270 connect_failure = throwable;
271 _newConnection--;
272 }
273 else if (_queue.size() > 0)
274 {
275 HttpExchange ex = _queue.removeFirst();
276 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
277 ex.getEventListener().onConnectionFailed(throwable);
278
279
280
281 if (!_queue.isEmpty() && _client.isStarted())
282 startNewConnection();
283 }
284 }
285
286 if (connect_failure != null)
287 {
288 try
289 {
290 _newQueue.put(connect_failure);
291 }
292 catch (InterruptedException e)
293 {
294 Log.ignore(e);
295 }
296 }
297 }
298
299 public void onException(Throwable throwable)
300 {
301 synchronized (this)
302 {
303 _pendingConnections--;
304 if (_queue.size() > 0)
305 {
306 HttpExchange ex = _queue.removeFirst();
307 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
308 ex.getEventListener().onException(throwable);
309 }
310 }
311 }
312
313 public void onNewConnection(final HttpConnection connection) throws IOException
314 {
315 HttpConnection q_connection = null;
316
317 synchronized (this)
318 {
319 _pendingConnections--;
320 _connections.add(connection);
321
322 if (_newConnection > 0)
323 {
324 q_connection = connection;
325 _newConnection--;
326 }
327 else if (_queue.size() == 0)
328 {
329 connection.setIdleTimeout();
330 _idle.add(connection);
331 }
332 else
333 {
334 EndPoint endPoint = connection.getEndPoint();
335 if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
336 {
337 SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
338 HttpExchange exchange = _queue.get(0);
339 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
340 connect.setAddress(getProxy());
341 send(connection, connect);
342 }
343 else
344 {
345 HttpExchange exchange = _queue.removeFirst();
346 send(connection, exchange);
347 }
348 }
349 }
350
351 if (q_connection != null)
352 {
353 try
354 {
355 _newQueue.put(q_connection);
356 }
357 catch (InterruptedException e)
358 {
359 Log.ignore(e);
360 }
361 }
362 }
363
364 public void returnConnection(HttpConnection connection, boolean close) throws IOException
365 {
366 if (connection.isReserved())
367 connection.setReserved(false);
368
369 if (close)
370 {
371 try
372 {
373 connection.close();
374 }
375 catch (IOException e)
376 {
377 Log.ignore(e);
378 }
379 }
380
381 if (!_client.isStarted())
382 return;
383
384 if (!close && connection.getEndPoint().isOpen())
385 {
386 synchronized (this)
387 {
388 if (_queue.size() == 0)
389 {
390 connection.setIdleTimeout();
391 _idle.add(connection);
392 }
393 else
394 {
395 HttpExchange ex = _queue.removeFirst();
396 send(connection, ex);
397 }
398 this.notifyAll();
399 }
400 }
401 else
402 {
403 synchronized (this)
404 {
405 _connections.remove(connection);
406 if (!_queue.isEmpty())
407 startNewConnection();
408 }
409 }
410 }
411
412 public void returnIdleConnection(HttpConnection connection)
413 {
414 try
415 {
416 connection.close();
417 }
418 catch (IOException e)
419 {
420 Log.ignore(e);
421 }
422
423 synchronized (this)
424 {
425 _idle.remove(connection);
426 _connections.remove(connection);
427
428 if (!_queue.isEmpty() && _client.isStarted())
429 startNewConnection();
430 }
431 }
432
433 public void send(HttpExchange ex) throws IOException
434 {
435 LinkedList<String> listeners = _client.getRegisteredListeners();
436
437 if (listeners != null)
438 {
439
440 for (int i = listeners.size(); i > 0; --i)
441 {
442 String listenerClass = listeners.get(i - 1);
443
444 try
445 {
446 Class listener = Class.forName(listenerClass);
447 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
448 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
449 ex.setEventListener(elistener);
450 }
451 catch (Exception e)
452 {
453 e.printStackTrace();
454 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
455 }
456 }
457 }
458
459
460 if (_client.hasRealms())
461 {
462 ex.setEventListener(new SecurityListener(this, ex));
463 }
464
465 doSend(ex);
466 }
467
468 public void resend(HttpExchange ex) throws IOException
469 {
470 ex.getEventListener().onRetry();
471 ex.reset();
472 doSend(ex);
473 }
474
475 protected void doSend(HttpExchange ex) throws IOException
476 {
477
478
479 if (_cookies != null)
480 {
481 StringBuilder buf = null;
482 for (HttpCookie cookie : _cookies)
483 {
484 if (buf == null)
485 buf = new StringBuilder();
486 else
487 buf.append("; ");
488 buf.append(cookie.getName());
489 buf.append("=");
490 buf.append(cookie.getValue());
491 }
492 if (buf != null)
493 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
494 }
495
496
497 if (_authorizations != null)
498 {
499 Authentication auth = (Authentication)_authorizations.match(ex.getURI());
500 if (auth != null)
501 (auth).setCredentials(ex);
502 }
503
504 HttpConnection connection = getIdleConnection();
505 if (connection != null)
506 {
507 send(connection, ex);
508 }
509 else
510 {
511 synchronized (this)
512 {
513 _queue.add(ex);
514 if (_connections.size() + _pendingConnections < _maxConnections)
515 {
516 startNewConnection();
517 }
518 }
519 }
520 }
521
522 protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
523 {
524 synchronized (this)
525 {
526
527
528 if(!connection.send(exchange))
529 {
530 _queue.addFirst(exchange);
531 returnIdleConnection(connection);
532 }
533 }
534 }
535
536 @Override
537 public synchronized String toString()
538 {
539 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
540 }
541
542 public synchronized String toDetailString()
543 {
544 StringBuilder b = new StringBuilder();
545 b.append(toString());
546 b.append('\n');
547 synchronized (this)
548 {
549 for (HttpConnection connection : _connections)
550 {
551 b.append(connection.toDetailString());
552 if (_idle.contains(connection))
553 b.append(" IDLE");
554 b.append('\n');
555 }
556 }
557 b.append("--");
558 b.append('\n');
559
560 return b.toString();
561 }
562
563 public void setProxy(Address proxy)
564 {
565 _proxy = proxy;
566 }
567
568 public Address getProxy()
569 {
570 return _proxy;
571 }
572
573 public Authentication getProxyAuthentication()
574 {
575 return _proxyAuthentication;
576 }
577
578 public void setProxyAuthentication(Authentication authentication)
579 {
580 _proxyAuthentication = authentication;
581 }
582
583 public boolean isProxied()
584 {
585 return _proxy != null;
586 }
587
588 public void close() throws IOException
589 {
590 synchronized (this)
591 {
592 for (HttpConnection connection : _connections)
593 {
594 connection.close();
595 }
596 }
597 }
598
599 private class ConnectExchange extends ContentExchange
600 {
601 private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
602 private final HttpExchange exchange;
603
604 public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
605 {
606 this.proxyEndPoint = proxyEndPoint;
607 this.exchange = exchange;
608 setMethod(HttpMethods.CONNECT);
609 String serverHostAndPort = serverAddress.toString();
610 setURI(serverHostAndPort);
611 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
612 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
613 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
614 }
615
616 @Override
617 protected void onResponseComplete() throws IOException
618 {
619 if (getResponseStatus() == HttpStatus.OK_200)
620 {
621 proxyEndPoint.upgrade();
622 }
623 else
624 {
625 onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
626 }
627 }
628
629 @Override
630 protected void onConnectionFailed(Throwable x)
631 {
632 HttpDestination.this.onConnectionFailed(x);
633 }
634 }
635 }