1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.net.ProtocolException;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.client.HttpClient.Connector;
27 import org.eclipse.jetty.client.security.Authentication;
28 import org.eclipse.jetty.client.security.SecurityListener;
29 import org.eclipse.jetty.http.HttpCookie;
30 import org.eclipse.jetty.http.HttpHeaders;
31 import org.eclipse.jetty.http.HttpMethods;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.PathMap;
34 import org.eclipse.jetty.io.Buffer;
35 import org.eclipse.jetty.io.ByteArrayBuffer;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.util.component.AggregateLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44
45
46 public class HttpDestination implements Dumpable
47 {
48 private static final Logger LOG = Log.getLogger(HttpDestination.class);
49
50 private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
51 private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
52 private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
53 private final List<AbstractHttpConnection> _idle = new ArrayList<AbstractHttpConnection>();
54 private final HttpClient _client;
55 private final Address _address;
56 private final boolean _ssl;
57 private final ByteArrayBuffer _hostHeader;
58 private volatile int _maxConnections;
59 private volatile int _maxQueueSize;
60 private int _pendingConnections = 0;
61 private int _newConnection = 0;
62 private volatile Address _proxy;
63 private Authentication _proxyAuthentication;
64 private PathMap _authorizations;
65 private List<HttpCookie> _cookies;
66
67
68
69 HttpDestination(HttpClient client, Address address, boolean ssl)
70 {
71 _client = client;
72 _address = address;
73 _ssl = ssl;
74 _maxConnections = _client.getMaxConnectionsPerAddress();
75 _maxQueueSize = _client.getMaxQueueSizePerAddress();
76 String addressString = address.getHost();
77 if (address.getPort() != (_ssl ? 443 : 80))
78 addressString += ":" + address.getPort();
79 _hostHeader = new ByteArrayBuffer(addressString);
80 }
81
82 public HttpClient getHttpClient()
83 {
84 return _client;
85 }
86
87 public Address getAddress()
88 {
89 return _address;
90 }
91
92 public boolean isSecure()
93 {
94 return _ssl;
95 }
96
97 public Buffer getHostHeader()
98 {
99 return _hostHeader;
100 }
101
102 public int getMaxConnections()
103 {
104 return _maxConnections;
105 }
106
107 public void setMaxConnections(int maxConnections)
108 {
109 this._maxConnections = maxConnections;
110 }
111
112 public int getMaxQueueSize()
113 {
114 return _maxQueueSize;
115 }
116
117 public void setMaxQueueSize(int maxQueueSize)
118 {
119 this._maxQueueSize = maxQueueSize;
120 }
121
122 public int getConnections()
123 {
124 synchronized (this)
125 {
126 return _connections.size();
127 }
128 }
129
130 public int getIdleConnections()
131 {
132 synchronized (this)
133 {
134 return _idle.size();
135 }
136 }
137
138 public void addAuthorization(String pathSpec, Authentication authorization)
139 {
140 synchronized (this)
141 {
142 if (_authorizations == null)
143 _authorizations = new PathMap();
144 _authorizations.put(pathSpec, authorization);
145 }
146
147
148 }
149
150 public void addCookie(HttpCookie cookie)
151 {
152 synchronized (this)
153 {
154 if (_cookies == null)
155 _cookies = new ArrayList<HttpCookie>();
156 _cookies.add(cookie);
157 }
158
159
160 }
161
162
163
164
165
166
167
168
169
170
171 private AbstractHttpConnection getConnection(long timeout) throws IOException
172 {
173 AbstractHttpConnection connection = null;
174
175 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
176 {
177 boolean startConnection = false;
178 synchronized (this)
179 {
180 int totalConnections = _connections.size() + _pendingConnections;
181 if (totalConnections < _maxConnections)
182 {
183 _newConnection++;
184 startConnection = true;
185 }
186 }
187
188 if (startConnection)
189 {
190 startNewConnection();
191 try
192 {
193 Object o = _newQueue.take();
194 if (o instanceof AbstractHttpConnection)
195 {
196 connection = (AbstractHttpConnection)o;
197 }
198 else
199 throw (IOException)o;
200 }
201 catch (InterruptedException e)
202 {
203 LOG.ignore(e);
204 }
205 }
206 else
207 {
208 try
209 {
210 Thread.currentThread();
211 Thread.sleep(200);
212 timeout -= 200;
213 }
214 catch (InterruptedException e)
215 {
216 LOG.ignore(e);
217 }
218 }
219 }
220 return connection;
221 }
222
223 public AbstractHttpConnection reserveConnection(long timeout) throws IOException
224 {
225 AbstractHttpConnection connection = getConnection(timeout);
226 if (connection != null)
227 connection.setReserved(true);
228 return connection;
229 }
230
231 public AbstractHttpConnection getIdleConnection() throws IOException
232 {
233 AbstractHttpConnection connection = null;
234 while (true)
235 {
236 synchronized (this)
237 {
238 if (connection != null)
239 {
240 _connections.remove(connection);
241 connection.close();
242 connection = null;
243 }
244 if (_idle.size() > 0)
245 connection = _idle.remove(_idle.size() - 1);
246 }
247
248 if (connection == null)
249 {
250 return null;
251 }
252
253
254
255 if (connection.cancelIdleTimeout())
256 {
257 return connection;
258 }
259 }
260 }
261
262 protected void startNewConnection()
263 {
264 try
265 {
266 synchronized (this)
267 {
268 _pendingConnections++;
269 }
270 final Connector connector = _client._connector;
271 if (connector != null)
272 connector.startConnection(this);
273 }
274 catch (Exception e)
275 {
276 LOG.debug(e);
277 onConnectionFailed(e);
278 }
279 }
280
281 public void onConnectionFailed(Throwable throwable)
282 {
283 Throwable connect_failure = null;
284
285 boolean startConnection = false;
286 synchronized (this)
287 {
288 _pendingConnections--;
289 if (_newConnection > 0)
290 {
291 connect_failure = throwable;
292 _newConnection--;
293 }
294 else if (_queue.size() > 0)
295 {
296 HttpExchange ex = _queue.remove(0);
297 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
298 ex.getEventListener().onConnectionFailed(throwable);
299
300
301
302 if (!_queue.isEmpty() && _client.isStarted())
303 startConnection = true;
304 }
305 }
306
307 if (startConnection)
308 startNewConnection();
309
310 if (connect_failure != null)
311 {
312 try
313 {
314 _newQueue.put(connect_failure);
315 }
316 catch (InterruptedException e)
317 {
318 LOG.ignore(e);
319 }
320 }
321 }
322
323 public void onException(Throwable throwable)
324 {
325 synchronized (this)
326 {
327 _pendingConnections--;
328 if (_queue.size() > 0)
329 {
330 HttpExchange ex = _queue.remove(0);
331 if(ex.setStatus(HttpExchange.STATUS_EXCEPTED))
332 ex.getEventListener().onException(throwable);
333 }
334 }
335 }
336
337 public void onNewConnection(final AbstractHttpConnection connection) throws IOException
338 {
339 Connection q_connection = null;
340
341 synchronized (this)
342 {
343 _pendingConnections--;
344 _connections.add(connection);
345
346 if (_newConnection > 0)
347 {
348 q_connection = connection;
349 _newConnection--;
350 }
351 else if (_queue.size() == 0)
352 {
353 connection.setIdleTimeout();
354 _idle.add(connection);
355 }
356 else
357 {
358 EndPoint endPoint = connection.getEndPoint();
359 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
360 {
361 SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
362 HttpExchange exchange = _queue.get(0);
363 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
364 connect.setAddress(getProxy());
365 send(connection, connect);
366 }
367 else
368 {
369 HttpExchange exchange = _queue.remove(0);
370 send(connection, exchange);
371 }
372 }
373 }
374
375 if (q_connection != null)
376 {
377 try
378 {
379 _newQueue.put(q_connection);
380 }
381 catch (InterruptedException e)
382 {
383 LOG.ignore(e);
384 }
385 }
386 }
387
388 public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
389 {
390 if (connection.isReserved())
391 connection.setReserved(false);
392
393 if (close)
394 {
395 try
396 {
397 connection.close();
398 }
399 catch (IOException e)
400 {
401 LOG.ignore(e);
402 }
403 }
404
405 if (!_client.isStarted())
406 return;
407
408 if (!close && connection.getEndPoint().isOpen())
409 {
410 synchronized (this)
411 {
412 if (_queue.size() == 0)
413 {
414 connection.setIdleTimeout();
415 _idle.add(connection);
416 }
417 else
418 {
419 HttpExchange ex = _queue.remove(0);
420 send(connection, ex);
421 }
422 this.notifyAll();
423 }
424 }
425 else
426 {
427 boolean startConnection = false;
428 synchronized (this)
429 {
430 _connections.remove(connection);
431 if (!_queue.isEmpty())
432 startConnection = true;
433 }
434
435 if (startConnection)
436 startNewConnection();
437 }
438 }
439
440 public void returnIdleConnection(AbstractHttpConnection connection)
441 {
442
443 long idleForMs=connection!=null&&connection.getEndPoint()!=null?connection.getEndPoint().getMaxIdleTime():-1;
444 connection.onIdleExpired(idleForMs);
445
446 boolean startConnection = false;
447 synchronized (this)
448 {
449 _idle.remove(connection);
450 _connections.remove(connection);
451
452 if (!_queue.isEmpty() && _client.isStarted())
453 startConnection = true;
454 }
455
456 if (startConnection)
457 startNewConnection();
458 }
459
460 public void send(HttpExchange ex) throws IOException
461 {
462 LinkedList<String> listeners = _client.getRegisteredListeners();
463
464 if (listeners != null)
465 {
466
467 for (int i = listeners.size(); i > 0; --i)
468 {
469 String listenerClass = listeners.get(i - 1);
470
471 try
472 {
473 Class listener = Class.forName(listenerClass);
474 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
475 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
476 ex.setEventListener(elistener);
477 }
478 catch (Exception e)
479 {
480 e.printStackTrace();
481 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
482 }
483 }
484 }
485
486
487 if (_client.hasRealms())
488 {
489 ex.setEventListener(new SecurityListener(this, ex));
490 }
491
492 doSend(ex);
493 }
494
495 public void resend(HttpExchange ex) throws IOException
496 {
497 ex.getEventListener().onRetry();
498 ex.reset();
499 doSend(ex);
500 }
501
502 protected void doSend(HttpExchange ex) throws IOException
503 {
504
505
506 if (_cookies != null)
507 {
508 StringBuilder buf = null;
509 for (HttpCookie cookie : _cookies)
510 {
511 if (buf == null)
512 buf = new StringBuilder();
513 else
514 buf.append("; ");
515 buf.append(cookie.getName());
516 buf.append("=");
517 buf.append(cookie.getValue());
518 }
519 if (buf != null)
520 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
521 }
522
523
524 if (_authorizations != null)
525 {
526 Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
527 if (auth != null)
528 (auth).setCredentials(ex);
529 }
530
531
532
533 ex.scheduleTimeout(this);
534
535 AbstractHttpConnection connection = getIdleConnection();
536 if (connection != null)
537 {
538 send(connection, ex);
539 }
540 else
541 {
542 boolean startConnection = false;
543 synchronized (this)
544 {
545 if (_queue.size() == _maxQueueSize)
546 throw new RejectedExecutionException("Queue full for address " + _address);
547
548 _queue.add(ex);
549 if (_connections.size() + _pendingConnections < _maxConnections)
550 startConnection = true;
551 }
552
553 if (startConnection)
554 startNewConnection();
555 }
556 }
557
558 protected void exchangeExpired(HttpExchange exchange)
559 {
560
561
562 synchronized (this)
563 {
564 _queue.remove(exchange);
565 }
566 }
567
568 protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
569 {
570 synchronized (this)
571 {
572
573
574 if (!connection.send(exchange))
575 {
576 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
577 _queue.add(0, exchange);
578 returnIdleConnection(connection);
579 }
580 }
581 }
582
583 @Override
584 public synchronized String toString()
585 {
586 return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
587 }
588
589 public synchronized String toDetailString()
590 {
591 StringBuilder b = new StringBuilder();
592 b.append(toString());
593 b.append('\n');
594 synchronized (this)
595 {
596 for (AbstractHttpConnection connection : _connections)
597 {
598 b.append(connection.toDetailString());
599 if (_idle.contains(connection))
600 b.append(" IDLE");
601 b.append('\n');
602 }
603 }
604 b.append("--");
605 b.append('\n');
606
607 return b.toString();
608 }
609
610 public void setProxy(Address proxy)
611 {
612 _proxy = proxy;
613 }
614
615 public Address getProxy()
616 {
617 return _proxy;
618 }
619
620 public Authentication getProxyAuthentication()
621 {
622 return _proxyAuthentication;
623 }
624
625 public void setProxyAuthentication(Authentication authentication)
626 {
627 _proxyAuthentication = authentication;
628 }
629
630 public boolean isProxied()
631 {
632 return _proxy != null;
633 }
634
635 public void close() throws IOException
636 {
637 synchronized (this)
638 {
639 for (AbstractHttpConnection connection : _connections)
640 {
641 connection.close();
642 }
643 }
644 }
645
646
647
648
649
650 public String dump()
651 {
652 return AggregateLifeCycle.dump(this);
653 }
654
655
656
657
658
659 public void dump(Appendable out, String indent) throws IOException
660 {
661 synchronized (this)
662 {
663 out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n");
664 AggregateLifeCycle.dump(out,indent,_connections);
665 }
666 }
667
668 private class ConnectExchange extends ContentExchange
669 {
670 private final SelectConnector.UpgradableEndPoint proxyEndPoint;
671 private final HttpExchange exchange;
672
673 public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange)
674 {
675 this.proxyEndPoint = proxyEndPoint;
676 this.exchange = exchange;
677 setMethod(HttpMethods.CONNECT);
678 setVersion(exchange.getVersion());
679 String serverHostAndPort = serverAddress.toString();
680 setRequestURI(serverHostAndPort);
681 addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
682 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
683 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
684 }
685
686 @Override
687 protected void onResponseComplete() throws IOException
688 {
689 int responseStatus = getResponseStatus();
690 if (responseStatus == HttpStatus.OK_200)
691 {
692 proxyEndPoint.upgrade();
693 }
694 else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
695 {
696 onExpire();
697 }
698 else
699 {
700 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString()));
701 }
702 }
703
704 @Override
705 protected void onConnectionFailed(Throwable x)
706 {
707 HttpDestination.this.onConnectionFailed(x);
708 }
709
710 @Override
711 protected void onException(Throwable x)
712 {
713 _queue.remove(exchange);
714 if (exchange.setStatus(STATUS_EXCEPTED))
715 exchange.getEventListener().onException(x);
716 }
717
718 @Override
719 protected void onExpire()
720 {
721 _queue.remove(exchange);
722 if (exchange.setStatus(STATUS_EXPIRED))
723 exchange.getEventListener().onExpire();
724 }
725
726 }
727 }