1
2
3
4
5
6
7
8
9
10
11
12
13 package org.eclipse.jetty.client;
14
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22
23 import org.eclipse.jetty.client.security.Authorization;
24 import org.eclipse.jetty.client.security.SecurityListener;
25 import org.eclipse.jetty.http.HttpCookie;
26 import org.eclipse.jetty.http.HttpHeaders;
27 import org.eclipse.jetty.http.PathMap;
28 import org.eclipse.jetty.io.Buffer;
29 import org.eclipse.jetty.io.ByteArrayBuffer;
30 import org.eclipse.jetty.util.log.Log;
31
32
33
34
35
36 public class HttpDestination
37 {
38 private final ByteArrayBuffer _hostHeader;
39 private final Address _address;
40 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
41 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
42 private final HttpClient _client;
43 private final boolean _ssl;
44 private final int _maxConnections;
45 private int _pendingConnections = 0;
46 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
47 private int _newConnection = 0;
48 private Address _proxy;
49 private Authorization _proxyAuthentication;
50 private PathMap _authorizations;
51 private List<HttpCookie> _cookies;
52
53
54 public void dump() throws IOException
55 {
56 synchronized (this)
57 {
58 Log.info(this.toString());
59 Log.info("connections=" + _connections.size());
60 Log.info("idle=" + _idle.size());
61 Log.info("pending=" + _pendingConnections);
62 for (HttpConnection c : _connections)
63 {
64 if (!c.isIdle())
65 c.dump();
66 }
67 }
68 }
69
70
71 private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
72
73
74 HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
75 {
76 _client = client;
77 _address = address;
78 _ssl = ssl;
79 _maxConnections = maxConnections;
80 String addressString = address.getHost();
81 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
82 _hostHeader = new ByteArrayBuffer(addressString);
83 }
84
85
86 public Address getAddress()
87 {
88 return _address;
89 }
90
91
92 public Buffer getHostHeader()
93 {
94 return _hostHeader;
95 }
96
97
98 public HttpClient getHttpClient()
99 {
100 return _client;
101 }
102
103
104 public boolean isSecure()
105 {
106 return _ssl;
107 }
108
109
110 public void addAuthorization(String pathSpec, Authorization authorization)
111 {
112 synchronized (this)
113 {
114 if (_authorizations == null)
115 _authorizations = new PathMap();
116 _authorizations.put(pathSpec, authorization);
117 }
118
119
120 }
121
122
123 public void addCookie(HttpCookie cookie)
124 {
125 synchronized (this)
126 {
127 if (_cookies == null)
128 _cookies = new ArrayList<HttpCookie>();
129 _cookies.add(cookie);
130 }
131
132
133 }
134
135
136
137
138
139
140
141
142
143
144 private HttpConnection getConnection(long timeout) throws IOException
145 {
146 HttpConnection connection = null;
147
148 while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
149 {
150 int totalConnections = 0;
151 boolean starting = false;
152 synchronized (this)
153 {
154 totalConnections = _connections.size() + _pendingConnections;
155 if (totalConnections < _maxConnections)
156 {
157 _newConnection++;
158 startNewConnection();
159 starting = true;
160 }
161 }
162
163 if (!starting)
164 {
165 try
166 {
167 Thread.currentThread().sleep(200);
168 timeout-=200;
169 }
170 catch (InterruptedException e)
171 {
172 Log.ignore(e);
173 }
174 }
175 else
176 {
177 try
178 {
179 Object o = _newQueue.take();
180 if (o instanceof HttpConnection)
181 {
182 connection = (HttpConnection)o;
183 }
184 else
185 throw (IOException)o;
186 }
187 catch (InterruptedException e)
188 {
189 Log.ignore(e);
190 }
191 }
192 }
193 return connection;
194 }
195
196
197 public HttpConnection reserveConnection(long timeout) throws IOException
198 {
199 HttpConnection connection = getConnection(timeout);
200 if (connection != null)
201 connection.setReserved(true);
202 return connection;
203 }
204
205
206 public HttpConnection getIdleConnection() throws IOException
207 {
208 long now = System.currentTimeMillis();
209 long idleTimeout=_client.getIdleTimeout();
210 HttpConnection connection = null;
211 while (true)
212 {
213 synchronized (this)
214 {
215 if (connection!=null)
216 {
217 _connections.remove(connection);
218 connection.getEndPoint().close();
219 connection=null;
220 }
221 if (_idle.size() > 0)
222 connection = _idle.remove(_idle.size()-1);
223 }
224
225 if (connection==null)
226 return null;
227
228 long last = connection.getLast();
229 if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
230 return connection;
231
232 }
233 }
234
235
236 protected void startNewConnection()
237 {
238 try
239 {
240 synchronized (this)
241 {
242 _pendingConnections++;
243 }
244 _client._connector.startConnection(this);
245 }
246 catch (Exception e)
247 {
248 e.printStackTrace();
249 onConnectionFailed(e);
250 }
251 }
252
253
254 public void onConnectionFailed(Throwable throwable)
255 {
256 Throwable connect_failure = null;
257
258 synchronized (this)
259 {
260 _pendingConnections--;
261 if (_newConnection > 0)
262 {
263 connect_failure = throwable;
264 _newConnection--;
265 }
266 else if (_queue.size() > 0)
267 {
268 HttpExchange ex = _queue.removeFirst();
269 ex.getEventListener().onConnectionFailed(throwable);
270 }
271 }
272
273 if (connect_failure != null)
274 {
275 try
276 {
277 _newQueue.put(connect_failure);
278 }
279 catch (InterruptedException e)
280 {
281 Log.ignore(e);
282 }
283 }
284 }
285
286
287 public void onException(Throwable throwable)
288 {
289 synchronized (this)
290 {
291 _pendingConnections--;
292 if (_queue.size() > 0)
293 {
294 HttpExchange ex = _queue.removeFirst();
295 ex.getEventListener().onException(throwable);
296 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
297 }
298 }
299 }
300
301
302 public void onNewConnection(HttpConnection connection) throws IOException
303 {
304 HttpConnection q_connection = null;
305
306 synchronized (this)
307 {
308 _pendingConnections--;
309 _connections.add(connection);
310
311 if (_newConnection > 0)
312 {
313 q_connection = connection;
314 _newConnection--;
315 }
316 else if (_queue.size() == 0)
317 {
318 _idle.add(connection);
319 }
320 else
321 {
322 HttpExchange ex = _queue.removeFirst();
323 connection.send(ex);
324 }
325 }
326
327 if (q_connection != null)
328 {
329 try
330 {
331 _newQueue.put(q_connection);
332 }
333 catch (InterruptedException e)
334 {
335 Log.ignore(e);
336 }
337 }
338 }
339
340
341 public void returnConnection(HttpConnection connection, boolean close) throws IOException
342 {
343 if (connection.isReserved())
344 connection.setReserved(false);
345
346 if (close)
347 {
348 try
349 {
350 connection.close();
351 }
352 catch (IOException e)
353 {
354 Log.ignore(e);
355 }
356 }
357
358 if (!_client.isStarted())
359 return;
360
361 if (!close && connection.getEndPoint().isOpen())
362 {
363 synchronized (this)
364 {
365 if (_queue.size() == 0)
366 {
367 connection.setLast(System.currentTimeMillis());
368 _idle.add(connection);
369 }
370 else
371 {
372 HttpExchange ex = _queue.removeFirst();
373 connection.send(ex);
374 }
375 this.notifyAll();
376 }
377 }
378 else
379 {
380 synchronized (this)
381 {
382 _connections.remove(connection);
383 if (!_queue.isEmpty())
384 startNewConnection();
385 }
386 }
387 }
388
389
390 public void send(HttpExchange ex) throws IOException
391 {
392 LinkedList<String> listeners = _client.getRegisteredListeners();
393
394 if (listeners != null)
395 {
396
397 for (int i = listeners.size(); i > 0; --i)
398 {
399 String listenerClass = listeners.get(i - 1);
400
401 try
402 {
403 Class listener = Class.forName(listenerClass);
404 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
405 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
406 ex.setEventListener(elistener);
407 }
408 catch (Exception e)
409 {
410 e.printStackTrace();
411 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
412 }
413 }
414 }
415
416
417 if (_client.hasRealms())
418 {
419 ex.setEventListener(new SecurityListener(this, ex));
420 }
421
422 doSend(ex);
423 }
424
425
426 public void resend(HttpExchange ex) throws IOException
427 {
428 ex.getEventListener().onRetry();
429 doSend(ex);
430 }
431
432
433 protected void doSend(HttpExchange ex) throws IOException
434 {
435
436
437 if (_cookies != null)
438 {
439 StringBuilder buf = null;
440 for (HttpCookie cookie : _cookies)
441 {
442 if (buf == null)
443 buf = new StringBuilder();
444 else
445 buf.append("; ");
446 buf.append(cookie.getName());
447 buf.append("=");
448 buf.append(cookie.getValue());
449 }
450 if (buf != null)
451 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
452 }
453
454
455 if (_authorizations != null)
456 {
457 Authorization auth = (Authorization)_authorizations.match(ex.getURI());
458 if (auth != null)
459 ((Authorization)auth).setCredentials(ex);
460 }
461
462 HttpConnection connection = getIdleConnection();
463 if (connection != null)
464 {
465 boolean sent = connection.send(ex);
466 if (!sent) connection = null;
467 }
468
469 if (connection == null)
470 {
471 synchronized (this)
472 {
473 _queue.add(ex);
474 if (_connections.size() + _pendingConnections < _maxConnections)
475 {
476 startNewConnection();
477 }
478 }
479 }
480 }
481
482
483 public synchronized String toString()
484 {
485 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
486 }
487
488
489 public synchronized String toDetailString()
490 {
491 StringBuilder b = new StringBuilder();
492 b.append(toString());
493 b.append('\n');
494 synchronized (this)
495 {
496 for (HttpConnection connection : _connections)
497 {
498 if (connection._exchange != null)
499 {
500 b.append(connection.toDetailString());
501 if (_idle.contains(connection))
502 b.append(" IDLE");
503 b.append('\n');
504 }
505 }
506 }
507 b.append("--");
508 b.append('\n');
509
510 return b.toString();
511 }
512
513
514 public void setProxy(Address proxy)
515 {
516 _proxy = proxy;
517 }
518
519
520 public Address getProxy()
521 {
522 return _proxy;
523 }
524
525
526 public Authorization getProxyAuthentication()
527 {
528 return _proxyAuthentication;
529 }
530
531
532 public void setProxyAuthentication(Authorization authentication)
533 {
534 _proxyAuthentication = authentication;
535 }
536
537
538 public boolean isProxied()
539 {
540 return _proxy != null;
541 }
542
543
544 public void close() throws IOException
545 {
546 synchronized (this)
547 {
548 for (HttpConnection connection : _connections)
549 {
550 connection.close();
551 }
552 }
553 }
554
555 }