1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.http;
20
21 import java.nio.channels.AsynchronousCloseException;
22 import java.util.concurrent.TimeoutException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.eclipse.jetty.client.HttpConnection;
27 import org.eclipse.jetty.client.HttpDestination;
28 import org.eclipse.jetty.client.HttpExchange;
29 import org.eclipse.jetty.client.api.Connection;
30 import org.eclipse.jetty.client.api.Request;
31 import org.eclipse.jetty.client.api.Response;
32 import org.eclipse.jetty.io.AbstractConnection;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.util.Promise;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.thread.Sweeper;
38
39 public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable
40 {
41 private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
42
43 private final AtomicBoolean closed = new AtomicBoolean();
44 private final Promise<Connection> promise;
45 private final AtomicInteger sweeps = new AtomicInteger();
46 private final Delegate delegate;
47 private final HttpChannelOverHTTP channel;
48 private long idleTimeout;
49
50 public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
51 {
52 super(endPoint, destination.getHttpClient().getExecutor());
53 this.promise = promise;
54 this.delegate = new Delegate(destination);
55 this.channel = newHttpChannel();
56 }
57
58 protected HttpChannelOverHTTP newHttpChannel()
59 {
60 return new HttpChannelOverHTTP(this);
61 }
62
63 public HttpChannelOverHTTP getHttpChannel()
64 {
65 return channel;
66 }
67
68 public HttpDestinationOverHTTP getHttpDestination()
69 {
70 return (HttpDestinationOverHTTP)delegate.getHttpDestination();
71 }
72
73 @Override
74 public void send(Request request, Response.CompleteListener listener)
75 {
76 delegate.send(request, listener);
77 }
78
79 protected void send(HttpExchange exchange)
80 {
81 delegate.send(exchange);
82 }
83
84 @Override
85 public void onOpen()
86 {
87 super.onOpen();
88 fillInterested();
89 promise.succeeded(this);
90 }
91
92 public boolean isClosed()
93 {
94 return closed.get();
95 }
96
97 @Override
98 protected boolean onReadTimeout()
99 {
100 if (LOG.isDebugEnabled())
101 LOG.debug("Idle timeout {}", this);
102 close(new TimeoutException());
103 return false;
104 }
105
106 @Override
107 public void onFillable()
108 {
109 HttpExchange exchange = channel.getHttpExchange();
110 if (exchange != null)
111 {
112 channel.receive();
113 }
114 else
115 {
116
117
118 close();
119 }
120 }
121
122 public void release()
123 {
124
125 getEndPoint().setIdleTimeout(idleTimeout);
126 getHttpDestination().release(this);
127 }
128
129 @Override
130 public void close()
131 {
132 close(new AsynchronousCloseException());
133 }
134
135 protected void close(Throwable failure)
136 {
137 if (softClose())
138 {
139
140
141 getHttpDestination().close(this);
142 getEndPoint().shutdownOutput();
143 if (LOG.isDebugEnabled())
144 LOG.debug("Shutdown {}", this);
145 getEndPoint().close();
146 if (LOG.isDebugEnabled())
147 LOG.debug("Closed {}", this);
148
149 abort(failure);
150 }
151 }
152
153 public boolean softClose()
154 {
155 return closed.compareAndSet(false, true);
156 }
157
158 protected boolean abort(Throwable failure)
159 {
160 HttpExchange exchange = channel.getHttpExchange();
161 return exchange != null && exchange.getRequest().abort(failure);
162 }
163
164 @Override
165 public boolean sweep()
166 {
167 if (!closed.get())
168 return false;
169 if (sweeps.incrementAndGet() < 4)
170 return false;
171 return true;
172 }
173
174 @Override
175 public String toString()
176 {
177 return String.format("%s@%h(l:%s <-> r:%s,closed=%b)[%s]",
178 getClass().getSimpleName(),
179 this,
180 getEndPoint().getLocalAddress(),
181 getEndPoint().getRemoteAddress(),
182 closed.get(),
183 channel);
184 }
185
186 private class Delegate extends HttpConnection
187 {
188 private Delegate(HttpDestination destination)
189 {
190 super(destination);
191 }
192
193 @Override
194 protected void send(HttpExchange exchange)
195 {
196 Request request = exchange.getRequest();
197 normalizeRequest(request);
198
199
200 EndPoint endPoint = getEndPoint();
201 idleTimeout = endPoint.getIdleTimeout();
202 endPoint.setIdleTimeout(request.getIdleTimeout());
203
204
205 if (channel.associate(exchange))
206 channel.send();
207 else
208 channel.release();
209 }
210
211 @Override
212 public void close()
213 {
214 HttpConnectionOverHTTP.this.close();
215 }
216
217 @Override
218 public String toString()
219 {
220 return HttpConnectionOverHTTP.this.toString();
221 }
222 }
223 }