1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.http;
20
21 import java.nio.channels.AsynchronousCloseException;
22 import java.util.concurrent.TimeoutException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.eclipse.jetty.client.HttpConnection;
26 import org.eclipse.jetty.client.HttpDestination;
27 import org.eclipse.jetty.client.HttpExchange;
28 import org.eclipse.jetty.client.api.Connection;
29 import org.eclipse.jetty.client.api.Request;
30 import org.eclipse.jetty.client.api.Response;
31 import org.eclipse.jetty.io.AbstractConnection;
32 import org.eclipse.jetty.io.EndPoint;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35
36 public class HttpConnectionOverHTTP extends AbstractConnection implements Connection
37 {
38 private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
39
40 private final AtomicBoolean closed = new AtomicBoolean();
41 private final Delegate delegate;
42 private final HttpChannelOverHTTP channel;
43 private long idleTimeout;
44
45 public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination)
46 {
47 super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
48 this.delegate = new Delegate(destination);
49 this.channel = new HttpChannelOverHTTP(this);
50 }
51
52 public HttpChannelOverHTTP getHttpChannel()
53 {
54 return channel;
55 }
56
57 public HttpDestinationOverHTTP getHttpDestination()
58 {
59 return (HttpDestinationOverHTTP)delegate.getHttpDestination();
60 }
61
62 @Override
63 public void send(Request request, Response.CompleteListener listener)
64 {
65 delegate.send(request, listener);
66 }
67
68 protected void send(HttpExchange exchange)
69 {
70 delegate.send(exchange);
71 }
72
73 @Override
74 public void onOpen()
75 {
76 super.onOpen();
77 fillInterested();
78 }
79
80 public boolean isClosed()
81 {
82 return closed.get();
83 }
84
85 @Override
86 protected boolean onReadTimeout()
87 {
88 LOG.debug("{} idle timeout", this);
89 close(new TimeoutException());
90 return false;
91 }
92
93 @Override
94 public void onFillable()
95 {
96 HttpExchange exchange = channel.getHttpExchange();
97 if (exchange != null)
98 {
99 channel.receive();
100 }
101 else
102 {
103
104
105 close();
106 }
107 }
108
109 public void release()
110 {
111
112 getEndPoint().setIdleTimeout(idleTimeout);
113 getHttpDestination().release(this);
114 }
115
116 @Override
117 public void close()
118 {
119 close(new AsynchronousCloseException());
120 }
121
122 protected void close(Throwable failure)
123 {
124 if (softClose())
125 {
126
127
128 getHttpDestination().close(this);
129 getEndPoint().shutdownOutput();
130 LOG.debug("{} oshut", this);
131 getEndPoint().close();
132 LOG.debug("{} closed", this);
133
134 abort(failure);
135 }
136 }
137
138 public boolean softClose()
139 {
140 return closed.compareAndSet(false, true);
141 }
142
143 private boolean abort(Throwable failure)
144 {
145 HttpExchange exchange = channel.getHttpExchange();
146 return exchange != null && exchange.getRequest().abort(failure);
147 }
148
149 @Override
150 public String toString()
151 {
152 return String.format("%s@%h(l:%s <-> r:%s)",
153 getClass().getSimpleName(),
154 this,
155 getEndPoint().getLocalAddress(),
156 getEndPoint().getRemoteAddress());
157 }
158
159 private class Delegate extends HttpConnection
160 {
161 private Delegate(HttpDestination destination)
162 {
163 super(destination);
164 }
165
166 @Override
167 protected void send(HttpExchange exchange)
168 {
169 Request request = exchange.getRequest();
170 normalizeRequest(request);
171
172
173 EndPoint endPoint = getEndPoint();
174 idleTimeout = endPoint.getIdleTimeout();
175 endPoint.setIdleTimeout(request.getIdleTimeout());
176
177
178 channel.associate(exchange);
179 channel.send();
180 }
181
182 @Override
183 public void close()
184 {
185 HttpConnectionOverHTTP.this.close();
186 }
187
188 @Override
189 public String toString()
190 {
191 return HttpConnectionOverHTTP.this.toString();
192 }
193 }
194 }