View Javadoc

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//
18  
19  package org.eclipse.jetty.fcgi.client.http;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.util.LinkedList;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.TimeoutException;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.eclipse.jetty.client.HttpClient;
31  import org.eclipse.jetty.client.HttpConnection;
32  import org.eclipse.jetty.client.HttpDestination;
33  import org.eclipse.jetty.client.HttpExchange;
34  import org.eclipse.jetty.client.api.Connection;
35  import org.eclipse.jetty.client.api.Request;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.fcgi.FCGI;
38  import org.eclipse.jetty.fcgi.generator.Flusher;
39  import org.eclipse.jetty.fcgi.parser.ClientParser;
40  import org.eclipse.jetty.http.HttpField;
41  import org.eclipse.jetty.http.HttpFields;
42  import org.eclipse.jetty.http.HttpHeader;
43  import org.eclipse.jetty.http.HttpHeaderValue;
44  import org.eclipse.jetty.io.AbstractConnection;
45  import org.eclipse.jetty.io.ByteBufferPool;
46  import org.eclipse.jetty.io.EndPoint;
47  import org.eclipse.jetty.util.BufferUtil;
48  import org.eclipse.jetty.util.log.Log;
49  import org.eclipse.jetty.util.log.Logger;
50  
51  public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
52  {
53      private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
54  
55      private final LinkedList<Integer> requests = new LinkedList<>();
56      private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
57      private final AtomicBoolean closed = new AtomicBoolean();
58      private final Flusher flusher;
59      private final HttpDestination destination;
60      private final boolean multiplexed;
61      private final Delegate delegate;
62      private final ClientParser parser;
63  
64      public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
65      {
66          super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
67          this.destination = destination;
68          this.multiplexed = multiplexed;
69          this.flusher = new Flusher(endPoint);
70          this.delegate = new Delegate(destination);
71          this.parser = new ClientParser(new ResponseListener());
72          requests.addLast(0);
73      }
74  
75      public HttpDestination getHttpDestination()
76      {
77          return destination;
78      }
79  
80      @Override
81      public void send(Request request, Response.CompleteListener listener)
82      {
83          delegate.send(request, listener);
84      }
85  
86      protected void send(HttpExchange exchange)
87      {
88          delegate.send(exchange);
89      }
90  
91      @Override
92      public void onOpen()
93      {
94          super.onOpen();
95          fillInterested();
96      }
97  
98      @Override
99      public void onFillable()
100     {
101         EndPoint endPoint = getEndPoint();
102         HttpClient client = destination.getHttpClient();
103         ByteBufferPool bufferPool = client.getByteBufferPool();
104         ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
105         try
106         {
107             while (true)
108             {
109                 int read = endPoint.fill(buffer);
110                 if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'.
111                     LOG.debug("Read {} bytes from {}", read, endPoint);
112                 if (read > 0)
113                 {
114                     parse(buffer);
115                 }
116                 else if (read == 0)
117                 {
118                     fillInterested();
119                     break;
120                 }
121                 else
122                 {
123                     shutdown();
124                     break;
125                 }
126             }
127         }
128         catch (Exception x)
129         {
130             LOG.debug(x);
131             close(x);
132         }
133         finally
134         {
135             bufferPool.release(buffer);
136         }
137     }
138 
139     private void parse(ByteBuffer buffer)
140     {
141         while (buffer.hasRemaining())
142             parser.parse(buffer);
143     }
144 
145     private void shutdown()
146     {
147         // Close explicitly only if we are idle, since the request may still
148         // be in progress, otherwise close only if we can fail the responses.
149         if (channels.isEmpty())
150             close();
151         else
152             failAndClose(new EOFException());
153     }
154 
155     @Override
156     protected boolean onReadTimeout()
157     {
158         close(new TimeoutException());
159         return false;
160     }
161 
162     protected void release(HttpChannelOverFCGI channel)
163     {
164         channels.remove(channel.getRequest());
165         destination.release(this);
166     }
167 
168     @Override
169     public void close()
170     {
171         close(new AsynchronousCloseException());
172     }
173 
174     protected void close(Throwable failure)
175     {
176         if (closed.compareAndSet(false, true))
177         {
178             // First close then abort, to be sure that the connection cannot be reused
179             // from an onFailure() handler or by blocking code waiting for completion.
180             getHttpDestination().close(this);
181             getEndPoint().shutdownOutput();
182             LOG.debug("{} oshut", this);
183             getEndPoint().close();
184             LOG.debug("{} closed", this);
185 
186             abort(failure);
187         }
188     }
189 
190     protected boolean closeByHTTP(HttpFields fields)
191     {
192         if (multiplexed)
193             return false;
194         if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
195             return false;
196         close();
197         return true;
198     }
199 
200     protected void abort(Throwable failure)
201     {
202         for (HttpChannelOverFCGI channel : channels.values())
203         {
204             HttpExchange exchange = channel.getHttpExchange();
205             if (exchange != null)
206                 exchange.getRequest().abort(failure);
207         }
208         channels.clear();
209     }
210 
211     private void failAndClose(Throwable failure)
212     {
213         boolean result = false;
214         for (HttpChannelOverFCGI channel : channels.values())
215             result |= channel.responseFailure(failure);
216         if (result)
217             close(failure);
218     }
219 
220     private int acquireRequest()
221     {
222         synchronized (requests)
223         {
224             int last = requests.getLast();
225             int request = last + 1;
226             requests.addLast(request);
227             return request;
228         }
229     }
230 
231     private void releaseRequest(int request)
232     {
233         synchronized (requests)
234         {
235             requests.removeFirstOccurrence(request);
236         }
237     }
238 
239     @Override
240     public String toString()
241     {
242         return String.format("%s@%h(l:%s <-> r:%s)",
243                 getClass().getSimpleName(),
244                 this,
245                 getEndPoint().getLocalAddress(),
246                 getEndPoint().getRemoteAddress());
247     }
248 
249     private class Delegate extends HttpConnection
250     {
251         private Delegate(HttpDestination destination)
252         {
253             super(destination);
254         }
255 
256         @Override
257         protected void send(HttpExchange exchange)
258         {
259             Request request = exchange.getRequest();
260             normalizeRequest(request);
261 
262             // FCGI may be multiplexed, so create one channel for each request.
263             int id = acquireRequest();
264             HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout());
265             channels.put(id, channel);
266             channel.associate(exchange);
267             channel.send();
268         }
269 
270         @Override
271         public void close()
272         {
273             HttpConnectionOverFCGI.this.close();
274         }
275 
276         @Override
277         public String toString()
278         {
279             return HttpConnectionOverFCGI.this.toString();
280         }
281     }
282 
283     private class ResponseListener implements ClientParser.Listener
284     {
285         @Override
286         public void onBegin(int request, int code, String reason)
287         {
288             HttpChannelOverFCGI channel = channels.get(request);
289             if (channel != null)
290                 channel.responseBegin(code, reason);
291             else
292                 noChannel(request);
293         }
294 
295         @Override
296         public void onHeader(int request, HttpField field)
297         {
298             HttpChannelOverFCGI channel = channels.get(request);
299             if (channel != null)
300                 channel.responseHeader(field);
301             else
302                 noChannel(request);
303         }
304 
305         @Override
306         public void onHeaders(int request)
307         {
308             HttpChannelOverFCGI channel = channels.get(request);
309             if (channel != null)
310                 channel.responseHeaders();
311             else
312                 noChannel(request);
313         }
314 
315         @Override
316         public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
317         {
318             switch (stream)
319             {
320                 case STD_OUT:
321                 {
322                     HttpChannelOverFCGI channel = channels.get(request);
323                     if (channel != null)
324                         channel.content(buffer);
325                     else
326                         noChannel(request);
327                     break;
328                 }
329                 case STD_ERR:
330                 {
331                     LOG.info(BufferUtil.toUTF8String(buffer));
332                     break;
333                 }
334                 default:
335                 {
336                     throw new IllegalArgumentException();
337                 }
338             }
339         }
340 
341         @Override
342         public void onEnd(int request)
343         {
344             HttpChannelOverFCGI channel = channels.get(request);
345             if (channel != null)
346             {
347                 if (channel.responseSuccess())
348                     releaseRequest(request);
349             }
350             else
351             {
352                 noChannel(request);
353             }
354         }
355 
356         @Override
357         public void onFailure(int request, Throwable failure)
358         {
359             HttpChannelOverFCGI channel = channels.get(request);
360             if (channel != null)
361             {
362                 if (channel.responseFailure(failure))
363                     releaseRequest(request);
364             }
365             else
366             {
367                 noChannel(request);
368             }
369         }
370 
371         private void noChannel(int request)
372         {
373             // TODO: what here ?
374         }
375     }
376 }