1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17
18 import org.eclipse.jetty.http.AbstractGenerator;
19 import org.eclipse.jetty.http.HttpStatus;
20 import org.eclipse.jetty.io.AsyncEndPoint;
21 import org.eclipse.jetty.io.Buffer;
22 import org.eclipse.jetty.io.Buffers;
23 import org.eclipse.jetty.io.Connection;
24 import org.eclipse.jetty.io.EndPoint;
25 import org.eclipse.jetty.io.nio.AsyncConnection;
26 import org.eclipse.jetty.util.log.Log;
27 import org.eclipse.jetty.util.log.Logger;
28
29
30
31
32
33
34 public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
35 {
36 private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
37
38 private boolean _requestComplete;
39 private Buffer _requestContentChunk;
40 private final AsyncEndPoint _asyncEndp;
41
42 AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
43 {
44 super(requestBuffers,responseBuffers,endp);
45 _asyncEndp=(AsyncEndPoint)endp;
46 }
47
48 protected void reset() throws IOException
49 {
50 _requestComplete = false;
51 super.reset();
52 }
53
54 public Connection handle() throws IOException
55 {
56 Connection connection = this;
57 boolean progress=true;
58
59 try
60 {
61 boolean failed = false;
62
63
64 while (progress && connection==this)
65 {
66 LOG.debug("while open={} more={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),progress);
67
68 progress=false;
69 HttpExchange exchange=_exchange;
70
71 LOG.debug("exchange {} on {}",exchange,this);
72
73 try
74 {
75
76 if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
77 {
78 LOG.debug("commit {}",exchange);
79 progress=true;
80 commitRequest();
81 }
82
83
84 if (_generator.isCommitted() && !_generator.isComplete())
85 {
86 if (_generator.flushBuffer()>0)
87 {
88 LOG.debug("flushed");
89 progress=true;
90 }
91
92
93 if (_generator.isState(AbstractGenerator.STATE_CONTENT))
94 {
95
96 if (_requestContentChunk==null)
97 _requestContentChunk = exchange.getRequestContentChunk(null);
98
99 if (_requestContentChunk==null)
100 {
101 LOG.debug("complete {}",exchange);
102 progress=true;
103 _generator.complete();
104 }
105 else if (_generator.isEmpty())
106 {
107 LOG.debug("addChunk");
108 progress=true;
109 Buffer chunk=_requestContentChunk;
110 _requestContentChunk=exchange.getRequestContentChunk(null);
111 _generator.addContent(chunk,_requestContentChunk==null);
112 }
113 }
114 }
115
116
117 if (_generator.isComplete() && !_requestComplete)
118 {
119 LOG.debug("requestComplete {}",exchange);
120 progress=true;
121 _requestComplete = true;
122 exchange.getEventListener().onRequestComplete();
123 }
124
125
126 if (!_parser.isComplete() && _parser.parseAvailable())
127 {
128 LOG.debug("parsed {}",exchange);
129 progress=true;
130 }
131
132
133 _endp.flush();
134
135
136 if (_asyncEndp.hasProgressed())
137 {
138 LOG.debug("hasProgressed {}",exchange);
139 progress=true;
140 }
141 }
142 catch (Throwable e)
143 {
144 LOG.debug("Failure on " + _exchange, e);
145
146 failed = true;
147
148 synchronized (this)
149 {
150 if (exchange != null)
151 {
152
153
154 if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
155 exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
156 !exchange.isDone())
157 {
158 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
159 exchange.getEventListener().onException(e);
160 }
161 }
162 else
163 {
164 if (e instanceof IOException)
165 throw (IOException)e;
166 if (e instanceof Error)
167 throw (Error)e;
168 if (e instanceof RuntimeException)
169 throw (RuntimeException)e;
170 throw new RuntimeException(e);
171 }
172 }
173 }
174 finally
175 {
176 LOG.debug("finally {} on {} progress={} {}",exchange,this,progress,_endp);
177
178 boolean complete = failed || _generator.isComplete() && _parser.isComplete();
179
180 if (complete)
181 {
182 boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
183 _generator.setPersistent(persistent);
184 reset();
185 if (persistent)
186 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
187
188 synchronized (this)
189 {
190 exchange=_exchange;
191 _exchange = null;
192
193
194 if (exchange!=null)
195 {
196 exchange.cancelTimeout(_destination.getHttpClient());
197
198
199 }
200
201
202 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
203 {
204 Connection switched=exchange.onSwitchProtocol(_endp);
205 if (switched!=null)
206 connection=switched;
207 {
208
209 _pipeline = null;
210 if (_pipeline!=null)
211 _destination.send(_pipeline);
212 _pipeline = null;
213
214 connection=switched;
215 }
216 }
217
218
219 if (_pipeline!=null)
220 {
221 if (!persistent || connection!=this)
222 _destination.send(_pipeline);
223 else
224 _exchange=_pipeline;
225 _pipeline=null;
226 }
227
228 if (_exchange==null && !isReserved())
229 _destination.returnConnection(this, !persistent);
230 }
231
232 }
233 }
234 }
235 }
236 finally
237 {
238 _parser.returnBuffers();
239 _generator.returnBuffers();
240 LOG.debug("unhandle {} on {}",_exchange,_endp);
241 }
242
243 return connection;
244 }
245
246 public void onInputShutdown() throws IOException
247 {
248 if (_generator.isIdle())
249 _endp.shutdownOutput();
250 }
251
252 @Override
253 public boolean send(HttpExchange ex) throws IOException
254 {
255 boolean sent=super.send(ex);
256 if (sent)
257 _asyncEndp.asyncDispatch();
258 return sent;
259 }
260 }