1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.client.http;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.LinkedList;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.eclipse.jetty.client.HttpClient;
31 import org.eclipse.jetty.client.HttpConnection;
32 import org.eclipse.jetty.client.HttpDestination;
33 import org.eclipse.jetty.client.HttpExchange;
34 import org.eclipse.jetty.client.SendFailure;
35 import org.eclipse.jetty.client.api.Connection;
36 import org.eclipse.jetty.client.api.Request;
37 import org.eclipse.jetty.client.api.Response;
38 import org.eclipse.jetty.fcgi.FCGI;
39 import org.eclipse.jetty.fcgi.generator.Flusher;
40 import org.eclipse.jetty.fcgi.parser.ClientParser;
41 import org.eclipse.jetty.http.HttpField;
42 import org.eclipse.jetty.http.HttpFields;
43 import org.eclipse.jetty.http.HttpHeader;
44 import org.eclipse.jetty.http.HttpHeaderValue;
45 import org.eclipse.jetty.io.AbstractConnection;
46 import org.eclipse.jetty.io.ByteBufferPool;
47 import org.eclipse.jetty.io.EndPoint;
48 import org.eclipse.jetty.util.BufferUtil;
49 import org.eclipse.jetty.util.CompletableCallback;
50 import org.eclipse.jetty.util.Promise;
51 import org.eclipse.jetty.util.log.Log;
52 import org.eclipse.jetty.util.log.Logger;
53
54 public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
55 {
56 private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
57
58 private final LinkedList<Integer> requests = new LinkedList<>();
59 private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
60 private final AtomicBoolean closed = new AtomicBoolean();
61 private final HttpDestination destination;
62 private final Promise<Connection> promise;
63 private final boolean multiplexed;
64 private final Flusher flusher;
65 private final Delegate delegate;
66 private final ClientParser parser;
67 private ByteBuffer buffer;
68
69 public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise, boolean multiplexed)
70 {
71 super(endPoint, destination.getHttpClient().getExecutor());
72 this.destination = destination;
73 this.promise = promise;
74 this.multiplexed = multiplexed;
75 this.flusher = new Flusher(endPoint);
76 this.delegate = new Delegate(destination);
77 this.parser = new ClientParser(new ResponseListener());
78 requests.addLast(0);
79 }
80
81 public HttpDestination getHttpDestination()
82 {
83 return destination;
84 }
85
86 protected Flusher getFlusher()
87 {
88 return flusher;
89 }
90
91 @Override
92 public void send(Request request, Response.CompleteListener listener)
93 {
94 delegate.send(request, listener);
95 }
96
97 protected SendFailure send(HttpExchange exchange)
98 {
99 return delegate.send(exchange);
100 }
101
102 @Override
103 public void onOpen()
104 {
105 super.onOpen();
106 fillInterested();
107 promise.succeeded(this);
108 }
109
110 @Override
111 public void onFillable()
112 {
113 buffer = acquireBuffer();
114 process(buffer);
115 }
116
117 private ByteBuffer acquireBuffer()
118 {
119 HttpClient client = destination.getHttpClient();
120 ByteBufferPool bufferPool = client.getByteBufferPool();
121 return bufferPool.acquire(client.getResponseBufferSize(), true);
122 }
123
124 private void releaseBuffer(ByteBuffer buffer)
125 {
126 assert this.buffer == buffer;
127 HttpClient client = destination.getHttpClient();
128 ByteBufferPool bufferPool = client.getByteBufferPool();
129 bufferPool.release(buffer);
130 this.buffer = null;
131 }
132
133 private void process(ByteBuffer buffer)
134 {
135 try
136 {
137 EndPoint endPoint = getEndPoint();
138 boolean looping = false;
139 while (true)
140 {
141 if (!looping && parse(buffer))
142 return;
143
144 int read = endPoint.fill(buffer);
145 if (LOG.isDebugEnabled())
146 LOG.debug("Read {} bytes from {}", read, endPoint);
147
148 if (read > 0)
149 {
150 if (parse(buffer))
151 return;
152 }
153 else if (read == 0)
154 {
155 releaseBuffer(buffer);
156 fillInterested();
157 return;
158 }
159 else
160 {
161 releaseBuffer(buffer);
162 shutdown();
163 return;
164 }
165
166 looping = true;
167 }
168 }
169 catch (Exception x)
170 {
171 if (LOG.isDebugEnabled())
172 LOG.debug(x);
173 releaseBuffer(buffer);
174 close(x);
175 }
176 }
177
178 private boolean parse(ByteBuffer buffer)
179 {
180 return parser.parse(buffer);
181 }
182
183 private void shutdown()
184 {
185
186
187 if (channels.isEmpty())
188 close();
189 else
190 failAndClose(new EOFException(String.valueOf(getEndPoint())));
191 }
192
193 @Override
194 public boolean onIdleExpired()
195 {
196 boolean close = delegate.onIdleTimeout();
197 if (multiplexed)
198 close &= isFillInterested();
199 if (close)
200 close(new TimeoutException("Idle timeout " + getEndPoint().getIdleTimeout() + "ms"));
201 return false;
202 }
203
204 protected void release(HttpChannelOverFCGI channel)
205 {
206 channels.remove(channel.getRequest());
207 destination.release(this);
208 }
209
210 public boolean isClosed()
211 {
212 return closed.get();
213 }
214
215 @Override
216 public void close()
217 {
218 close(new AsynchronousCloseException());
219 }
220
221 protected void close(Throwable failure)
222 {
223 if (closed.compareAndSet(false, true))
224 {
225
226
227 getHttpDestination().close(this);
228 getEndPoint().shutdownOutput();
229 if (LOG.isDebugEnabled())
230 LOG.debug("Shutdown {}", this);
231 getEndPoint().close();
232 if (LOG.isDebugEnabled())
233 LOG.debug("Closed {}", this);
234
235 abort(failure);
236 }
237 }
238
239 protected boolean closeByHTTP(HttpFields fields)
240 {
241 if (multiplexed)
242 return false;
243 if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
244 return false;
245 close();
246 return true;
247 }
248
249 protected void abort(Throwable failure)
250 {
251 for (HttpChannelOverFCGI channel : channels.values())
252 {
253 HttpExchange exchange = channel.getHttpExchange();
254 if (exchange != null)
255 exchange.getRequest().abort(failure);
256 }
257 channels.clear();
258 }
259
260 private void failAndClose(Throwable failure)
261 {
262 boolean result = false;
263 for (HttpChannelOverFCGI channel : channels.values())
264 result |= channel.responseFailure(failure);
265 if (result)
266 close(failure);
267 }
268
269 private int acquireRequest()
270 {
271 synchronized (requests)
272 {
273 int last = requests.getLast();
274 int request = last + 1;
275 requests.addLast(request);
276 return request;
277 }
278 }
279
280 private void releaseRequest(int request)
281 {
282 synchronized (requests)
283 {
284 requests.removeFirstOccurrence(request);
285 }
286 }
287
288 protected HttpChannelOverFCGI newHttpChannel(int id, Request request)
289 {
290 return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout());
291 }
292
293 @Override
294 public String toString()
295 {
296 return String.format("%s@%h(l:%s <-> r:%s)",
297 getClass().getSimpleName(),
298 this,
299 getEndPoint().getLocalAddress(),
300 getEndPoint().getRemoteAddress());
301 }
302
303 private class Delegate extends HttpConnection
304 {
305 private Delegate(HttpDestination destination)
306 {
307 super(destination);
308 }
309
310 @Override
311 protected SendFailure send(HttpExchange exchange)
312 {
313 Request request = exchange.getRequest();
314 normalizeRequest(request);
315
316
317 int id = acquireRequest();
318 HttpChannelOverFCGI channel = newHttpChannel(id, request);
319 channels.put(id, channel);
320
321 return send(channel, exchange);
322 }
323
324 @Override
325 public void close()
326 {
327 HttpConnectionOverFCGI.this.close();
328 }
329
330 protected void close(Throwable failure)
331 {
332 HttpConnectionOverFCGI.this.close(failure);
333 }
334
335 @Override
336 public String toString()
337 {
338 return HttpConnectionOverFCGI.this.toString();
339 }
340 }
341
342 private class ResponseListener implements ClientParser.Listener
343 {
344 @Override
345 public void onBegin(int request, int code, String reason)
346 {
347 HttpChannelOverFCGI channel = channels.get(request);
348 if (channel != null)
349 channel.responseBegin(code, reason);
350 else
351 noChannel(request);
352 }
353
354 @Override
355 public void onHeader(int request, HttpField field)
356 {
357 HttpChannelOverFCGI channel = channels.get(request);
358 if (channel != null)
359 channel.responseHeader(field);
360 else
361 noChannel(request);
362 }
363
364 @Override
365 public void onHeaders(int request)
366 {
367 HttpChannelOverFCGI channel = channels.get(request);
368 if (channel != null)
369 channel.responseHeaders();
370 else
371 noChannel(request);
372 }
373
374 @Override
375 public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
376 {
377 switch (stream)
378 {
379 case STD_OUT:
380 {
381 HttpChannelOverFCGI channel = channels.get(request);
382 if (channel != null)
383 {
384 CompletableCallback callback = new CompletableCallback()
385 {
386 @Override
387 public void resume()
388 {
389 if (LOG.isDebugEnabled())
390 LOG.debug("Content consumed asynchronously, resuming processing");
391 process(HttpConnectionOverFCGI.this.buffer);
392 }
393
394 @Override
395 public void abort(Throwable x)
396 {
397 close(x);
398 }
399 };
400
401 boolean proceed = channel.content(buffer, callback);
402 boolean async = callback.tryComplete();
403 return !proceed || async;
404 }
405 else
406 {
407 noChannel(request);
408 }
409 break;
410 }
411 case STD_ERR:
412 {
413 LOG.info(BufferUtil.toUTF8String(buffer));
414 break;
415 }
416 default:
417 {
418 throw new IllegalArgumentException();
419 }
420 }
421 return false;
422 }
423
424 @Override
425 public void onEnd(int request)
426 {
427 HttpChannelOverFCGI channel = channels.get(request);
428 if (channel != null)
429 {
430 if (channel.responseSuccess())
431 releaseRequest(request);
432 }
433 else
434 {
435 noChannel(request);
436 }
437 }
438
439 @Override
440 public void onFailure(int request, Throwable failure)
441 {
442 HttpChannelOverFCGI channel = channels.get(request);
443 if (channel != null)
444 {
445 if (channel.responseFailure(failure))
446 releaseRequest(request);
447 }
448 else
449 {
450 noChannel(request);
451 }
452 }
453
454 private void noChannel(int request)
455 {
456 if (LOG.isDebugEnabled())
457 LOG.debug("Channel not found for request {}", request);
458 }
459 }
460 }