1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.client.http;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.LinkedList;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.eclipse.jetty.client.HttpClient;
31 import org.eclipse.jetty.client.HttpConnection;
32 import org.eclipse.jetty.client.HttpDestination;
33 import org.eclipse.jetty.client.HttpExchange;
34 import org.eclipse.jetty.client.SendFailure;
35 import org.eclipse.jetty.client.api.Connection;
36 import org.eclipse.jetty.client.api.Request;
37 import org.eclipse.jetty.client.api.Response;
38 import org.eclipse.jetty.fcgi.FCGI;
39 import org.eclipse.jetty.fcgi.generator.Flusher;
40 import org.eclipse.jetty.fcgi.parser.ClientParser;
41 import org.eclipse.jetty.http.HttpField;
42 import org.eclipse.jetty.http.HttpFields;
43 import org.eclipse.jetty.http.HttpHeader;
44 import org.eclipse.jetty.http.HttpHeaderValue;
45 import org.eclipse.jetty.io.AbstractConnection;
46 import org.eclipse.jetty.io.ByteBufferPool;
47 import org.eclipse.jetty.io.EndPoint;
48 import org.eclipse.jetty.util.BufferUtil;
49 import org.eclipse.jetty.util.CompletableCallback;
50 import org.eclipse.jetty.util.Promise;
51 import org.eclipse.jetty.util.log.Log;
52 import org.eclipse.jetty.util.log.Logger;
53
54 public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
55 {
// Logger shared by every instance of this connection class.
private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);

// Request ids that have been handed out and not yet released. The constructor
// seeds it with 0 so that acquireRequest() can always read the last element;
// acquireRequest()/releaseRequest() access it under synchronized (requests).
private final LinkedList<Integer> requests = new LinkedList<>();
// Channels of the in-flight requests, keyed by request id.
private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
// Flipped to true by close(Throwable) so that the close sequence runs at most once.
private final AtomicBoolean closed = new AtomicBoolean();
// Destination this connection was created for.
private final HttpDestination destination;
// Completed with this connection in onOpen().
private final Promise<Connection> promise;
// Consulted by onIdleExpired() and closeByHTTP() to decide whether to close.
private final boolean multiplexed;
// Created over the endpoint and handed to every channel built by newHttpChannel().
private final Flusher flusher;
// HttpConnection subclass that both send(...) methods forward to.
private final Delegate delegate;
// Parses the bytes read from the endpoint and reports events to a ResponseListener.
private final ClientParser parser;
// Read buffer currently held by this connection: assigned in onFillable()
// and reset to null by releaseBuffer().
private ByteBuffer buffer;
68
69 public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise, boolean multiplexed)
70 {
71 super(endPoint, destination.getHttpClient().getExecutor());
72 this.destination = destination;
73 this.promise = promise;
74 this.multiplexed = multiplexed;
75 this.flusher = new Flusher(endPoint);
76 this.delegate = new Delegate(destination);
77 this.parser = new ClientParser(new ResponseListener());
78 requests.addLast(0);
79 }
80
81 public HttpDestination getHttpDestination()
82 {
83 return destination;
84 }
85
86 protected Flusher getFlusher()
87 {
88 return flusher;
89 }
90
91 @Override
92 public void send(Request request, Response.CompleteListener listener)
93 {
94 delegate.send(request, listener);
95 }
96
97 protected SendFailure send(HttpExchange exchange)
98 {
99 return delegate.send(exchange);
100 }
101
102 @Override
103 public void onOpen()
104 {
105 super.onOpen();
106 fillInterested();
107 promise.succeeded(this);
108 }
109
110 @Override
111 public void onFillable()
112 {
113 buffer = acquireBuffer();
114 process(buffer);
115 }
116
117 private ByteBuffer acquireBuffer()
118 {
119 HttpClient client = destination.getHttpClient();
120 ByteBufferPool bufferPool = client.getByteBufferPool();
121 return bufferPool.acquire(client.getResponseBufferSize(), true);
122 }
123
124 private void releaseBuffer(ByteBuffer buffer)
125 {
126 assert this.buffer == buffer;
127 HttpClient client = destination.getHttpClient();
128 ByteBufferPool bufferPool = client.getByteBufferPool();
129 bufferPool.release(buffer);
130 this.buffer = null;
131 }
132
133 private void process(ByteBuffer buffer)
134 {
135 try
136 {
137 EndPoint endPoint = getEndPoint();
138 boolean looping = false;
139 while (true)
140 {
141 if (!looping && parse(buffer))
142 return;
143
144 int read = endPoint.fill(buffer);
145 if (LOG.isDebugEnabled())
146 LOG.debug("Read {} bytes from {}", read, endPoint);
147
148 if (read > 0)
149 {
150 if (parse(buffer))
151 return;
152 }
153 else if (read == 0)
154 {
155 releaseBuffer(buffer);
156 fillInterested();
157 return;
158 }
159 else
160 {
161 releaseBuffer(buffer);
162 shutdown();
163 return;
164 }
165
166 looping = true;
167 }
168 }
169 catch (Exception x)
170 {
171 if (LOG.isDebugEnabled())
172 LOG.debug(x);
173 releaseBuffer(buffer);
174 close(x);
175 }
176 }
177
178 private boolean parse(ByteBuffer buffer)
179 {
180 return parser.parse(buffer);
181 }
182
183 private void shutdown()
184 {
185
186
187 if (channels.isEmpty())
188 close();
189 else
190 failAndClose(new EOFException(String.valueOf(getEndPoint())));
191 }
192
193 @Override
194 public boolean onIdleExpired()
195 {
196 long idleTimeout = getEndPoint().getIdleTimeout();
197 boolean close = delegate.onIdleTimeout(idleTimeout);
198 if (multiplexed)
199 close &= isFillInterested();
200 if (close)
201 close(new TimeoutException("Idle timeout " + idleTimeout + "ms"));
202 return false;
203 }
204
205 protected void release(HttpChannelOverFCGI channel)
206 {
207 channels.remove(channel.getRequest());
208 destination.release(this);
209 }
210
211 public boolean isClosed()
212 {
213 return closed.get();
214 }
215
216 @Override
217 public void close()
218 {
219 close(new AsynchronousCloseException());
220 }
221
222 protected void close(Throwable failure)
223 {
224 if (closed.compareAndSet(false, true))
225 {
226
227
228 getHttpDestination().close(this);
229 getEndPoint().shutdownOutput();
230 if (LOG.isDebugEnabled())
231 LOG.debug("Shutdown {}", this);
232 getEndPoint().close();
233 if (LOG.isDebugEnabled())
234 LOG.debug("Closed {}", this);
235
236 abort(failure);
237 }
238 }
239
240 protected boolean closeByHTTP(HttpFields fields)
241 {
242 if (multiplexed)
243 return false;
244 if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
245 return false;
246 close();
247 return true;
248 }
249
250 protected void abort(Throwable failure)
251 {
252 for (HttpChannelOverFCGI channel : channels.values())
253 {
254 HttpExchange exchange = channel.getHttpExchange();
255 if (exchange != null)
256 exchange.getRequest().abort(failure);
257 }
258 channels.clear();
259 }
260
261 private void failAndClose(Throwable failure)
262 {
263 boolean result = false;
264 for (HttpChannelOverFCGI channel : channels.values())
265 result |= channel.responseFailure(failure);
266 if (result)
267 close(failure);
268 }
269
270 private int acquireRequest()
271 {
272 synchronized (requests)
273 {
274 int last = requests.getLast();
275 int request = last + 1;
276 requests.addLast(request);
277 return request;
278 }
279 }
280
281 private void releaseRequest(int request)
282 {
283 synchronized (requests)
284 {
285 requests.removeFirstOccurrence(request);
286 }
287 }
288
289 protected HttpChannelOverFCGI newHttpChannel(int id, Request request)
290 {
291 return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout());
292 }
293
294 @Override
295 public String toString()
296 {
297 return String.format("%s@%h(l:%s <-> r:%s)",
298 getClass().getSimpleName(),
299 this,
300 getEndPoint().getLocalAddress(),
301 getEndPoint().getRemoteAddress());
302 }
303
304 private class Delegate extends HttpConnection
305 {
306 private Delegate(HttpDestination destination)
307 {
308 super(destination);
309 }
310
311 @Override
312 protected SendFailure send(HttpExchange exchange)
313 {
314 Request request = exchange.getRequest();
315 normalizeRequest(request);
316
317
318 int id = acquireRequest();
319 HttpChannelOverFCGI channel = newHttpChannel(id, request);
320 channels.put(id, channel);
321
322 return send(channel, exchange);
323 }
324
325 @Override
326 public void close()
327 {
328 HttpConnectionOverFCGI.this.close();
329 }
330
331 protected void close(Throwable failure)
332 {
333 HttpConnectionOverFCGI.this.close(failure);
334 }
335
336 @Override
337 public String toString()
338 {
339 return HttpConnectionOverFCGI.this.toString();
340 }
341 }
342
343 private class ResponseListener implements ClientParser.Listener
344 {
345 @Override
346 public void onBegin(int request, int code, String reason)
347 {
348 HttpChannelOverFCGI channel = channels.get(request);
349 if (channel != null)
350 channel.responseBegin(code, reason);
351 else
352 noChannel(request);
353 }
354
355 @Override
356 public void onHeader(int request, HttpField field)
357 {
358 HttpChannelOverFCGI channel = channels.get(request);
359 if (channel != null)
360 channel.responseHeader(field);
361 else
362 noChannel(request);
363 }
364
365 @Override
366 public void onHeaders(int request)
367 {
368 HttpChannelOverFCGI channel = channels.get(request);
369 if (channel != null)
370 channel.responseHeaders();
371 else
372 noChannel(request);
373 }
374
375 @Override
376 public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
377 {
378 switch (stream)
379 {
380 case STD_OUT:
381 {
382 HttpChannelOverFCGI channel = channels.get(request);
383 if (channel != null)
384 {
385 CompletableCallback callback = new CompletableCallback()
386 {
387 @Override
388 public void resume()
389 {
390 if (LOG.isDebugEnabled())
391 LOG.debug("Content consumed asynchronously, resuming processing");
392 process(HttpConnectionOverFCGI.this.buffer);
393 }
394
395 @Override
396 public void abort(Throwable x)
397 {
398 close(x);
399 }
400 };
401
402 boolean proceed = channel.content(buffer, callback);
403 boolean async = callback.tryComplete();
404 return !proceed || async;
405 }
406 else
407 {
408 noChannel(request);
409 }
410 break;
411 }
412 case STD_ERR:
413 {
414 LOG.info(BufferUtil.toUTF8String(buffer));
415 break;
416 }
417 default:
418 {
419 throw new IllegalArgumentException();
420 }
421 }
422 return false;
423 }
424
425 @Override
426 public void onEnd(int request)
427 {
428 HttpChannelOverFCGI channel = channels.get(request);
429 if (channel != null)
430 {
431 if (channel.responseSuccess())
432 releaseRequest(request);
433 }
434 else
435 {
436 noChannel(request);
437 }
438 }
439
440 @Override
441 public void onFailure(int request, Throwable failure)
442 {
443 HttpChannelOverFCGI channel = channels.get(request);
444 if (channel != null)
445 {
446 if (channel.responseFailure(failure))
447 releaseRequest(request);
448 }
449 else
450 {
451 noChannel(request);
452 }
453 }
454
455 private void noChannel(int request)
456 {
457 if (LOG.isDebugEnabled())
458 LOG.debug("Channel not found for request {}", request);
459 }
460 }
461 }