View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.fcgi.client.http;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.util.LinkedList;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.TimeoutException;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.eclipse.jetty.client.HttpClient;
31  import org.eclipse.jetty.client.HttpConnection;
32  import org.eclipse.jetty.client.HttpDestination;
33  import org.eclipse.jetty.client.HttpExchange;
34  import org.eclipse.jetty.client.api.Connection;
35  import org.eclipse.jetty.client.api.Request;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.fcgi.FCGI;
38  import org.eclipse.jetty.fcgi.generator.Flusher;
39  import org.eclipse.jetty.fcgi.parser.ClientParser;
40  import org.eclipse.jetty.http.HttpField;
41  import org.eclipse.jetty.http.HttpFields;
42  import org.eclipse.jetty.http.HttpHeader;
43  import org.eclipse.jetty.http.HttpHeaderValue;
44  import org.eclipse.jetty.io.AbstractConnection;
45  import org.eclipse.jetty.io.ByteBufferPool;
46  import org.eclipse.jetty.io.EndPoint;
47  import org.eclipse.jetty.util.BufferUtil;
48  import org.eclipse.jetty.util.CompletableCallback;
49  import org.eclipse.jetty.util.log.Log;
50  import org.eclipse.jetty.util.log.Logger;
51  
52  public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
53  {
54      private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
55  
56      private final LinkedList<Integer> requests = new LinkedList<>();
57      private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
58      private final AtomicBoolean closed = new AtomicBoolean();
59      private final Flusher flusher;
60      private final HttpDestination destination;
61      private final boolean multiplexed;
62      private final Delegate delegate;
63      private final ClientParser parser;
64      private ByteBuffer buffer;
65  
66      public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
67      {
68          super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
69          this.destination = destination;
70          this.multiplexed = multiplexed;
71          this.flusher = new Flusher(endPoint);
72          this.delegate = new Delegate(destination);
73          this.parser = new ClientParser(new ResponseListener());
74          requests.addLast(0);
75      }
76  
77      public HttpDestination getHttpDestination()
78      {
79          return destination;
80      }
81  
82      @Override
83      public void send(Request request, Response.CompleteListener listener)
84      {
85          delegate.send(request, listener);
86      }
87  
88      protected void send(HttpExchange exchange)
89      {
90          delegate.send(exchange);
91      }
92  
93      @Override
94      public void onOpen()
95      {
96          super.onOpen();
97          fillInterested();
98      }
99  
100     @Override
101     public void onFillable()
102     {
103         HttpClient client = destination.getHttpClient();
104         ByteBufferPool bufferPool = client.getByteBufferPool();
105         buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
106         process();
107     }
108 
109     private void process()
110     {
111         if (readAndParse())
112         {
113             HttpClient client = destination.getHttpClient();
114             ByteBufferPool bufferPool = client.getByteBufferPool();
115             bufferPool.release(buffer);
116             // Don't linger the buffer around if we are idle.
117             buffer = null;
118         }
119     }
120 
121     private boolean readAndParse()
122     {
123         EndPoint endPoint = getEndPoint();
124         ByteBuffer buffer = this.buffer;
125         while (true)
126         {
127             try
128             {
129                 if (parse(buffer))
130                     return false;
131 
132                 int read = endPoint.fill(buffer);
133                 if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'.
134                     LOG.debug("Read {} bytes from {}", read, endPoint);
135                 if (read > 0)
136                 {
137                     if (parse(buffer))
138                         return false;
139                 }
140                 else if (read == 0)
141                 {
142                     fillInterested();
143                     return true;
144                 }
145                 else
146                 {
147                     shutdown();
148                     return true;
149                 }
150             }
151             catch (Exception x)
152             {
153                 if (LOG.isDebugEnabled())
154                     LOG.debug(x);
155                 close(x);
156                 return false;
157             }
158         }
159     }
160 
161     private boolean parse(ByteBuffer buffer)
162     {
163         return parser.parse(buffer);
164     }
165 
166     private void shutdown()
167     {
168         // Close explicitly only if we are idle, since the request may still
169         // be in progress, otherwise close only if we can fail the responses.
170         if (channels.isEmpty())
171             close();
172         else
173             failAndClose(new EOFException());
174     }
175 
176     @Override
177     protected boolean onReadTimeout()
178     {
179         close(new TimeoutException());
180         return false;
181     }
182 
183     protected void release(HttpChannelOverFCGI channel)
184     {
185         channels.remove(channel.getRequest());
186         destination.release(this);
187     }
188 
189     @Override
190     public void close()
191     {
192         close(new AsynchronousCloseException());
193     }
194 
195     protected void close(Throwable failure)
196     {
197         if (closed.compareAndSet(false, true))
198         {
199             // First close then abort, to be sure that the connection cannot be reused
200             // from an onFailure() handler or by blocking code waiting for completion.
201             getHttpDestination().close(this);
202             getEndPoint().shutdownOutput();
203             if (LOG.isDebugEnabled())
204                 LOG.debug("{} oshut", this);
205             getEndPoint().close();
206             if (LOG.isDebugEnabled())
207                 LOG.debug("{} closed", this);
208 
209             abort(failure);
210         }
211     }
212 
213     protected boolean closeByHTTP(HttpFields fields)
214     {
215         if (multiplexed)
216             return false;
217         if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
218             return false;
219         close();
220         return true;
221     }
222 
223     protected void abort(Throwable failure)
224     {
225         for (HttpChannelOverFCGI channel : channels.values())
226         {
227             HttpExchange exchange = channel.getHttpExchange();
228             if (exchange != null)
229                 exchange.getRequest().abort(failure);
230         }
231         channels.clear();
232     }
233 
234     private void failAndClose(Throwable failure)
235     {
236         boolean result = false;
237         for (HttpChannelOverFCGI channel : channels.values())
238             result |= channel.responseFailure(failure);
239         if (result)
240             close(failure);
241     }
242 
243     private int acquireRequest()
244     {
245         synchronized (requests)
246         {
247             int last = requests.getLast();
248             int request = last + 1;
249             requests.addLast(request);
250             return request;
251         }
252     }
253 
254     private void releaseRequest(int request)
255     {
256         synchronized (requests)
257         {
258             requests.removeFirstOccurrence(request);
259         }
260     }
261 
262     @Override
263     public String toString()
264     {
265         return String.format("%s@%h(l:%s <-> r:%s)",
266                 getClass().getSimpleName(),
267                 this,
268                 getEndPoint().getLocalAddress(),
269                 getEndPoint().getRemoteAddress());
270     }
271 
272     private class Delegate extends HttpConnection
273     {
274         private Delegate(HttpDestination destination)
275         {
276             super(destination);
277         }
278 
279         @Override
280         protected void send(HttpExchange exchange)
281         {
282             Request request = exchange.getRequest();
283             normalizeRequest(request);
284 
285             // FCGI may be multiplexed, so create one channel for each request.
286             int id = acquireRequest();
287             HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout());
288             channels.put(id, channel);
289             channel.associate(exchange);
290             channel.send();
291         }
292 
293         @Override
294         public void close()
295         {
296             HttpConnectionOverFCGI.this.close();
297         }
298 
299         @Override
300         public String toString()
301         {
302             return HttpConnectionOverFCGI.this.toString();
303         }
304     }
305 
306     private class ResponseListener implements ClientParser.Listener
307     {
308         @Override
309         public void onBegin(int request, int code, String reason)
310         {
311             HttpChannelOverFCGI channel = channels.get(request);
312             if (channel != null)
313                 channel.responseBegin(code, reason);
314             else
315                 noChannel(request);
316         }
317 
318         @Override
319         public void onHeader(int request, HttpField field)
320         {
321             HttpChannelOverFCGI channel = channels.get(request);
322             if (channel != null)
323                 channel.responseHeader(field);
324             else
325                 noChannel(request);
326         }
327 
328         @Override
329         public void onHeaders(int request)
330         {
331             HttpChannelOverFCGI channel = channels.get(request);
332             if (channel != null)
333                 channel.responseHeaders();
334             else
335                 noChannel(request);
336         }
337 
338         @Override
339         public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
340         {
341             switch (stream)
342             {
343                 case STD_OUT:
344                 {
345                     HttpChannelOverFCGI channel = channels.get(request);
346                     if (channel != null)
347                     {
348                         CompletableCallback callback = new CompletableCallback()
349                         {
350                             @Override
351                             public void resume()
352                             {
353                                 if (LOG.isDebugEnabled())
354                                     LOG.debug("Content consumed asynchronously, resuming processing");
355                                 process();
356                             }
357 
358                             @Override
359                             public void abort(Throwable x)
360                             {
361                                 close(x);
362                             }
363                         };
364                         if (!channel.content(buffer, callback))
365                             return true;
366                         return callback.tryComplete();
367                     }
368                     else
369                     {
370                         noChannel(request);
371                     }
372                     break;
373                 }
374                 case STD_ERR:
375                 {
376                     LOG.info(BufferUtil.toUTF8String(buffer));
377                     break;
378                 }
379                 default:
380                 {
381                     throw new IllegalArgumentException();
382                 }
383             }
384             return false;
385         }
386 
387         @Override
388         public void onEnd(int request)
389         {
390             HttpChannelOverFCGI channel = channels.get(request);
391             if (channel != null)
392             {
393                 if (channel.responseSuccess())
394                     releaseRequest(request);
395             }
396             else
397             {
398                 noChannel(request);
399             }
400         }
401 
402         @Override
403         public void onFailure(int request, Throwable failure)
404         {
405             HttpChannelOverFCGI channel = channels.get(request);
406             if (channel != null)
407             {
408                 if (channel.responseFailure(failure))
409                     releaseRequest(request);
410             }
411             else
412             {
413                 noChannel(request);
414             }
415         }
416 
417         private void noChannel(int request)
418         {
419             if (LOG.isDebugEnabled())
420                 LOG.debug("Channel not found for request {}", request);
421         }
422     }
423 }