View Javadoc

//
//  ========================================================================
//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//
18  
19  package org.eclipse.jetty.fcgi.client.http;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.util.LinkedList;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.TimeoutException;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.eclipse.jetty.client.HttpClient;
31  import org.eclipse.jetty.client.HttpConnection;
32  import org.eclipse.jetty.client.HttpDestination;
33  import org.eclipse.jetty.client.HttpExchange;
34  import org.eclipse.jetty.client.SendFailure;
35  import org.eclipse.jetty.client.api.Connection;
36  import org.eclipse.jetty.client.api.Request;
37  import org.eclipse.jetty.client.api.Response;
38  import org.eclipse.jetty.fcgi.FCGI;
39  import org.eclipse.jetty.fcgi.generator.Flusher;
40  import org.eclipse.jetty.fcgi.parser.ClientParser;
41  import org.eclipse.jetty.http.HttpField;
42  import org.eclipse.jetty.http.HttpFields;
43  import org.eclipse.jetty.http.HttpHeader;
44  import org.eclipse.jetty.http.HttpHeaderValue;
45  import org.eclipse.jetty.io.AbstractConnection;
46  import org.eclipse.jetty.io.ByteBufferPool;
47  import org.eclipse.jetty.io.EndPoint;
48  import org.eclipse.jetty.util.BufferUtil;
49  import org.eclipse.jetty.util.CompletableCallback;
50  import org.eclipse.jetty.util.Promise;
51  import org.eclipse.jetty.util.log.Log;
52  import org.eclipse.jetty.util.log.Logger;
53  
54  public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
55  {
56      private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
57  
58      private final LinkedList<Integer> requests = new LinkedList<>();
59      private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
60      private final AtomicBoolean closed = new AtomicBoolean();
61      private final HttpDestination destination;
62      private final Promise<Connection> promise;
63      private final boolean multiplexed;
64      private final Flusher flusher;
65      private final Delegate delegate;
66      private final ClientParser parser;
67      private ByteBuffer buffer;
68  
69      public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise, boolean multiplexed)
70      {
71          super(endPoint, destination.getHttpClient().getExecutor());
72          this.destination = destination;
73          this.promise = promise;
74          this.multiplexed = multiplexed;
75          this.flusher = new Flusher(endPoint);
76          this.delegate = new Delegate(destination);
77          this.parser = new ClientParser(new ResponseListener());
78          requests.addLast(0);
79      }
80  
81      public HttpDestination getHttpDestination()
82      {
83          return destination;
84      }
85  
86      protected Flusher getFlusher()
87      {
88          return flusher;
89      }
90  
91      @Override
92      public void send(Request request, Response.CompleteListener listener)
93      {
94          delegate.send(request, listener);
95      }
96  
97      protected SendFailure send(HttpExchange exchange)
98      {
99          return delegate.send(exchange);
100     }
101 
102     @Override
103     public void onOpen()
104     {
105         super.onOpen();
106         fillInterested();
107         promise.succeeded(this);
108     }
109 
110     @Override
111     public void onFillable()
112     {
113         buffer = acquireBuffer();
114         process(buffer);
115     }
116 
117     private ByteBuffer acquireBuffer()
118     {
119         HttpClient client = destination.getHttpClient();
120         ByteBufferPool bufferPool = client.getByteBufferPool();
121         return bufferPool.acquire(client.getResponseBufferSize(), true);
122     }
123 
124     private void releaseBuffer(ByteBuffer buffer)
125     {
126         assert this.buffer == buffer;
127         HttpClient client = destination.getHttpClient();
128         ByteBufferPool bufferPool = client.getByteBufferPool();
129         bufferPool.release(buffer);
130         this.buffer = null;
131     }
132 
133     private void process(ByteBuffer buffer)
134     {
135         try
136         {
137             EndPoint endPoint = getEndPoint();
138             boolean looping = false;
139             while (true)
140             {
141                 if (!looping && parse(buffer))
142                     return;
143 
144                 int read = endPoint.fill(buffer);
145                 if (LOG.isDebugEnabled())
146                     LOG.debug("Read {} bytes from {}", read, endPoint);
147 
148                 if (read > 0)
149                 {
150                     if (parse(buffer))
151                         return;
152                 }
153                 else if (read == 0)
154                 {
155                     releaseBuffer(buffer);
156                     fillInterested();
157                     return;
158                 }
159                 else
160                 {
161                     releaseBuffer(buffer);
162                     shutdown();
163                     return;
164                 }
165 
166                 looping = true;
167             }
168         }
169         catch (Exception x)
170         {
171             if (LOG.isDebugEnabled())
172                 LOG.debug(x);
173             releaseBuffer(buffer);
174             close(x);
175         }
176     }
177 
178     private boolean parse(ByteBuffer buffer)
179     {
180         return parser.parse(buffer);
181     }
182 
183     private void shutdown()
184     {
185         // Close explicitly only if we are idle, since the request may still
186         // be in progress, otherwise close only if we can fail the responses.
187         if (channels.isEmpty())
188             close();
189         else
190             failAndClose(new EOFException(String.valueOf(getEndPoint())));
191     }
192 
193     @Override
194     public boolean onIdleExpired()
195     {
196         long idleTimeout = getEndPoint().getIdleTimeout();
197         boolean close = delegate.onIdleTimeout(idleTimeout);
198         if (multiplexed)
199             close &= isFillInterested();
200         if (close)
201             close(new TimeoutException("Idle timeout " + idleTimeout + "ms"));
202         return false;
203     }
204 
205     protected void release(HttpChannelOverFCGI channel)
206     {
207         channels.remove(channel.getRequest());
208         destination.release(this);
209     }
210 
211     public boolean isClosed()
212     {
213         return closed.get();
214     }
215 
216     @Override
217     public void close()
218     {
219         close(new AsynchronousCloseException());
220     }
221 
222     protected void close(Throwable failure)
223     {
224         if (closed.compareAndSet(false, true))
225         {
226             // First close then abort, to be sure that the connection cannot be reused
227             // from an onFailure() handler or by blocking code waiting for completion.
228             getHttpDestination().close(this);
229             getEndPoint().shutdownOutput();
230             if (LOG.isDebugEnabled())
231                 LOG.debug("Shutdown {}", this);
232             getEndPoint().close();
233             if (LOG.isDebugEnabled())
234                 LOG.debug("Closed {}", this);
235 
236             abort(failure);
237         }
238     }
239 
240     protected boolean closeByHTTP(HttpFields fields)
241     {
242         if (multiplexed)
243             return false;
244         if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
245             return false;
246         close();
247         return true;
248     }
249 
250     protected void abort(Throwable failure)
251     {
252         for (HttpChannelOverFCGI channel : channels.values())
253         {
254             HttpExchange exchange = channel.getHttpExchange();
255             if (exchange != null)
256                 exchange.getRequest().abort(failure);
257         }
258         channels.clear();
259     }
260 
261     private void failAndClose(Throwable failure)
262     {
263         boolean result = false;
264         for (HttpChannelOverFCGI channel : channels.values())
265             result |= channel.responseFailure(failure);
266         if (result)
267             close(failure);
268     }
269 
270     private int acquireRequest()
271     {
272         synchronized (requests)
273         {
274             int last = requests.getLast();
275             int request = last + 1;
276             requests.addLast(request);
277             return request;
278         }
279     }
280 
281     private void releaseRequest(int request)
282     {
283         synchronized (requests)
284         {
285             requests.removeFirstOccurrence(request);
286         }
287     }
288 
289     protected HttpChannelOverFCGI newHttpChannel(int id, Request request)
290     {
291         return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout());
292     }
293 
294     @Override
295     public String toString()
296     {
297         return String.format("%s@%h(l:%s <-> r:%s)",
298                 getClass().getSimpleName(),
299                 this,
300                 getEndPoint().getLocalAddress(),
301                 getEndPoint().getRemoteAddress());
302     }
303 
304     private class Delegate extends HttpConnection
305     {
306         private Delegate(HttpDestination destination)
307         {
308             super(destination);
309         }
310 
311         @Override
312         protected SendFailure send(HttpExchange exchange)
313         {
314             Request request = exchange.getRequest();
315             normalizeRequest(request);
316 
317             // FCGI may be multiplexed, so create one channel for each request.
318             int id = acquireRequest();
319             HttpChannelOverFCGI channel = newHttpChannel(id, request);
320             channels.put(id, channel);
321 
322             return send(channel, exchange);
323         }
324 
325         @Override
326         public void close()
327         {
328             HttpConnectionOverFCGI.this.close();
329         }
330 
331         protected void close(Throwable failure)
332         {
333             HttpConnectionOverFCGI.this.close(failure);
334         }
335 
336         @Override
337         public String toString()
338         {
339             return HttpConnectionOverFCGI.this.toString();
340         }
341     }
342 
343     private class ResponseListener implements ClientParser.Listener
344     {
345         @Override
346         public void onBegin(int request, int code, String reason)
347         {
348             HttpChannelOverFCGI channel = channels.get(request);
349             if (channel != null)
350                 channel.responseBegin(code, reason);
351             else
352                 noChannel(request);
353         }
354 
355         @Override
356         public void onHeader(int request, HttpField field)
357         {
358             HttpChannelOverFCGI channel = channels.get(request);
359             if (channel != null)
360                 channel.responseHeader(field);
361             else
362                 noChannel(request);
363         }
364 
365         @Override
366         public void onHeaders(int request)
367         {
368             HttpChannelOverFCGI channel = channels.get(request);
369             if (channel != null)
370                 channel.responseHeaders();
371             else
372                 noChannel(request);
373         }
374 
375         @Override
376         public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
377         {
378             switch (stream)
379             {
380                 case STD_OUT:
381                 {
382                     HttpChannelOverFCGI channel = channels.get(request);
383                     if (channel != null)
384                     {
385                         CompletableCallback callback = new CompletableCallback()
386                         {
387                             @Override
388                             public void resume()
389                             {
390                                 if (LOG.isDebugEnabled())
391                                     LOG.debug("Content consumed asynchronously, resuming processing");
392                                 process(HttpConnectionOverFCGI.this.buffer);
393                             }
394 
395                             @Override
396                             public void abort(Throwable x)
397                             {
398                                 close(x);
399                             }
400                         };
401                         // Do not short circuit these calls.
402                         boolean proceed = channel.content(buffer, callback);
403                         boolean async = callback.tryComplete();
404                         return !proceed || async;
405                     }
406                     else
407                     {
408                         noChannel(request);
409                     }
410                     break;
411                 }
412                 case STD_ERR:
413                 {
414                     LOG.info(BufferUtil.toUTF8String(buffer));
415                     break;
416                 }
417                 default:
418                 {
419                     throw new IllegalArgumentException();
420                 }
421             }
422             return false;
423         }
424 
425         @Override
426         public void onEnd(int request)
427         {
428             HttpChannelOverFCGI channel = channels.get(request);
429             if (channel != null)
430             {
431                 if (channel.responseSuccess())
432                     releaseRequest(request);
433             }
434             else
435             {
436                 noChannel(request);
437             }
438         }
439 
440         @Override
441         public void onFailure(int request, Throwable failure)
442         {
443             HttpChannelOverFCGI channel = channels.get(request);
444             if (channel != null)
445             {
446                 if (channel.responseFailure(failure))
447                     releaseRequest(request);
448             }
449             else
450             {
451                 noChannel(request);
452             }
453         }
454 
455         private void noChannel(int request)
456         {
457             if (LOG.isDebugEnabled())
458                 LOG.debug("Channel not found for request {}", request);
459         }
460     }
461 }