1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.http;
20
21 import java.nio.channels.AsynchronousCloseException;
22 import java.util.concurrent.TimeoutException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.eclipse.jetty.client.HttpConnection;
27 import org.eclipse.jetty.client.HttpDestination;
28 import org.eclipse.jetty.client.HttpExchange;
29 import org.eclipse.jetty.client.SendFailure;
30 import org.eclipse.jetty.client.api.Connection;
31 import org.eclipse.jetty.client.api.Request;
32 import org.eclipse.jetty.client.api.Response;
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.util.Promise;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.util.thread.Sweeper;
39
40 public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable
41 {
42 private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
43
44 private final AtomicBoolean closed = new AtomicBoolean();
45 private final AtomicInteger sweeps = new AtomicInteger();
46 private final Promise<Connection> promise;
47 private final Delegate delegate;
48 private final HttpChannelOverHTTP channel;
49 private long idleTimeout;
50
51 public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
52 {
53 super(endPoint, destination.getHttpClient().getExecutor());
54 this.promise = promise;
55 this.delegate = new Delegate(destination);
56 this.channel = newHttpChannel();
57 }
58
59 protected HttpChannelOverHTTP newHttpChannel()
60 {
61 return new HttpChannelOverHTTP(this);
62 }
63
64 public HttpChannelOverHTTP getHttpChannel()
65 {
66 return channel;
67 }
68
69 public HttpDestinationOverHTTP getHttpDestination()
70 {
71 return (HttpDestinationOverHTTP)delegate.getHttpDestination();
72 }
73
74 @Override
75 public void send(Request request, Response.CompleteListener listener)
76 {
77 delegate.send(request, listener);
78 }
79
80 protected SendFailure send(HttpExchange exchange)
81 {
82 return delegate.send(exchange);
83 }
84
85 @Override
86 public void onOpen()
87 {
88 super.onOpen();
89 fillInterested();
90 promise.succeeded(this);
91 }
92
93 public boolean isClosed()
94 {
95 return closed.get();
96 }
97
98 @Override
99 public boolean onIdleExpired()
100 {
101 boolean close = delegate.onIdleTimeout();
102 if (close)
103 close(new TimeoutException("Idle timeout " + getEndPoint().getIdleTimeout() + "ms"));
104 return false;
105 }
106
107 @Override
108 public void onFillable()
109 {
110 HttpExchange exchange = channel.getHttpExchange();
111 if (exchange != null)
112 {
113 channel.receive();
114 }
115 else
116 {
117
118
119 close();
120 }
121 }
122
123 public void release()
124 {
125
126 getEndPoint().setIdleTimeout(idleTimeout);
127 getHttpDestination().release(this);
128 }
129
130 @Override
131 public void close()
132 {
133 close(new AsynchronousCloseException());
134 }
135
136 protected void close(Throwable failure)
137 {
138 if (closed.compareAndSet(false, true))
139 {
140
141
142 getHttpDestination().close(this);
143 getEndPoint().shutdownOutput();
144 if (LOG.isDebugEnabled())
145 LOG.debug("Shutdown {}", this);
146 getEndPoint().close();
147 if (LOG.isDebugEnabled())
148 LOG.debug("Closed {}", this);
149
150 abort(failure);
151 }
152 }
153
154 protected boolean abort(Throwable failure)
155 {
156 HttpExchange exchange = channel.getHttpExchange();
157 return exchange != null && exchange.getRequest().abort(failure);
158 }
159
160 @Override
161 public boolean sweep()
162 {
163 if (!closed.get())
164 return false;
165 if (sweeps.incrementAndGet() < 4)
166 return false;
167 return true;
168 }
169
170 @Override
171 public String toString()
172 {
173 return String.format("%s@%h(l:%s <-> r:%s,closed=%b)[%s]",
174 getClass().getSimpleName(),
175 this,
176 getEndPoint().getLocalAddress(),
177 getEndPoint().getRemoteAddress(),
178 closed.get(),
179 channel);
180 }
181
182 private class Delegate extends HttpConnection
183 {
184 private Delegate(HttpDestination destination)
185 {
186 super(destination);
187 }
188
189 @Override
190 protected SendFailure send(HttpExchange exchange)
191 {
192 Request request = exchange.getRequest();
193 normalizeRequest(request);
194
195
196 EndPoint endPoint = getEndPoint();
197 idleTimeout = endPoint.getIdleTimeout();
198 endPoint.setIdleTimeout(request.getIdleTimeout());
199
200
201 return send(channel, exchange);
202 }
203
204 @Override
205 public void close()
206 {
207 HttpConnectionOverHTTP.this.close();
208 }
209
210 @Override
211 public String toString()
212 {
213 return HttpConnectionOverHTTP.this.toString();
214 }
215 }
216 }