View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.fcgi.client.http;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.util.LinkedList;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.TimeoutException;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.eclipse.jetty.client.HttpClient;
31  import org.eclipse.jetty.client.HttpConnection;
32  import org.eclipse.jetty.client.HttpDestination;
33  import org.eclipse.jetty.client.HttpExchange;
34  import org.eclipse.jetty.client.SendFailure;
35  import org.eclipse.jetty.client.api.Connection;
36  import org.eclipse.jetty.client.api.Request;
37  import org.eclipse.jetty.client.api.Response;
38  import org.eclipse.jetty.fcgi.FCGI;
39  import org.eclipse.jetty.fcgi.generator.Flusher;
40  import org.eclipse.jetty.fcgi.parser.ClientParser;
41  import org.eclipse.jetty.http.HttpField;
42  import org.eclipse.jetty.http.HttpFields;
43  import org.eclipse.jetty.http.HttpHeader;
44  import org.eclipse.jetty.http.HttpHeaderValue;
45  import org.eclipse.jetty.io.AbstractConnection;
46  import org.eclipse.jetty.io.ByteBufferPool;
47  import org.eclipse.jetty.io.EndPoint;
48  import org.eclipse.jetty.util.BufferUtil;
49  import org.eclipse.jetty.util.CompletableCallback;
50  import org.eclipse.jetty.util.Promise;
51  import org.eclipse.jetty.util.log.Log;
52  import org.eclipse.jetty.util.log.Logger;
53  
54  public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
55  {
56      private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
57  
58      private final LinkedList<Integer> requests = new LinkedList<>();
59      private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
60      private final AtomicBoolean closed = new AtomicBoolean();
61      private final HttpDestination destination;
62      private final Promise<Connection> promise;
63      private final boolean multiplexed;
64      private final Flusher flusher;
65      private final Delegate delegate;
66      private final ClientParser parser;
67      private ByteBuffer buffer;
68  
69      public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise, boolean multiplexed)
70      {
71          super(endPoint, destination.getHttpClient().getExecutor());
72          this.destination = destination;
73          this.promise = promise;
74          this.multiplexed = multiplexed;
75          this.flusher = new Flusher(endPoint);
76          this.delegate = new Delegate(destination);
77          this.parser = new ClientParser(new ResponseListener());
78          requests.addLast(0);
79      }
80  
81      public HttpDestination getHttpDestination()
82      {
83          return destination;
84      }
85  
86      protected Flusher getFlusher()
87      {
88          return flusher;
89      }
90  
91      @Override
92      public void send(Request request, Response.CompleteListener listener)
93      {
94          delegate.send(request, listener);
95      }
96  
97      protected SendFailure send(HttpExchange exchange)
98      {
99          return delegate.send(exchange);
100     }
101 
102     @Override
103     public void onOpen()
104     {
105         super.onOpen();
106         fillInterested();
107         promise.succeeded(this);
108     }
109 
110     @Override
111     public void onFillable()
112     {
113         buffer = acquireBuffer();
114         process(buffer);
115     }
116 
117     private ByteBuffer acquireBuffer()
118     {
119         HttpClient client = destination.getHttpClient();
120         ByteBufferPool bufferPool = client.getByteBufferPool();
121         return bufferPool.acquire(client.getResponseBufferSize(), true);
122     }
123 
124     private void releaseBuffer(ByteBuffer buffer)
125     {
126         assert this.buffer == buffer;
127         HttpClient client = destination.getHttpClient();
128         ByteBufferPool bufferPool = client.getByteBufferPool();
129         bufferPool.release(buffer);
130         this.buffer = null;
131     }
132 
133     private void process(ByteBuffer buffer)
134     {
135         try
136         {
137             EndPoint endPoint = getEndPoint();
138             boolean looping = false;
139             while (true)
140             {
141                 if (!looping && parse(buffer))
142                     return;
143 
144                 int read = endPoint.fill(buffer);
145                 if (LOG.isDebugEnabled())
146                     LOG.debug("Read {} bytes from {}", read, endPoint);
147 
148                 if (read > 0)
149                 {
150                     if (parse(buffer))
151                         return;
152                 }
153                 else if (read == 0)
154                 {
155                     releaseBuffer(buffer);
156                     fillInterested();
157                     return;
158                 }
159                 else
160                 {
161                     releaseBuffer(buffer);
162                     shutdown();
163                     return;
164                 }
165 
166                 looping = true;
167             }
168         }
169         catch (Exception x)
170         {
171             if (LOG.isDebugEnabled())
172                 LOG.debug(x);
173             releaseBuffer(buffer);
174             close(x);
175         }
176     }
177 
178     private boolean parse(ByteBuffer buffer)
179     {
180         return parser.parse(buffer);
181     }
182 
183     private void shutdown()
184     {
185         // Close explicitly only if we are idle, since the request may still
186         // be in progress, otherwise close only if we can fail the responses.
187         if (channels.isEmpty())
188             close();
189         else
190             failAndClose(new EOFException(String.valueOf(getEndPoint())));
191     }
192 
193     @Override
194     public boolean onIdleExpired()
195     {
196         boolean close = delegate.onIdleTimeout();
197         if (multiplexed)
198             close &= isFillInterested();
199         if (close)
200             close(new TimeoutException("Idle timeout " + getEndPoint().getIdleTimeout() + "ms"));
201         return false;
202     }
203 
204     protected void release(HttpChannelOverFCGI channel)
205     {
206         channels.remove(channel.getRequest());
207         destination.release(this);
208     }
209 
210     public boolean isClosed()
211     {
212         return closed.get();
213     }
214 
215     @Override
216     public void close()
217     {
218         close(new AsynchronousCloseException());
219     }
220 
221     protected void close(Throwable failure)
222     {
223         if (closed.compareAndSet(false, true))
224         {
225             // First close then abort, to be sure that the connection cannot be reused
226             // from an onFailure() handler or by blocking code waiting for completion.
227             getHttpDestination().close(this);
228             getEndPoint().shutdownOutput();
229             if (LOG.isDebugEnabled())
230                 LOG.debug("Shutdown {}", this);
231             getEndPoint().close();
232             if (LOG.isDebugEnabled())
233                 LOG.debug("Closed {}", this);
234 
235             abort(failure);
236         }
237     }
238 
239     protected boolean closeByHTTP(HttpFields fields)
240     {
241         if (multiplexed)
242             return false;
243         if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
244             return false;
245         close();
246         return true;
247     }
248 
249     protected void abort(Throwable failure)
250     {
251         for (HttpChannelOverFCGI channel : channels.values())
252         {
253             HttpExchange exchange = channel.getHttpExchange();
254             if (exchange != null)
255                 exchange.getRequest().abort(failure);
256         }
257         channels.clear();
258     }
259 
260     private void failAndClose(Throwable failure)
261     {
262         boolean result = false;
263         for (HttpChannelOverFCGI channel : channels.values())
264             result |= channel.responseFailure(failure);
265         if (result)
266             close(failure);
267     }
268 
269     private int acquireRequest()
270     {
271         synchronized (requests)
272         {
273             int last = requests.getLast();
274             int request = last + 1;
275             requests.addLast(request);
276             return request;
277         }
278     }
279 
280     private void releaseRequest(int request)
281     {
282         synchronized (requests)
283         {
284             requests.removeFirstOccurrence(request);
285         }
286     }
287 
288     protected HttpChannelOverFCGI newHttpChannel(int id, Request request)
289     {
290         return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout());
291     }
292 
293     @Override
294     public String toString()
295     {
296         return String.format("%s@%h(l:%s <-> r:%s)",
297                 getClass().getSimpleName(),
298                 this,
299                 getEndPoint().getLocalAddress(),
300                 getEndPoint().getRemoteAddress());
301     }
302 
303     private class Delegate extends HttpConnection
304     {
305         private Delegate(HttpDestination destination)
306         {
307             super(destination);
308         }
309 
310         @Override
311         protected SendFailure send(HttpExchange exchange)
312         {
313             Request request = exchange.getRequest();
314             normalizeRequest(request);
315 
316             // FCGI may be multiplexed, so create one channel for each request.
317             int id = acquireRequest();
318             HttpChannelOverFCGI channel = newHttpChannel(id, request);
319             channels.put(id, channel);
320 
321             return send(channel, exchange);
322         }
323 
324         @Override
325         public void close()
326         {
327             HttpConnectionOverFCGI.this.close();
328         }
329 
330         protected void close(Throwable failure)
331         {
332             HttpConnectionOverFCGI.this.close(failure);
333         }
334 
335         @Override
336         public String toString()
337         {
338             return HttpConnectionOverFCGI.this.toString();
339         }
340     }
341 
342     private class ResponseListener implements ClientParser.Listener
343     {
344         @Override
345         public void onBegin(int request, int code, String reason)
346         {
347             HttpChannelOverFCGI channel = channels.get(request);
348             if (channel != null)
349                 channel.responseBegin(code, reason);
350             else
351                 noChannel(request);
352         }
353 
354         @Override
355         public void onHeader(int request, HttpField field)
356         {
357             HttpChannelOverFCGI channel = channels.get(request);
358             if (channel != null)
359                 channel.responseHeader(field);
360             else
361                 noChannel(request);
362         }
363 
364         @Override
365         public void onHeaders(int request)
366         {
367             HttpChannelOverFCGI channel = channels.get(request);
368             if (channel != null)
369                 channel.responseHeaders();
370             else
371                 noChannel(request);
372         }
373 
374         @Override
375         public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
376         {
377             switch (stream)
378             {
379                 case STD_OUT:
380                 {
381                     HttpChannelOverFCGI channel = channels.get(request);
382                     if (channel != null)
383                     {
384                         CompletableCallback callback = new CompletableCallback()
385                         {
386                             @Override
387                             public void resume()
388                             {
389                                 if (LOG.isDebugEnabled())
390                                     LOG.debug("Content consumed asynchronously, resuming processing");
391                                 process(HttpConnectionOverFCGI.this.buffer);
392                             }
393 
394                             @Override
395                             public void abort(Throwable x)
396                             {
397                                 close(x);
398                             }
399                         };
400                         // Do not short circuit these calls.
401                         boolean proceed = channel.content(buffer, callback);
402                         boolean async = callback.tryComplete();
403                         return !proceed || async;
404                     }
405                     else
406                     {
407                         noChannel(request);
408                     }
409                     break;
410                 }
411                 case STD_ERR:
412                 {
413                     LOG.info(BufferUtil.toUTF8String(buffer));
414                     break;
415                 }
416                 default:
417                 {
418                     throw new IllegalArgumentException();
419                 }
420             }
421             return false;
422         }
423 
424         @Override
425         public void onEnd(int request)
426         {
427             HttpChannelOverFCGI channel = channels.get(request);
428             if (channel != null)
429             {
430                 if (channel.responseSuccess())
431                     releaseRequest(request);
432             }
433             else
434             {
435                 noChannel(request);
436             }
437         }
438 
439         @Override
440         public void onFailure(int request, Throwable failure)
441         {
442             HttpChannelOverFCGI channel = channels.get(request);
443             if (channel != null)
444             {
445                 if (channel.responseFailure(failure))
446                     releaseRequest(request);
447             }
448             else
449             {
450                 noChannel(request);
451             }
452         }
453 
454         private void noChannel(int request)
455         {
456             if (LOG.isDebugEnabled())
457                 LOG.debug("Channel not found for request {}", request);
458         }
459     }
460 }