1
2
3
4
5
6
7
8
9
10
11
12
13 package org.eclipse.jetty.client;
14
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22
23 import org.eclipse.jetty.client.HttpClient.Connector;
24 import org.eclipse.jetty.client.security.Authentication;
25 import org.eclipse.jetty.client.security.SecurityListener;
26 import org.eclipse.jetty.http.HttpCookie;
27 import org.eclipse.jetty.http.HttpHeaders;
28 import org.eclipse.jetty.http.PathMap;
29 import org.eclipse.jetty.io.Buffer;
30 import org.eclipse.jetty.io.ByteArrayBuffer;
31 import org.eclipse.jetty.util.log.Log;
32
33
34
35
36
37 public class HttpDestination
38 {
39 private final ByteArrayBuffer _hostHeader;
40 private final Address _address;
41 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
42 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
43 private final HttpClient _client;
44 private final boolean _ssl;
45 private final int _maxConnections;
46 private int _pendingConnections = 0;
47 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
48 private int _newConnection = 0;
49 private Address _proxy;
50 private Authentication _proxyAuthentication;
51 private PathMap _authorizations;
52 private List<HttpCookie> _cookies;
53
54 public void dump() throws IOException
55 {
56 synchronized (this)
57 {
58 Log.info(this.toString());
59 Log.info("connections=" + _connections.size());
60 Log.info("idle=" + _idle.size());
61 Log.info("pending=" + _pendingConnections);
62 for (HttpConnection c : _connections)
63 {
64 if (!c.isIdle())
65 c.dump();
66 }
67 }
68 }
69
70
71 private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
72
73 HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
74 {
75 _client = client;
76 _address = address;
77 _ssl = ssl;
78 _maxConnections = maxConnections;
79 String addressString = address.getHost();
80 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
81 _hostHeader = new ByteArrayBuffer(addressString);
82 }
83
84 public Address getAddress()
85 {
86 return _address;
87 }
88
89 public Buffer getHostHeader()
90 {
91 return _hostHeader;
92 }
93
94 public HttpClient getHttpClient()
95 {
96 return _client;
97 }
98
99 public boolean isSecure()
100 {
101 return _ssl;
102 }
103
104 public int getConnections()
105 {
106 synchronized (this)
107 {
108 return _connections.size();
109 }
110 }
111
112 public int getIdleConnections()
113 {
114 synchronized (this)
115 {
116 return _idle.size();
117 }
118 }
119
120 public void addAuthorization(String pathSpec, Authentication authorization)
121 {
122 synchronized (this)
123 {
124 if (_authorizations == null)
125 _authorizations = new PathMap();
126 _authorizations.put(pathSpec, authorization);
127 }
128
129
130 }
131
132 public void addCookie(HttpCookie cookie)
133 {
134 synchronized (this)
135 {
136 if (_cookies == null)
137 _cookies = new ArrayList<HttpCookie>();
138 _cookies.add(cookie);
139 }
140
141
142 }
143
144
145
146
147
148
149
150
151
152 private HttpConnection getConnection(long timeout) throws IOException
153 {
154 HttpConnection connection = null;
155
156 while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
157 {
158 boolean starting = false;
159 synchronized (this)
160 {
161 int totalConnections = _connections.size() + _pendingConnections;
162 if (totalConnections < _maxConnections)
163 {
164 _newConnection++;
165 startNewConnection();
166 starting = true;
167 }
168 }
169
170 if (!starting)
171 {
172 try
173 {
174 Thread.currentThread();
175 Thread.sleep(200);
176 timeout-=200;
177 }
178 catch (InterruptedException e)
179 {
180 Log.ignore(e);
181 }
182 }
183 else
184 {
185 try
186 {
187 Object o = _newQueue.take();
188 if (o instanceof HttpConnection)
189 {
190 connection = (HttpConnection)o;
191 }
192 else
193 throw (IOException)o;
194 }
195 catch (InterruptedException e)
196 {
197 Log.ignore(e);
198 }
199 }
200 }
201 return connection;
202 }
203
204 public HttpConnection reserveConnection(long timeout) throws IOException
205 {
206 HttpConnection connection = getConnection(timeout);
207 if (connection != null)
208 connection.setReserved(true);
209 return connection;
210 }
211
212 public HttpConnection getIdleConnection() throws IOException
213 {
214 HttpConnection connection = null;
215 while (true)
216 {
217 synchronized (this)
218 {
219 if (connection!=null)
220 {
221 _connections.remove(connection);
222 connection.close();
223 connection=null;
224 }
225 if (_idle.size() > 0)
226 connection = _idle.remove(_idle.size()-1);
227 }
228
229 if (connection==null)
230 return null;
231
232
233
234 if (connection.cancelIdleTimeout())
235 return connection;
236 }
237 }
238
239 protected void startNewConnection()
240 {
241 try
242 {
243 synchronized (this)
244 {
245 _pendingConnections++;
246 }
247 final Connector connector=_client._connector;
248 if (connector!=null)
249 connector.startConnection(this);
250 }
251 catch (Exception e)
252 {
253 Log.debug(e);
254 onConnectionFailed(e);
255 }
256 }
257
258 public void onConnectionFailed(Throwable throwable)
259 {
260 Throwable connect_failure = null;
261
262 synchronized (this)
263 {
264 _pendingConnections--;
265 if (_newConnection > 0)
266 {
267 connect_failure = throwable;
268 _newConnection--;
269 }
270 else if (_queue.size() > 0)
271 {
272 HttpExchange ex = _queue.removeFirst();
273 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
274 ex.getEventListener().onConnectionFailed(throwable);
275
276
277
278 if (!_queue.isEmpty() && _client.isStarted())
279 startNewConnection();
280 }
281 }
282
283 if (connect_failure != null)
284 {
285 try
286 {
287 _newQueue.put(connect_failure);
288 }
289 catch (InterruptedException e)
290 {
291 Log.ignore(e);
292 }
293 }
294 }
295
296 public void onException(Throwable throwable)
297 {
298 synchronized (this)
299 {
300 _pendingConnections--;
301 if (_queue.size() > 0)
302 {
303 HttpExchange ex = _queue.removeFirst();
304 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
305 ex.getEventListener().onException(throwable);
306 }
307 }
308 }
309
310 public void onNewConnection(HttpConnection connection) throws IOException
311 {
312 HttpConnection q_connection = null;
313
314 synchronized (this)
315 {
316 _pendingConnections--;
317 _connections.add(connection);
318
319 if (_newConnection > 0)
320 {
321 q_connection = connection;
322 _newConnection--;
323 }
324 else if (_queue.size() == 0)
325 {
326 connection.setIdleTimeout();
327 _idle.add(connection);
328 }
329 else
330 {
331 HttpExchange ex = _queue.removeFirst();
332 send(connection, ex);
333 }
334 }
335
336 if (q_connection != null)
337 {
338 try
339 {
340 _newQueue.put(q_connection);
341 }
342 catch (InterruptedException e)
343 {
344 Log.ignore(e);
345 }
346 }
347 }
348
349 public void returnConnection(HttpConnection connection, boolean close) throws IOException
350 {
351 if (connection.isReserved())
352 connection.setReserved(false);
353
354 if (close)
355 {
356 try
357 {
358 connection.close();
359 }
360 catch (IOException e)
361 {
362 Log.ignore(e);
363 }
364 }
365
366 if (!_client.isStarted())
367 return;
368
369 if (!close && connection.getEndPoint().isOpen())
370 {
371 synchronized (this)
372 {
373 if (_queue.size() == 0)
374 {
375 connection.setIdleTimeout();
376 _idle.add(connection);
377 }
378 else
379 {
380 HttpExchange ex = _queue.removeFirst();
381 send(connection, ex);
382 }
383 this.notifyAll();
384 }
385 }
386 else
387 {
388 synchronized (this)
389 {
390 _connections.remove(connection);
391 if (!_queue.isEmpty())
392 startNewConnection();
393 }
394 }
395 }
396
397 public void returnIdleConnection(HttpConnection connection)
398 {
399 try
400 {
401 connection.close();
402 }
403 catch (IOException e)
404 {
405 Log.ignore(e);
406 }
407
408 synchronized (this)
409 {
410 _idle.remove(connection);
411 _connections.remove(connection);
412
413 if (!_queue.isEmpty() && _client.isStarted())
414 startNewConnection();
415 }
416 }
417
418 public void send(HttpExchange ex) throws IOException
419 {
420 LinkedList<String> listeners = _client.getRegisteredListeners();
421
422 if (listeners != null)
423 {
424
425 for (int i = listeners.size(); i > 0; --i)
426 {
427 String listenerClass = listeners.get(i - 1);
428
429 try
430 {
431 Class listener = Class.forName(listenerClass);
432 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
433 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
434 ex.setEventListener(elistener);
435 }
436 catch (Exception e)
437 {
438 e.printStackTrace();
439 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
440 }
441 }
442 }
443
444
445 if (_client.hasRealms())
446 {
447 ex.setEventListener(new SecurityListener(this, ex));
448 }
449
450 doSend(ex);
451 }
452
453 public void resend(HttpExchange ex) throws IOException
454 {
455 ex.getEventListener().onRetry();
456 ex.reset();
457 doSend(ex);
458 }
459
460 protected void doSend(HttpExchange ex) throws IOException
461 {
462
463
464 if (_cookies != null)
465 {
466 StringBuilder buf = null;
467 for (HttpCookie cookie : _cookies)
468 {
469 if (buf == null)
470 buf = new StringBuilder();
471 else
472 buf.append("; ");
473 buf.append(cookie.getName());
474 buf.append("=");
475 buf.append(cookie.getValue());
476 }
477 if (buf != null)
478 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
479 }
480
481
482 if (_authorizations != null)
483 {
484 Authentication auth = (Authentication)_authorizations.match(ex.getURI());
485 if (auth != null)
486 (auth).setCredentials(ex);
487 }
488
489 HttpConnection connection = getIdleConnection();
490 if (connection != null)
491 {
492 send(connection, ex);
493 }
494 else
495 {
496 synchronized (this)
497 {
498 _queue.add(ex);
499 if (_connections.size() + _pendingConnections < _maxConnections)
500 {
501 startNewConnection();
502 }
503 }
504 }
505 }
506
507 protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
508 {
509 synchronized (this)
510 {
511
512
513 if(!connection.send(exchange))
514 {
515 _queue.addFirst(exchange);
516 returnIdleConnection(connection);
517 }
518 }
519 }
520
521 @Override
522 public synchronized String toString()
523 {
524 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
525 }
526
527 public synchronized String toDetailString()
528 {
529 StringBuilder b = new StringBuilder();
530 b.append(toString());
531 b.append('\n');
532 synchronized (this)
533 {
534 for (HttpConnection connection : _connections)
535 {
536 b.append(connection.toDetailString());
537 if (_idle.contains(connection))
538 b.append(" IDLE");
539 b.append('\n');
540 }
541 }
542 b.append("--");
543 b.append('\n');
544
545 return b.toString();
546 }
547
548 public void setProxy(Address proxy)
549 {
550 _proxy = proxy;
551 }
552
553 public Address getProxy()
554 {
555 return _proxy;
556 }
557
558 public Authentication getProxyAuthentication()
559 {
560 return _proxyAuthentication;
561 }
562
563 public void setProxyAuthentication(Authentication authentication)
564 {
565 _proxyAuthentication = authentication;
566 }
567
568 public boolean isProxied()
569 {
570 return _proxy != null;
571 }
572
573 public void close() throws IOException
574 {
575 synchronized (this)
576 {
577 for (HttpConnection connection : _connections)
578 {
579 connection.close();
580 }
581 }
582 }
583 }