1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.client.http;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.LinkedList;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.eclipse.jetty.client.HttpClient;
31 import org.eclipse.jetty.client.HttpConnection;
32 import org.eclipse.jetty.client.HttpDestination;
33 import org.eclipse.jetty.client.HttpExchange;
34 import org.eclipse.jetty.client.api.Connection;
35 import org.eclipse.jetty.client.api.Request;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.fcgi.FCGI;
38 import org.eclipse.jetty.fcgi.generator.Flusher;
39 import org.eclipse.jetty.fcgi.parser.ClientParser;
40 import org.eclipse.jetty.http.HttpField;
41 import org.eclipse.jetty.http.HttpFields;
42 import org.eclipse.jetty.http.HttpHeader;
43 import org.eclipse.jetty.http.HttpHeaderValue;
44 import org.eclipse.jetty.io.AbstractConnection;
45 import org.eclipse.jetty.io.ByteBufferPool;
46 import org.eclipse.jetty.io.EndPoint;
47 import org.eclipse.jetty.util.BufferUtil;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50
51 public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
52 {
53 private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
54
55 private final LinkedList<Integer> requests = new LinkedList<>();
56 private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
57 private final AtomicBoolean closed = new AtomicBoolean();
58 private final Flusher flusher;
59 private final HttpDestination destination;
60 private final boolean multiplexed;
61 private final Delegate delegate;
62 private final ClientParser parser;
63
64 public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
65 {
66 super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
67 this.destination = destination;
68 this.multiplexed = multiplexed;
69 this.flusher = new Flusher(endPoint);
70 this.delegate = new Delegate(destination);
71 this.parser = new ClientParser(new ResponseListener());
72 requests.addLast(0);
73 }
74
75 public HttpDestination getHttpDestination()
76 {
77 return destination;
78 }
79
80 @Override
81 public void send(Request request, Response.CompleteListener listener)
82 {
83 delegate.send(request, listener);
84 }
85
86 protected void send(HttpExchange exchange)
87 {
88 delegate.send(exchange);
89 }
90
91 @Override
92 public void onOpen()
93 {
94 super.onOpen();
95 fillInterested();
96 }
97
98 @Override
99 public void onFillable()
100 {
101 EndPoint endPoint = getEndPoint();
102 HttpClient client = destination.getHttpClient();
103 ByteBufferPool bufferPool = client.getByteBufferPool();
104 ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
105 try
106 {
107 while (true)
108 {
109 int read = endPoint.fill(buffer);
110 if (LOG.isDebugEnabled())
111 LOG.debug("Read {} bytes from {}", read, endPoint);
112 if (read > 0)
113 {
114 parse(buffer);
115 }
116 else if (read == 0)
117 {
118 fillInterested();
119 break;
120 }
121 else
122 {
123 shutdown();
124 break;
125 }
126 }
127 }
128 catch (Exception x)
129 {
130 LOG.debug(x);
131 close(x);
132 }
133 finally
134 {
135 bufferPool.release(buffer);
136 }
137 }
138
139 private void parse(ByteBuffer buffer)
140 {
141 while (buffer.hasRemaining())
142 parser.parse(buffer);
143 }
144
145 private void shutdown()
146 {
147
148
149 if (channels.isEmpty())
150 close();
151 else
152 failAndClose(new EOFException());
153 }
154
155 @Override
156 protected boolean onReadTimeout()
157 {
158 close(new TimeoutException());
159 return false;
160 }
161
162 protected void release(HttpChannelOverFCGI channel)
163 {
164 channels.remove(channel.getRequest());
165 destination.release(this);
166 }
167
168 @Override
169 public void close()
170 {
171 close(new AsynchronousCloseException());
172 }
173
174 protected void close(Throwable failure)
175 {
176 if (closed.compareAndSet(false, true))
177 {
178
179
180 getHttpDestination().close(this);
181 getEndPoint().shutdownOutput();
182 LOG.debug("{} oshut", this);
183 getEndPoint().close();
184 LOG.debug("{} closed", this);
185
186 abort(failure);
187 }
188 }
189
190 protected boolean closeByHTTP(HttpFields fields)
191 {
192 if (multiplexed)
193 return false;
194 if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
195 return false;
196 close();
197 return true;
198 }
199
200 protected void abort(Throwable failure)
201 {
202 for (HttpChannelOverFCGI channel : channels.values())
203 {
204 HttpExchange exchange = channel.getHttpExchange();
205 if (exchange != null)
206 exchange.getRequest().abort(failure);
207 }
208 channels.clear();
209 }
210
211 private void failAndClose(Throwable failure)
212 {
213 boolean result = false;
214 for (HttpChannelOverFCGI channel : channels.values())
215 result |= channel.responseFailure(failure);
216 if (result)
217 close(failure);
218 }
219
220 private int acquireRequest()
221 {
222 synchronized (requests)
223 {
224 int last = requests.getLast();
225 int request = last + 1;
226 requests.addLast(request);
227 return request;
228 }
229 }
230
231 private void releaseRequest(int request)
232 {
233 synchronized (requests)
234 {
235 requests.removeFirstOccurrence(request);
236 }
237 }
238
239 @Override
240 public String toString()
241 {
242 return String.format("%s@%h(l:%s <-> r:%s)",
243 getClass().getSimpleName(),
244 this,
245 getEndPoint().getLocalAddress(),
246 getEndPoint().getRemoteAddress());
247 }
248
249 private class Delegate extends HttpConnection
250 {
251 private Delegate(HttpDestination destination)
252 {
253 super(destination);
254 }
255
256 @Override
257 protected void send(HttpExchange exchange)
258 {
259 Request request = exchange.getRequest();
260 normalizeRequest(request);
261
262
263 int id = acquireRequest();
264 HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout());
265 channels.put(id, channel);
266 channel.associate(exchange);
267 channel.send();
268 }
269
270 @Override
271 public void close()
272 {
273 HttpConnectionOverFCGI.this.close();
274 }
275
276 @Override
277 public String toString()
278 {
279 return HttpConnectionOverFCGI.this.toString();
280 }
281 }
282
283 private class ResponseListener implements ClientParser.Listener
284 {
285 @Override
286 public void onBegin(int request, int code, String reason)
287 {
288 HttpChannelOverFCGI channel = channels.get(request);
289 if (channel != null)
290 channel.responseBegin(code, reason);
291 else
292 noChannel(request);
293 }
294
295 @Override
296 public void onHeader(int request, HttpField field)
297 {
298 HttpChannelOverFCGI channel = channels.get(request);
299 if (channel != null)
300 channel.responseHeader(field);
301 else
302 noChannel(request);
303 }
304
305 @Override
306 public void onHeaders(int request)
307 {
308 HttpChannelOverFCGI channel = channels.get(request);
309 if (channel != null)
310 channel.responseHeaders();
311 else
312 noChannel(request);
313 }
314
315 @Override
316 public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
317 {
318 switch (stream)
319 {
320 case STD_OUT:
321 {
322 HttpChannelOverFCGI channel = channels.get(request);
323 if (channel != null)
324 channel.content(buffer);
325 else
326 noChannel(request);
327 break;
328 }
329 case STD_ERR:
330 {
331 LOG.info(BufferUtil.toUTF8String(buffer));
332 break;
333 }
334 default:
335 {
336 throw new IllegalArgumentException();
337 }
338 }
339 }
340
341 @Override
342 public void onEnd(int request)
343 {
344 HttpChannelOverFCGI channel = channels.get(request);
345 if (channel != null)
346 {
347 if (channel.responseSuccess())
348 releaseRequest(request);
349 }
350 else
351 {
352 noChannel(request);
353 }
354 }
355
356 @Override
357 public void onFailure(int request, Throwable failure)
358 {
359 HttpChannelOverFCGI channel = channels.get(request);
360 if (channel != null)
361 {
362 if (channel.responseFailure(failure))
363 releaseRequest(request);
364 }
365 else
366 {
367 noChannel(request);
368 }
369 }
370
371 private void noChannel(int request)
372 {
373
374 }
375 }
376 }