1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22
23 import org.eclipse.jetty.http.AbstractGenerator;
24 import org.eclipse.jetty.http.HttpStatus;
25 import org.eclipse.jetty.io.AsyncEndPoint;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.Buffers;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.io.nio.AsyncConnection;
31 import org.eclipse.jetty.util.log.Log;
32 import org.eclipse.jetty.util.log.Logger;
33
34
35
36
37
38
39 public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
40 {
41 private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
42
43 private boolean _requestComplete;
44 private Buffer _requestContentChunk;
45 private final AsyncEndPoint _asyncEndp;
46
47 AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
48 {
49 super(requestBuffers,responseBuffers,endp);
50 _asyncEndp=(AsyncEndPoint)endp;
51 }
52
53 protected void reset() throws IOException
54 {
55 _requestComplete = false;
56 super.reset();
57 }
58
59 public Connection handle() throws IOException
60 {
61 Connection connection = this;
62 boolean progress=true;
63
64 try
65 {
66 boolean failed = false;
67
68
69 while (progress && connection==this)
70 {
71 LOG.debug("while open={} more={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),progress);
72
73 progress=false;
74 HttpExchange exchange=_exchange;
75
76 LOG.debug("exchange {} on {}",exchange,this);
77
78 try
79 {
80
81 if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
82 {
83 LOG.debug("commit {}",exchange);
84 progress=true;
85 commitRequest();
86 }
87
88
89 if (_generator.isCommitted() && !_generator.isComplete())
90 {
91 if (_generator.flushBuffer()>0)
92 {
93 LOG.debug("flushed");
94 progress=true;
95 }
96
97
98 if (_generator.isState(AbstractGenerator.STATE_CONTENT))
99 {
100
101 if (_requestContentChunk==null)
102 _requestContentChunk = exchange.getRequestContentChunk(null);
103
104 if (_requestContentChunk==null)
105 {
106 LOG.debug("complete {}",exchange);
107 progress=true;
108 _generator.complete();
109 }
110 else if (_generator.isEmpty())
111 {
112 LOG.debug("addChunk");
113 progress=true;
114 Buffer chunk=_requestContentChunk;
115 _requestContentChunk=exchange.getRequestContentChunk(null);
116 _generator.addContent(chunk,_requestContentChunk==null);
117 if (_requestContentChunk==null)
118 exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
119 }
120 }
121 }
122
123
124 if (_generator.isComplete() && !_requestComplete)
125 {
126 LOG.debug("requestComplete {}",exchange);
127 progress=true;
128 _requestComplete = true;
129 exchange.getEventListener().onRequestComplete();
130 }
131
132
133 if (!_parser.isComplete() && _parser.parseAvailable())
134 {
135 LOG.debug("parsed {}",exchange);
136 progress=true;
137 }
138
139
140 _endp.flush();
141
142
143 if (_asyncEndp.hasProgressed())
144 {
145 LOG.debug("hasProgressed {}",exchange);
146 progress=true;
147 }
148 }
149 catch (Throwable e)
150 {
151 LOG.debug("Failure on " + _exchange, e);
152
153 failed = true;
154
155 synchronized (this)
156 {
157 if (exchange != null)
158 {
159
160
161 if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
162 exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
163 !exchange.isDone())
164 {
165 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
166 exchange.getEventListener().onException(e);
167 }
168 }
169 else
170 {
171 if (e instanceof IOException)
172 throw (IOException)e;
173 if (e instanceof Error)
174 throw (Error)e;
175 if (e instanceof RuntimeException)
176 throw (RuntimeException)e;
177 throw new RuntimeException(e);
178 }
179 }
180 }
181 finally
182 {
183 LOG.debug("finally {} on {} progress={} {}",exchange,this,progress,_endp);
184
185 boolean complete = failed || _generator.isComplete() && _parser.isComplete();
186
187 if (complete)
188 {
189 boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
190 _generator.setPersistent(persistent);
191 reset();
192 if (persistent)
193 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
194
195 synchronized (this)
196 {
197 exchange=_exchange;
198 _exchange = null;
199
200
201 if (exchange!=null)
202 {
203 exchange.cancelTimeout(_destination.getHttpClient());
204
205
206 }
207
208
209 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
210 {
211 Connection switched=exchange.onSwitchProtocol(_endp);
212 if (switched!=null)
213 {
214
215 if (_pipeline!=null)
216 {
217 _destination.send(_pipeline);
218 }
219 _pipeline = null;
220
221 connection=switched;
222 }
223 }
224
225
226 if (_pipeline!=null)
227 {
228 if (!persistent || connection!=this)
229 _destination.send(_pipeline);
230 else
231 _exchange=_pipeline;
232 _pipeline=null;
233 }
234
235 if (_exchange==null && !isReserved())
236 _destination.returnConnection(this, !persistent);
237 }
238
239 }
240 }
241 }
242 }
243 finally
244 {
245 _parser.returnBuffers();
246 _generator.returnBuffers();
247 LOG.debug("unhandle {} on {}",_exchange,_endp);
248 }
249
250 return connection;
251 }
252
253 public void onInputShutdown() throws IOException
254 {
255 if (_generator.isIdle())
256 _endp.shutdownOutput();
257 }
258
259 @Override
260 public boolean send(HttpExchange ex) throws IOException
261 {
262 boolean sent=super.send(ex);
263 if (sent)
264 _asyncEndp.asyncDispatch();
265 return sent;
266 }
267 }