1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.io.InterruptedIOException;
18
19 import org.eclipse.jetty.http.AbstractGenerator;
20 import org.eclipse.jetty.http.HttpStatus;
21 import org.eclipse.jetty.io.Buffer;
22 import org.eclipse.jetty.io.Buffers;
23 import org.eclipse.jetty.io.Connection;
24 import org.eclipse.jetty.io.EndPoint;
25 import org.eclipse.jetty.util.log.Log;
26 import org.eclipse.jetty.util.log.Logger;
27
28
29
30
31
32 public class BlockingHttpConnection extends AbstractHttpConnection
33 {
34 private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
35
36 private boolean _requestComplete;
37 private Buffer _requestContentChunk;
38
39 BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)
40 {
41 super(requestBuffers, responseBuffers, endPoint);
42 }
43
44 protected void reset() throws IOException
45 {
46 _requestComplete = false;
47 super.reset();
48 }
49
50 @Override
51 public Connection handle() throws IOException
52 {
53 Connection connection = this;
54
55 try
56 {
57 boolean failed = false;
58
59
60
61 while (_endp.isOpen() && connection==this)
62 {
63 LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer());
64
65 HttpExchange exchange;
66 synchronized (this)
67 {
68 exchange=_exchange;
69
70 while (exchange == null)
71 {
72 try
73 {
74 this.wait();
75 exchange=_exchange;
76 }
77 catch (InterruptedException e)
78 {
79 throw new InterruptedIOException();
80 }
81 }
82 }
83 LOG.debug("exchange {}",exchange);
84
85 try
86 {
87
88 if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
89 {
90 LOG.debug("commit");
91 commitRequest();
92 }
93
94
95 while (_generator.isCommitted() && !_generator.isComplete())
96 {
97 if (_generator.flushBuffer()>0)
98 {
99 LOG.debug("flushed");
100 }
101
102
103 if (_generator.isState(AbstractGenerator.STATE_CONTENT))
104 {
105
106 if (_requestContentChunk==null)
107 _requestContentChunk = exchange.getRequestContentChunk(null);
108
109 if (_requestContentChunk==null)
110 {
111 LOG.debug("complete");
112 _generator.complete();
113 }
114 else if (_generator.isEmpty())
115 {
116 LOG.debug("addChunk");
117 Buffer chunk=_requestContentChunk;
118 _requestContentChunk=exchange.getRequestContentChunk(null);
119 _generator.addContent(chunk,_requestContentChunk==null);
120 }
121 }
122 }
123
124
125 if (_generator.isComplete() && !_requestComplete)
126 {
127 LOG.debug("requestComplete");
128 _requestComplete = true;
129 exchange.getEventListener().onRequestComplete();
130 }
131
132
133 if (!_parser.isComplete() && _parser.parseAvailable())
134 {
135 LOG.debug("parsed");
136 }
137
138
139 _endp.flush();
140 }
141 catch (Throwable e)
142 {
143 LOG.debug("Failure on " + _exchange, e);
144
145 failed = true;
146
147 synchronized (this)
148 {
149 if (exchange != null)
150 {
151
152
153 if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
154 exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
155 !exchange.isDone())
156 {
157 if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
158 exchange.getEventListener().onException(e);
159 }
160 }
161 else
162 {
163 if (e instanceof IOException)
164 throw (IOException)e;
165 if (e instanceof Error)
166 throw (Error)e;
167 if (e instanceof RuntimeException)
168 throw (RuntimeException)e;
169 throw new RuntimeException(e);
170 }
171 }
172 }
173 finally
174 {
175 LOG.debug("{} {}",_generator, _parser);
176 LOG.debug("{}",_endp);
177
178 boolean complete = failed || _generator.isComplete() && _parser.isComplete();
179
180 if (complete)
181 {
182 boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
183 _generator.setPersistent(persistent);
184 reset();
185 if (persistent)
186 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
187
188 synchronized (this)
189 {
190 exchange=_exchange;
191 _exchange = null;
192
193
194 if (exchange!=null)
195 {
196 exchange.cancelTimeout(_destination.getHttpClient());
197
198
199 }
200
201
202 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
203 {
204 Connection switched=exchange.onSwitchProtocol(_endp);
205 if (switched!=null)
206 connection=switched;
207 {
208
209 _pipeline = null;
210 if (_pipeline!=null)
211 _destination.send(_pipeline);
212 _pipeline = null;
213
214 connection=switched;
215 }
216 }
217
218
219 if (_pipeline!=null)
220 {
221 if (!persistent || connection!=this)
222 _destination.send(_pipeline);
223 else
224 _exchange=_pipeline;
225 _pipeline=null;
226 }
227
228 if (_exchange==null && !isReserved())
229 _destination.returnConnection(this, !persistent);
230 }
231 }
232 }
233 }
234 }
235 finally
236 {
237 _parser.returnBuffers();
238 _generator.returnBuffers();
239 }
240
241 return connection;
242 }
243
244 @Override
245 public boolean send(HttpExchange ex) throws IOException
246 {
247 boolean sent=super.send(ex);
248 if (sent)
249 {
250 synchronized (this)
251 {
252 notifyAll();
253 }
254 }
255 return sent;
256 }
257 }