1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.fcgi.client.http;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.LinkedList;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.eclipse.jetty.client.HttpClient;
31 import org.eclipse.jetty.client.HttpConnection;
32 import org.eclipse.jetty.client.HttpDestination;
33 import org.eclipse.jetty.client.HttpExchange;
34 import org.eclipse.jetty.client.api.Connection;
35 import org.eclipse.jetty.client.api.Request;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.fcgi.FCGI;
38 import org.eclipse.jetty.fcgi.generator.Flusher;
39 import org.eclipse.jetty.fcgi.parser.ClientParser;
40 import org.eclipse.jetty.http.HttpField;
41 import org.eclipse.jetty.http.HttpFields;
42 import org.eclipse.jetty.http.HttpHeader;
43 import org.eclipse.jetty.http.HttpHeaderValue;
44 import org.eclipse.jetty.io.AbstractConnection;
45 import org.eclipse.jetty.io.ByteBufferPool;
46 import org.eclipse.jetty.io.EndPoint;
47 import org.eclipse.jetty.util.BufferUtil;
48 import org.eclipse.jetty.util.CompletableCallback;
49 import org.eclipse.jetty.util.log.Log;
50 import org.eclipse.jetty.util.log.Logger;
51
52 public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
53 {
54 private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
55
56 private final LinkedList<Integer> requests = new LinkedList<>();
57 private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
58 private final AtomicBoolean closed = new AtomicBoolean();
59 private final Flusher flusher;
60 private final HttpDestination destination;
61 private final boolean multiplexed;
62 private final Delegate delegate;
63 private final ClientParser parser;
64 private ByteBuffer buffer;
65
66 public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
67 {
68 super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
69 this.destination = destination;
70 this.multiplexed = multiplexed;
71 this.flusher = new Flusher(endPoint);
72 this.delegate = new Delegate(destination);
73 this.parser = new ClientParser(new ResponseListener());
74 requests.addLast(0);
75 }
76
77 public HttpDestination getHttpDestination()
78 {
79 return destination;
80 }
81
82 @Override
83 public void send(Request request, Response.CompleteListener listener)
84 {
85 delegate.send(request, listener);
86 }
87
88 protected void send(HttpExchange exchange)
89 {
90 delegate.send(exchange);
91 }
92
93 @Override
94 public void onOpen()
95 {
96 super.onOpen();
97 fillInterested();
98 }
99
100 @Override
101 public void onFillable()
102 {
103 HttpClient client = destination.getHttpClient();
104 ByteBufferPool bufferPool = client.getByteBufferPool();
105 buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
106 process();
107 }
108
109 private void process()
110 {
111 if (readAndParse())
112 {
113 HttpClient client = destination.getHttpClient();
114 ByteBufferPool bufferPool = client.getByteBufferPool();
115 bufferPool.release(buffer);
116
117 buffer = null;
118 }
119 }
120
121 private boolean readAndParse()
122 {
123 EndPoint endPoint = getEndPoint();
124 ByteBuffer buffer = this.buffer;
125 while (true)
126 {
127 try
128 {
129 if (parse(buffer))
130 return false;
131
132 int read = endPoint.fill(buffer);
133 if (LOG.isDebugEnabled())
134 LOG.debug("Read {} bytes from {}", read, endPoint);
135 if (read > 0)
136 {
137 if (parse(buffer))
138 return false;
139 }
140 else if (read == 0)
141 {
142 fillInterested();
143 return true;
144 }
145 else
146 {
147 shutdown();
148 return true;
149 }
150 }
151 catch (Exception x)
152 {
153 if (LOG.isDebugEnabled())
154 LOG.debug(x);
155 close(x);
156 return false;
157 }
158 }
159 }
160
161 private boolean parse(ByteBuffer buffer)
162 {
163 return parser.parse(buffer);
164 }
165
166 private void shutdown()
167 {
168
169
170 if (channels.isEmpty())
171 close();
172 else
173 failAndClose(new EOFException());
174 }
175
176 @Override
177 protected boolean onReadTimeout()
178 {
179 close(new TimeoutException());
180 return false;
181 }
182
183 protected void release(HttpChannelOverFCGI channel)
184 {
185 channels.remove(channel.getRequest());
186 destination.release(this);
187 }
188
189 @Override
190 public void close()
191 {
192 close(new AsynchronousCloseException());
193 }
194
195 protected void close(Throwable failure)
196 {
197 if (closed.compareAndSet(false, true))
198 {
199
200
201 getHttpDestination().close(this);
202 getEndPoint().shutdownOutput();
203 if (LOG.isDebugEnabled())
204 LOG.debug("{} oshut", this);
205 getEndPoint().close();
206 if (LOG.isDebugEnabled())
207 LOG.debug("{} closed", this);
208
209 abort(failure);
210 }
211 }
212
213 protected boolean closeByHTTP(HttpFields fields)
214 {
215 if (multiplexed)
216 return false;
217 if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
218 return false;
219 close();
220 return true;
221 }
222
223 protected void abort(Throwable failure)
224 {
225 for (HttpChannelOverFCGI channel : channels.values())
226 {
227 HttpExchange exchange = channel.getHttpExchange();
228 if (exchange != null)
229 exchange.getRequest().abort(failure);
230 }
231 channels.clear();
232 }
233
234 private void failAndClose(Throwable failure)
235 {
236 boolean result = false;
237 for (HttpChannelOverFCGI channel : channels.values())
238 result |= channel.responseFailure(failure);
239 if (result)
240 close(failure);
241 }
242
243 private int acquireRequest()
244 {
245 synchronized (requests)
246 {
247 int last = requests.getLast();
248 int request = last + 1;
249 requests.addLast(request);
250 return request;
251 }
252 }
253
254 private void releaseRequest(int request)
255 {
256 synchronized (requests)
257 {
258 requests.removeFirstOccurrence(request);
259 }
260 }
261
262 @Override
263 public String toString()
264 {
265 return String.format("%s@%h(l:%s <-> r:%s)",
266 getClass().getSimpleName(),
267 this,
268 getEndPoint().getLocalAddress(),
269 getEndPoint().getRemoteAddress());
270 }
271
272 private class Delegate extends HttpConnection
273 {
274 private Delegate(HttpDestination destination)
275 {
276 super(destination);
277 }
278
279 @Override
280 protected void send(HttpExchange exchange)
281 {
282 Request request = exchange.getRequest();
283 normalizeRequest(request);
284
285
286 int id = acquireRequest();
287 HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout());
288 channels.put(id, channel);
289 channel.associate(exchange);
290 channel.send();
291 }
292
293 @Override
294 public void close()
295 {
296 HttpConnectionOverFCGI.this.close();
297 }
298
299 @Override
300 public String toString()
301 {
302 return HttpConnectionOverFCGI.this.toString();
303 }
304 }
305
306 private class ResponseListener implements ClientParser.Listener
307 {
308 @Override
309 public void onBegin(int request, int code, String reason)
310 {
311 HttpChannelOverFCGI channel = channels.get(request);
312 if (channel != null)
313 channel.responseBegin(code, reason);
314 else
315 noChannel(request);
316 }
317
318 @Override
319 public void onHeader(int request, HttpField field)
320 {
321 HttpChannelOverFCGI channel = channels.get(request);
322 if (channel != null)
323 channel.responseHeader(field);
324 else
325 noChannel(request);
326 }
327
328 @Override
329 public void onHeaders(int request)
330 {
331 HttpChannelOverFCGI channel = channels.get(request);
332 if (channel != null)
333 channel.responseHeaders();
334 else
335 noChannel(request);
336 }
337
338 @Override
339 public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
340 {
341 switch (stream)
342 {
343 case STD_OUT:
344 {
345 HttpChannelOverFCGI channel = channels.get(request);
346 if (channel != null)
347 {
348 CompletableCallback callback = new CompletableCallback()
349 {
350 @Override
351 public void resume()
352 {
353 if (LOG.isDebugEnabled())
354 LOG.debug("Content consumed asynchronously, resuming processing");
355 process();
356 }
357
358 @Override
359 public void abort(Throwable x)
360 {
361 close(x);
362 }
363 };
364 if (!channel.content(buffer, callback))
365 return true;
366 return callback.tryComplete();
367 }
368 else
369 {
370 noChannel(request);
371 }
372 break;
373 }
374 case STD_ERR:
375 {
376 LOG.info(BufferUtil.toUTF8String(buffer));
377 break;
378 }
379 default:
380 {
381 throw new IllegalArgumentException();
382 }
383 }
384 return false;
385 }
386
387 @Override
388 public void onEnd(int request)
389 {
390 HttpChannelOverFCGI channel = channels.get(request);
391 if (channel != null)
392 {
393 if (channel.responseSuccess())
394 releaseRequest(request);
395 }
396 else
397 {
398 noChannel(request);
399 }
400 }
401
402 @Override
403 public void onFailure(int request, Throwable failure)
404 {
405 HttpChannelOverFCGI channel = channels.get(request);
406 if (channel != null)
407 {
408 if (channel.responseFailure(failure))
409 releaseRequest(request);
410 }
411 else
412 {
413 noChannel(request);
414 }
415 }
416
417 private void noChannel(int request)
418 {
419 if (LOG.isDebugEnabled())
420 LOG.debug("Channel not found for request {}", request);
421 }
422 }
423 }