View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.util.Collections;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  
26  import org.eclipse.jetty.client.security.Authentication;
27  import org.eclipse.jetty.http.HttpFields;
28  import org.eclipse.jetty.http.HttpGenerator;
29  import org.eclipse.jetty.http.HttpHeaderValues;
30  import org.eclipse.jetty.http.HttpHeaders;
31  import org.eclipse.jetty.http.HttpMethods;
32  import org.eclipse.jetty.http.HttpParser;
33  import org.eclipse.jetty.http.HttpSchemes;
34  import org.eclipse.jetty.http.HttpStatus;
35  import org.eclipse.jetty.http.HttpVersions;
36  import org.eclipse.jetty.io.AbstractConnection;
37  import org.eclipse.jetty.io.Buffer;
38  import org.eclipse.jetty.io.Buffers;
39  import org.eclipse.jetty.io.Connection;
40  import org.eclipse.jetty.io.EndPoint;
41  import org.eclipse.jetty.io.EofException;
42  import org.eclipse.jetty.io.View;
43  import org.eclipse.jetty.util.component.AggregateLifeCycle;
44  import org.eclipse.jetty.util.component.Dumpable;
45  import org.eclipse.jetty.util.log.Log;
46  import org.eclipse.jetty.util.log.Logger;
47  import org.eclipse.jetty.util.thread.Timeout;
48  
49  /**
50   *
51   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
52   */
53  public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable
54  {
55      private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class);
56  
57      protected HttpDestination _destination;
58      protected HttpGenerator _generator;
59      protected HttpParser _parser;
60      protected boolean _http11 = true;
61      protected int _status;
62      protected Buffer _connectionHeader;
63      protected boolean _reserved;
64  
65      // The current exchange waiting for a response
66      protected volatile HttpExchange _exchange;
67      protected HttpExchange _pipeline;
68      private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
69      private AtomicBoolean _idle = new AtomicBoolean(false);
70  
71  
72      AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
73      {
74          super(endp);
75  
76          _generator = new HttpGenerator(requestBuffers,endp);
77          _parser = new HttpParser(responseBuffers,endp,new Handler());
78      }
79  
80      public void setReserved (boolean reserved)
81      {
82          _reserved = reserved;
83      }
84  
85      public boolean isReserved()
86      {
87          return _reserved;
88      }
89  
90      public HttpDestination getDestination()
91      {
92          return _destination;
93      }
94  
95      public void setDestination(HttpDestination destination)
96      {
97          _destination = destination;
98      }
99  
100     public boolean send(HttpExchange ex) throws IOException
101     {
102         LOG.debug("Send {} on {}",ex,this);
103         synchronized (this)
104         {
105             if (_exchange != null)
106             {
107                 if (_pipeline != null)
108                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
109                 _pipeline = ex;
110                 return true;
111             }
112 
113             _exchange = ex;
114             _exchange.associate(this);
115 
116             // The call to associate() may have closed the connection, check if it's the case
117             if (!_endp.isOpen())
118             {
119                 _exchange.disassociate();
120                 _exchange = null;
121                 return false;
122             }
123 
124             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
125 
126             adjustIdleTimeout();
127 
128             return true;
129         }
130     }
131 
132     private void adjustIdleTimeout() throws IOException
133     {
134         // Adjusts the idle timeout in case the default or exchange timeout
135         // are greater. This is needed for long polls, where one wants an
136         // aggressive releasing of idle connections (so idle timeout is small)
137         // but still allow long polls to complete normally
138 
139         long timeout = _exchange.getTimeout();
140         if (timeout <= 0)
141             timeout = _destination.getHttpClient().getTimeout();
142 
143         long endPointTimeout = _endp.getMaxIdleTime();
144 
145         if (timeout > 0 && timeout > endPointTimeout)
146         {
147             // Make it larger than the exchange timeout so that there are
148             // no races between the idle timeout and the exchange timeout
149             // when trying to close the endpoint
150             _endp.setMaxIdleTime(2 * (int)timeout);
151         }
152     }
153 
154     public abstract Connection handle() throws IOException;
155 
156 
157     public boolean isIdle()
158     {
159         synchronized (this)
160         {
161             return _exchange == null;
162         }
163     }
164 
165     public boolean isSuspended()
166     {
167         return false;
168     }
169 
170     public void onClose()
171     {
172     }
173 
174     /**
175      * @throws IOException
176      */
177     protected void commitRequest() throws IOException
178     {
179         synchronized (this)
180         {
181             _status=0;
182             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
183                 throw new IllegalStateException();
184 
185             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
186             _generator.setVersion(_exchange.getVersion());
187 
188             String method=_exchange.getMethod();
189             String uri = _exchange.getRequestURI();
190             if (_destination.isProxied())
191             {
192                 if (!HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
193                 {
194                     boolean secure = _destination.isSecure();
195                     String host = _destination.getAddress().getHost();
196                     int port = _destination.getAddress().getPort();
197                     StringBuilder absoluteURI = new StringBuilder();
198                     absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
199                     absoluteURI.append("://");
200                     absoluteURI.append(host);
201                     // Avoid adding default ports
202                     if (!(secure && port == 443 || !secure && port == 80))
203                         absoluteURI.append(":").append(port);
204                     absoluteURI.append(uri);
205                     uri = absoluteURI.toString();
206                 }
207                 Authentication auth = _destination.getProxyAuthentication();
208                 if (auth != null)
209                     auth.setCredentials(_exchange);
210             }
211 
212             _generator.setRequest(method, uri);
213             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
214 
215             HttpFields requestHeaders = _exchange.getRequestFields();
216             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
217             {
218                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
219                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
220             }
221 
222             Buffer requestContent = _exchange.getRequestContent();
223             if (requestContent != null)
224             {
225                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
226                 _generator.completeHeader(requestHeaders,false);
227                 _generator.addContent(new View(requestContent),true);
228                 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
229             }
230             else
231             {
232                 InputStream requestContentStream = _exchange.getRequestContentSource();
233                 if (requestContentStream != null)
234                 {
235                     _generator.completeHeader(requestHeaders, false);
236                 }
237                 else
238                 {
239                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
240                     _generator.completeHeader(requestHeaders, true);
241                     _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
242                 }
243             }
244         }
245     }
246 
247     protected void reset() throws IOException
248     {
249         _connectionHeader = null;
250         _parser.reset();
251         _generator.reset();
252         _http11 = true;
253     }
254 
255 
256     private class Handler extends HttpParser.EventHandler
257     {
258         @Override
259         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
260         {
261             // System.out.println( method.toString() + "///" + url.toString() +
262             // "///" + version.toString() );
263             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
264             // out here
265             // throw new IllegalStateException();
266         }
267 
268         @Override
269         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
270         {
271             HttpExchange exchange = _exchange;
272             if (exchange==null)
273             {
274                 LOG.warn("No exchange for response");
275                 _endp.close();
276                 return;
277             }
278 
279             switch(status)
280             {
281                 case HttpStatus.CONTINUE_100:
282                 case HttpStatus.PROCESSING_102:
283                     // TODO check if appropriate expect was sent in the request.
284                     exchange.setEventListener(new NonFinalResponseListener(exchange));
285                     break;
286 
287                 case HttpStatus.OK_200:
288                     // handle special case for CONNECT 200 responses
289                     if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
290                         _parser.setHeadResponse(true);
291                     break;
292             }
293 
294             _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
295             _status=status;
296             exchange.getEventListener().onResponseStatus(version,status,reason);
297             exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
298 
299         }
300 
301         @Override
302         public void parsedHeader(Buffer name, Buffer value) throws IOException
303         {
304             HttpExchange exchange = _exchange;
305             if (exchange!=null)
306             {
307                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
308                 {
309                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
310                 }
311                 exchange.getEventListener().onResponseHeader(name,value);
312             }
313         }
314 
315         @Override
316         public void headerComplete() throws IOException
317         {
318             HttpExchange exchange = _exchange;
319             if (exchange!=null)
320             {
321                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
322                 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
323                     _parser.setPersistent(true);
324             }
325         }
326 
327         @Override
328         public void content(Buffer ref) throws IOException
329         {
330             HttpExchange exchange = _exchange;
331             if (exchange!=null)
332                 exchange.getEventListener().onResponseContent(ref);
333         }
334 
335         @Override
336         public void messageComplete(long contextLength) throws IOException
337         {
338             HttpExchange exchange = _exchange;
339             if (exchange!=null)
340                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
341         }
342 
343         @Override
344         public void earlyEOF()
345         {
346             HttpExchange exchange = _exchange;
347             if (exchange!=null)
348             {
349                 if (!exchange.isDone())
350                 {
351                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
352                         exchange.getEventListener().onException(new EofException("early EOF"));
353                 }
354             }
355         }
356     }
357 
358     @Override
359     public String toString()
360     {
361         return String.format("%s %s g=%s p=%s",
362                 super.toString(),
363                 _destination == null ? "?.?.?.?:??" : _destination.getAddress(),
364                 _generator,
365                 _parser);
366     }
367 
368     public String toDetailString()
369     {
370         return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
371     }
372 
373     public void close() throws IOException
374     {
375         //if there is a live, unfinished exchange, set its status to be
376         //excepted and wake up anyone waiting on waitForDone()
377 
378         HttpExchange exchange = _exchange;
379         if (exchange != null && !exchange.isDone())
380         {
381             switch (exchange.getStatus())
382             {
383                 case HttpExchange.STATUS_CANCELLED:
384                 case HttpExchange.STATUS_CANCELLING:
385                 case HttpExchange.STATUS_COMPLETED:
386                 case HttpExchange.STATUS_EXCEPTED:
387                 case HttpExchange.STATUS_EXPIRED:
388                     break;
389                 case HttpExchange.STATUS_PARSING_CONTENT:
390                     if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
391                         break;
392                 default:
393                     String exch= exchange.toString();
394                     String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
395                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
396                         exchange.getEventListener().onException(new EofException(reason+exch));
397             }
398         }
399 
400         if (_endp.isOpen())
401         {
402             _endp.close();
403             _destination.returnConnection(this, true);
404         }
405     }
406 
407     public void setIdleTimeout()
408     {
409         synchronized (this)
410         {
411             if (_idle.compareAndSet(false, true))
412                 _destination.getHttpClient().scheduleIdle(_idleTimeout);
413             else
414                 throw new IllegalStateException();
415         }
416     }
417 
418     public boolean cancelIdleTimeout()
419     {
420         synchronized (this)
421         {
422             if (_idle.compareAndSet(true, false))
423             {
424                 _destination.getHttpClient().cancel(_idleTimeout);
425                 return true;
426             }
427         }
428 
429         return false;
430     }
431 
432     protected void exchangeExpired(HttpExchange exchange)
433     {
434         synchronized (this)
435         {
436             // We are expiring an exchange, but the exchange is pending
437             // Cannot reuse the connection because the reply may arrive, so close it
438             if (_exchange == exchange)
439             {
440                 try
441                 {
442                     _destination.returnConnection(this, true);
443                 }
444                 catch (IOException x)
445                 {
446                     LOG.ignore(x);
447                 }
448             }
449         }
450     }
451 
452     /* ------------------------------------------------------------ */
453     /**
454      * @see org.eclipse.jetty.util.component.Dumpable#dump()
455      */
456     public String dump()
457     {
458         return AggregateLifeCycle.dump(this);
459     }
460 
461     /* ------------------------------------------------------------ */
462     /**
463      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
464      */
465     public void dump(Appendable out, String indent) throws IOException
466     {
467         synchronized (this)
468         {
469             out.append(String.valueOf(this)).append("\n");
470             AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
471         }
472     }
473 
474     /* ------------------------------------------------------------ */
475     private class ConnectionIdleTask extends Timeout.Task
476     {
477         /* ------------------------------------------------------------ */
478         @Override
479         public void expired()
480         {
481             // Connection idle, close it
482             if (_idle.compareAndSet(true, false))
483             {
484                 _destination.returnIdleConnection(AbstractHttpConnection.this);
485             }
486         }
487     }
488 
489 
490     /* ------------------------------------------------------------ */
491     private class NonFinalResponseListener implements HttpEventListener
492     {
493         final HttpExchange _exchange;
494         final HttpEventListener _next;
495 
496         /* ------------------------------------------------------------ */
497         public NonFinalResponseListener(HttpExchange exchange)
498         {
499             _exchange=exchange;
500             _next=exchange.getEventListener();
501         }
502 
503         /* ------------------------------------------------------------ */
504         public void onRequestCommitted() throws IOException
505         {
506         }
507 
508         /* ------------------------------------------------------------ */
509         public void onRequestComplete() throws IOException
510         {
511         }
512 
513         /* ------------------------------------------------------------ */
514         public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
515         {
516         }
517 
518         /* ------------------------------------------------------------ */
519         public void onResponseHeader(Buffer name, Buffer value) throws IOException
520         {
521             _next.onResponseHeader(name,value);
522         }
523 
524         /* ------------------------------------------------------------ */
525         public void onResponseHeaderComplete() throws IOException
526         {
527             _next.onResponseHeaderComplete();
528         }
529 
530         /* ------------------------------------------------------------ */
531         public void onResponseContent(Buffer content) throws IOException
532         {
533         }
534 
535         /* ------------------------------------------------------------ */
536         public void onResponseComplete() throws IOException
537         {
538             _exchange.setEventListener(_next);
539             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
540             _parser.reset();
541         }
542 
543         /* ------------------------------------------------------------ */
544         public void onConnectionFailed(Throwable ex)
545         {
546             _exchange.setEventListener(_next);
547             _next.onConnectionFailed(ex);
548         }
549 
550         /* ------------------------------------------------------------ */
551         public void onException(Throwable ex)
552         {
553             _exchange.setEventListener(_next);
554             _next.onException(ex);
555         }
556 
557         /* ------------------------------------------------------------ */
558         public void onExpire()
559         {
560             _exchange.setEventListener(_next);
561             _next.onExpire();
562         }
563 
564         /* ------------------------------------------------------------ */
565         public void onRetry()
566         {
567             _exchange.setEventListener(_next);
568             _next.onRetry();
569         }
570     }
571 }