1
2
3
4
5
6
7
8
9
10
11
12
13 package org.eclipse.jetty.client;
14
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22
23 import org.eclipse.jetty.client.security.Authorization;
24 import org.eclipse.jetty.client.security.SecurityListener;
25 import org.eclipse.jetty.http.HttpCookie;
26 import org.eclipse.jetty.http.HttpHeaders;
27 import org.eclipse.jetty.http.PathMap;
28 import org.eclipse.jetty.io.Buffer;
29 import org.eclipse.jetty.io.ByteArrayBuffer;
30 import org.eclipse.jetty.util.log.Log;
31
32
33
34
35
36 public class HttpDestination
37 {
38 private final ByteArrayBuffer _hostHeader;
39 private final Address _address;
40 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
41 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
42 private final HttpClient _client;
43 private final boolean _ssl;
44 private final int _maxConnections;
45 private int _pendingConnections = 0;
46 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
47 private int _newConnection = 0;
48 private Address _proxy;
49 private Authorization _proxyAuthentication;
50 private PathMap _authorizations;
51 private List<HttpCookie> _cookies;
52
53 public void dump() throws IOException
54 {
55 synchronized (this)
56 {
57 Log.info(this.toString());
58 Log.info("connections=" + _connections.size());
59 Log.info("idle=" + _idle.size());
60 Log.info("pending=" + _pendingConnections);
61 for (HttpConnection c : _connections)
62 {
63 if (!c.isIdle())
64 c.dump();
65 }
66 }
67 }
68
69
70 private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
71
72 HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
73 {
74 _client = client;
75 _address = address;
76 _ssl = ssl;
77 _maxConnections = maxConnections;
78 String addressString = address.getHost();
79 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
80 _hostHeader = new ByteArrayBuffer(addressString);
81 }
82
83 public Address getAddress()
84 {
85 return _address;
86 }
87
88 public Buffer getHostHeader()
89 {
90 return _hostHeader;
91 }
92
93 public HttpClient getHttpClient()
94 {
95 return _client;
96 }
97
98 public boolean isSecure()
99 {
100 return _ssl;
101 }
102
103 public void addAuthorization(String pathSpec, Authorization authorization)
104 {
105 synchronized (this)
106 {
107 if (_authorizations == null)
108 _authorizations = new PathMap();
109 _authorizations.put(pathSpec, authorization);
110 }
111
112
113 }
114
115 public void addCookie(HttpCookie cookie)
116 {
117 synchronized (this)
118 {
119 if (_cookies == null)
120 _cookies = new ArrayList<HttpCookie>();
121 _cookies.add(cookie);
122 }
123
124
125 }
126
127
128
129
130
131
132
133
134
135 private HttpConnection getConnection(long timeout) throws IOException
136 {
137 HttpConnection connection = null;
138
139 while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
140 {
141 int totalConnections = 0;
142 boolean starting = false;
143 synchronized (this)
144 {
145 totalConnections = _connections.size() + _pendingConnections;
146 if (totalConnections < _maxConnections)
147 {
148 _newConnection++;
149 startNewConnection();
150 starting = true;
151 }
152 }
153
154 if (!starting)
155 {
156 try
157 {
158 Thread.currentThread().sleep(200);
159 timeout-=200;
160 }
161 catch (InterruptedException e)
162 {
163 Log.ignore(e);
164 }
165 }
166 else
167 {
168 try
169 {
170 Object o = _newQueue.take();
171 if (o instanceof HttpConnection)
172 {
173 connection = (HttpConnection)o;
174 }
175 else
176 throw (IOException)o;
177 }
178 catch (InterruptedException e)
179 {
180 Log.ignore(e);
181 }
182 }
183 }
184 return connection;
185 }
186
187 public HttpConnection reserveConnection(long timeout) throws IOException
188 {
189 HttpConnection connection = getConnection(timeout);
190 if (connection != null)
191 connection.setReserved(true);
192 return connection;
193 }
194
195 public HttpConnection getIdleConnection() throws IOException
196 {
197 long now = System.currentTimeMillis();
198 long idleTimeout=_client.getIdleTimeout();
199 HttpConnection connection = null;
200 while (true)
201 {
202 synchronized (this)
203 {
204 if (connection!=null)
205 {
206 _connections.remove(connection);
207 connection.getEndPoint().close();
208 connection=null;
209 }
210 if (_idle.size() > 0)
211 connection = _idle.remove(_idle.size()-1);
212 }
213
214 if (connection==null)
215 return null;
216
217 long last = connection.getLast();
218 if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
219 return connection;
220
221 }
222 }
223
224 protected void startNewConnection()
225 {
226 try
227 {
228 synchronized (this)
229 {
230 _pendingConnections++;
231 }
232 _client._connector.startConnection(this);
233 }
234 catch (Exception e)
235 {
236 Log.debug(e);
237 onConnectionFailed(e);
238 }
239 }
240
241 public void onConnectionFailed(Throwable throwable)
242 {
243 Throwable connect_failure = null;
244
245 synchronized (this)
246 {
247 _pendingConnections--;
248 if (_newConnection > 0)
249 {
250 connect_failure = throwable;
251 _newConnection--;
252 }
253 else if (_queue.size() > 0)
254 {
255 HttpExchange ex = _queue.removeFirst();
256 ex.getEventListener().onConnectionFailed(throwable);
257 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
258 }
259 }
260
261 if (connect_failure != null)
262 {
263 try
264 {
265 _newQueue.put(connect_failure);
266 }
267 catch (InterruptedException e)
268 {
269 Log.ignore(e);
270 }
271 }
272 }
273
274 public void onException(Throwable throwable)
275 {
276 synchronized (this)
277 {
278 _pendingConnections--;
279 if (_queue.size() > 0)
280 {
281 HttpExchange ex = _queue.removeFirst();
282 ex.getEventListener().onException(throwable);
283 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
284 }
285 }
286 }
287
288 public void onNewConnection(HttpConnection connection) throws IOException
289 {
290 HttpConnection q_connection = null;
291
292 synchronized (this)
293 {
294 _pendingConnections--;
295 _connections.add(connection);
296
297 if (_newConnection > 0)
298 {
299 q_connection = connection;
300 _newConnection--;
301 }
302 else if (_queue.size() == 0)
303 {
304 _idle.add(connection);
305 }
306 else
307 {
308 HttpExchange ex = _queue.removeFirst();
309 connection.send(ex);
310 }
311 }
312
313 if (q_connection != null)
314 {
315 try
316 {
317 _newQueue.put(q_connection);
318 }
319 catch (InterruptedException e)
320 {
321 Log.ignore(e);
322 }
323 }
324 }
325
326 public void returnConnection(HttpConnection connection, boolean close) throws IOException
327 {
328 if (connection.isReserved())
329 connection.setReserved(false);
330
331 if (close)
332 {
333 try
334 {
335 connection.close();
336 }
337 catch (IOException e)
338 {
339 Log.ignore(e);
340 }
341 }
342
343 if (!_client.isStarted())
344 return;
345
346 if (!close && connection.getEndPoint().isOpen())
347 {
348 synchronized (this)
349 {
350 if (_queue.size() == 0)
351 {
352 connection.setLast(System.currentTimeMillis());
353 _idle.add(connection);
354 }
355 else
356 {
357 HttpExchange ex = _queue.removeFirst();
358 connection.send(ex);
359 }
360 this.notifyAll();
361 }
362 }
363 else
364 {
365 synchronized (this)
366 {
367 _connections.remove(connection);
368 if (!_queue.isEmpty())
369 startNewConnection();
370 }
371 }
372 }
373
374 public void send(HttpExchange ex) throws IOException
375 {
376 LinkedList<String> listeners = _client.getRegisteredListeners();
377
378 if (listeners != null)
379 {
380
381 for (int i = listeners.size(); i > 0; --i)
382 {
383 String listenerClass = listeners.get(i - 1);
384
385 try
386 {
387 Class listener = Class.forName(listenerClass);
388 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
389 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
390 ex.setEventListener(elistener);
391 }
392 catch (Exception e)
393 {
394 e.printStackTrace();
395 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass);
396 }
397 }
398 }
399
400
401 if (_client.hasRealms())
402 {
403 ex.setEventListener(new SecurityListener(this, ex));
404 }
405
406 doSend(ex);
407 }
408
409 public void resend(HttpExchange ex) throws IOException
410 {
411 ex.getEventListener().onRetry();
412 ex.reset();
413 doSend(ex);
414 }
415
416 protected void doSend(HttpExchange ex) throws IOException
417 {
418
419
420 if (_cookies != null)
421 {
422 StringBuilder buf = null;
423 for (HttpCookie cookie : _cookies)
424 {
425 if (buf == null)
426 buf = new StringBuilder();
427 else
428 buf.append("; ");
429 buf.append(cookie.getName());
430 buf.append("=");
431 buf.append(cookie.getValue());
432 }
433 if (buf != null)
434 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
435 }
436
437
438 if (_authorizations != null)
439 {
440 Authorization auth = (Authorization)_authorizations.match(ex.getURI());
441 if (auth != null)
442 ((Authorization)auth).setCredentials(ex);
443 }
444
445 HttpConnection connection = getIdleConnection();
446 if (connection != null)
447 {
448 boolean sent = connection.send(ex);
449 if (!sent) connection = null;
450 }
451
452 if (connection == null)
453 {
454 synchronized (this)
455 {
456 _queue.add(ex);
457 if (_connections.size() + _pendingConnections < _maxConnections)
458 {
459 startNewConnection();
460 }
461 }
462 }
463 }
464
465 public synchronized String toString()
466 {
467 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
468 }
469
470 public synchronized String toDetailString()
471 {
472 StringBuilder b = new StringBuilder();
473 b.append(toString());
474 b.append('\n');
475 synchronized (this)
476 {
477 for (HttpConnection connection : _connections)
478 {
479 b.append(connection.toDetailString());
480 if (_idle.contains(connection))
481 b.append(" IDLE");
482 b.append('\n');
483 }
484 }
485 b.append("--");
486 b.append('\n');
487
488 return b.toString();
489 }
490
491 public void setProxy(Address proxy)
492 {
493 _proxy = proxy;
494 }
495
496 public Address getProxy()
497 {
498 return _proxy;
499 }
500
501 public Authorization getProxyAuthentication()
502 {
503 return _proxyAuthentication;
504 }
505
506 public void setProxyAuthentication(Authorization authentication)
507 {
508 _proxyAuthentication = authentication;
509 }
510
511 public boolean isProxied()
512 {
513 return _proxy != null;
514 }
515
516 public void close() throws IOException
517 {
518 synchronized (this)
519 {
520 for (HttpConnection connection : _connections)
521 {
522 connection.close();
523 }
524 }
525 }
526 }