View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.util.Collections;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  
26  import org.eclipse.jetty.client.security.Authentication;
27  import org.eclipse.jetty.http.HttpFields;
28  import org.eclipse.jetty.http.HttpGenerator;
29  import org.eclipse.jetty.http.HttpHeaderValues;
30  import org.eclipse.jetty.http.HttpHeaders;
31  import org.eclipse.jetty.http.HttpMethods;
32  import org.eclipse.jetty.http.HttpParser;
33  import org.eclipse.jetty.http.HttpSchemes;
34  import org.eclipse.jetty.http.HttpStatus;
35  import org.eclipse.jetty.http.HttpVersions;
36  import org.eclipse.jetty.io.AbstractConnection;
37  import org.eclipse.jetty.io.Buffer;
38  import org.eclipse.jetty.io.Buffers;
39  import org.eclipse.jetty.io.ByteArrayBuffer;
40  import org.eclipse.jetty.io.Connection;
41  import org.eclipse.jetty.io.EndPoint;
42  import org.eclipse.jetty.io.EofException;
43  import org.eclipse.jetty.io.View;
44  import org.eclipse.jetty.util.component.AggregateLifeCycle;
45  import org.eclipse.jetty.util.component.Dumpable;
46  import org.eclipse.jetty.util.log.Log;
47  import org.eclipse.jetty.util.log.Logger;
48  import org.eclipse.jetty.util.thread.Timeout;
49  
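/**
 * Base class for client-side HTTP connections. A request for an
 * {@link HttpExchange} is written with an {@link HttpGenerator} and the
 * response is read with an {@link HttpParser}, whose parse events are
 * forwarded to the event listener of the current exchange.
 *
 * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
 */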
54  public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable
55  {
56      private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class);
57  
58      protected HttpDestination _destination;
59      protected HttpGenerator _generator;
60      protected HttpParser _parser;
61      protected boolean _http11 = true;
62      protected int _status;
63      protected Buffer _connectionHeader;
64      protected boolean _reserved;
65  
66      // The current exchange waiting for a response
67      protected volatile HttpExchange _exchange;
68      protected HttpExchange _pipeline;
69      private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
70      private AtomicBoolean _idle = new AtomicBoolean(false);
71  
72  
73      AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
74      {
75          super(endp);
76  
77          _generator = new HttpGenerator(requestBuffers,endp);
78          _parser = new HttpParser(responseBuffers,endp,new Handler());
79      }
80  
81      public void setReserved (boolean reserved)
82      {
83          _reserved = reserved;
84      }
85  
86      public boolean isReserved()
87      {
88          return _reserved;
89      }
90  
91      public HttpDestination getDestination()
92      {
93          return _destination;
94      }
95  
96      public void setDestination(HttpDestination destination)
97      {
98          _destination = destination;
99      }
100 
101     public boolean send(HttpExchange ex) throws IOException
102     {
103         LOG.debug("Send {} on {}",ex,this);
104         synchronized (this)
105         {
106             if (_exchange != null)
107             {
108                 if (_pipeline != null)
109                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
110                 _pipeline = ex;
111                 return true;
112             }
113 
114             _exchange = ex;
115             _exchange.associate(this);
116 
117             // The call to associate() may have closed the connection, check if it's the case
118             if (!_endp.isOpen())
119             {
120                 _exchange.disassociate();
121                 _exchange = null;
122                 return false;
123             }
124 
125             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
126 
127             adjustIdleTimeout();
128 
129             return true;
130         }
131     }
132 
133     private void adjustIdleTimeout() throws IOException
134     {
135         // Adjusts the idle timeout in case the default or exchange timeout
136         // are greater. This is needed for long polls, where one wants an
137         // aggressive releasing of idle connections (so idle timeout is small)
138         // but still allow long polls to complete normally
139 
140         long timeout = _exchange.getTimeout();
141         if (timeout <= 0)
142             timeout = _destination.getHttpClient().getTimeout();
143 
144         long endPointTimeout = _endp.getMaxIdleTime();
145 
146         if (timeout > 0 && timeout > endPointTimeout)
147         {
148             // Make it larger than the exchange timeout so that there are
149             // no races between the idle timeout and the exchange timeout
150             // when trying to close the endpoint
151             _endp.setMaxIdleTime(2 * (int)timeout);
152         }
153     }
154 
155     public abstract Connection handle() throws IOException;
156 
157 
158     public boolean isIdle()
159     {
160         synchronized (this)
161         {
162             return _exchange == null;
163         }
164     }
165 
166     public boolean isSuspended()
167     {
168         return false;
169     }
170 
171     public void onClose()
172     {
173     }
174 
175     /**
176      * @throws IOException
177      */
178     protected void commitRequest() throws IOException
179     {
180         synchronized (this)
181         {
182             _status=0;
183             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
184                 throw new IllegalStateException();
185 
186             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
187             _generator.setVersion(_exchange.getVersion());
188 
189             String method=_exchange.getMethod();
190             String uri = _exchange.getRequestURI();
191             if (_destination.isProxied())
192             {
193                 if (!HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
194                 {
195                     boolean secure = _destination.isSecure();
196                     String host = _destination.getAddress().getHost();
197                     int port = _destination.getAddress().getPort();
198                     StringBuilder absoluteURI = new StringBuilder();
199                     absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
200                     absoluteURI.append("://");
201                     absoluteURI.append(host);
202                     // Avoid adding default ports
203                     if (!(secure && port == 443 || !secure && port == 80))
204                         absoluteURI.append(":").append(port);
205                     absoluteURI.append(uri);
206                     uri = absoluteURI.toString();
207                 }
208                 Authentication auth = _destination.getProxyAuthentication();
209                 if (auth != null)
210                     auth.setCredentials(_exchange);
211             }
212 
213             _generator.setRequest(method, uri);
214             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
215 
216             HttpFields requestHeaders = _exchange.getRequestFields();
217             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
218             {
219                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
220                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
221             }
222 
223             Buffer requestContent = _exchange.getRequestContent();
224             if (requestContent != null)
225             {
226                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
227                 _generator.completeHeader(requestHeaders,false);
228                 _generator.addContent(new View(requestContent),true);
229                 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
230             }
231             else
232             {
233                 InputStream requestContentStream = _exchange.getRequestContentSource();
234                 if (requestContentStream != null)
235                 {
236                     _generator.completeHeader(requestHeaders, false);
237                 }
238                 else
239                 {
240                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
241                     _generator.completeHeader(requestHeaders, true);
242                     _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
243                 }
244             }
245         }
246     }
247 
248     protected void reset() throws IOException
249     {
250         _connectionHeader = null;
251         _parser.reset();
252         _generator.reset();
253         _http11 = true;
254     }
255 
256 
257     private class Handler extends HttpParser.EventHandler
258     {
259         @Override
260         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
261         {
262             // System.out.println( method.toString() + "///" + url.toString() +
263             // "///" + version.toString() );
264             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
265             // out here
266             // throw new IllegalStateException();
267         }
268 
269         @Override
270         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
271         {
272             HttpExchange exchange = _exchange;
273             if (exchange==null)
274             {
275                 LOG.warn("No exchange for response");
276                 _endp.close();
277                 return;
278             }
279 
280             switch(status)
281             {
282                 case HttpStatus.CONTINUE_100:
283                 case HttpStatus.PROCESSING_102:
284                     // TODO check if appropriate expect was sent in the request.
285                     exchange.setEventListener(new NonFinalResponseListener(exchange));
286                     break;
287 
288                 case HttpStatus.OK_200:
289                     // handle special case for CONNECT 200 responses
290                     if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
291                         _parser.setHeadResponse(true);
292                     break;
293             }
294 
295             _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
296             _status=status;
297             exchange.getEventListener().onResponseStatus(version,status,reason);
298             exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
299 
300         }
301 
302         @Override
303         public void parsedHeader(Buffer name, Buffer value) throws IOException
304         {
305             HttpExchange exchange = _exchange;
306             if (exchange!=null)
307             {
308                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
309                 {
310                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
311                 }
312                 exchange.getEventListener().onResponseHeader(name,value);
313             }
314         }
315 
316         @Override
317         public void headerComplete() throws IOException
318         {
319             HttpExchange exchange = _exchange;
320             if (exchange!=null)
321                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
322         }
323 
324         @Override
325         public void content(Buffer ref) throws IOException
326         {
327             HttpExchange exchange = _exchange;
328             if (exchange!=null)
329                 exchange.getEventListener().onResponseContent(ref);
330         }
331 
332         @Override
333         public void messageComplete(long contextLength) throws IOException
334         {
335             HttpExchange exchange = _exchange;
336             if (exchange!=null)
337                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
338         }
339 
340         @Override
341         public void earlyEOF()
342         {
343             HttpExchange exchange = _exchange;
344             if (exchange!=null)
345             {
346                 if (!exchange.isDone())
347                 {
348                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
349                         exchange.getEventListener().onException(new EofException("early EOF"));
350                 }
351             }
352         }
353 
354 
355     }
356 
357     @Override
358     public String toString()
359     {
360         return String.format("%s %s g=%s p=%s",
361                 super.toString(),
362                 _destination == null ? "?.?.?.?:??" : _destination.getAddress(),
363                 _generator,
364                 _parser);
365     }
366 
367     public String toDetailString()
368     {
369         return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
370     }
371 
372     public void close() throws IOException
373     {
374         //if there is a live, unfinished exchange, set its status to be
375         //excepted and wake up anyone waiting on waitForDone()
376 
377         HttpExchange exchange = _exchange;
378         if (exchange != null && !exchange.isDone())
379         {
380             switch (exchange.getStatus())
381             {
382                 case HttpExchange.STATUS_CANCELLED:
383                 case HttpExchange.STATUS_CANCELLING:
384                 case HttpExchange.STATUS_COMPLETED:
385                 case HttpExchange.STATUS_EXCEPTED:
386                 case HttpExchange.STATUS_EXPIRED:
387                     break;
388                 case HttpExchange.STATUS_PARSING_CONTENT:
389                     if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
390                         break;
391                 default:
392                     String exch= exchange.toString();
393                     String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
394                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
395                         exchange.getEventListener().onException(new EofException(reason+exch));
396             }
397         }
398 
399         if (_endp.isOpen())
400         {
401             _endp.close();
402             _destination.returnConnection(this, true);
403         }
404     }
405 
406     public void setIdleTimeout()
407     {
408         synchronized (this)
409         {
410             if (_idle.compareAndSet(false, true))
411                 _destination.getHttpClient().scheduleIdle(_idleTimeout);
412             else
413                 throw new IllegalStateException();
414         }
415     }
416 
417     public boolean cancelIdleTimeout()
418     {
419         synchronized (this)
420         {
421             if (_idle.compareAndSet(true, false))
422             {
423                 _destination.getHttpClient().cancel(_idleTimeout);
424                 return true;
425             }
426         }
427 
428         return false;
429     }
430 
431     protected void exchangeExpired(HttpExchange exchange)
432     {
433         synchronized (this)
434         {
435             // We are expiring an exchange, but the exchange is pending
436             // Cannot reuse the connection because the reply may arrive, so close it
437             if (_exchange == exchange)
438             {
439                 try
440                 {
441                     _destination.returnConnection(this, true);
442                 }
443                 catch (IOException x)
444                 {
445                     LOG.ignore(x);
446                 }
447             }
448         }
449     }
450 
451     /* ------------------------------------------------------------ */
452     /**
453      * @see org.eclipse.jetty.util.component.Dumpable#dump()
454      */
455     public String dump()
456     {
457         return AggregateLifeCycle.dump(this);
458     }
459 
460     /* ------------------------------------------------------------ */
461     /**
462      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
463      */
464     public void dump(Appendable out, String indent) throws IOException
465     {
466         synchronized (this)
467         {
468             out.append(String.valueOf(this)).append("\n");
469             AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
470         }
471     }
472 
473     /* ------------------------------------------------------------ */
474     private class ConnectionIdleTask extends Timeout.Task
475     {
476         /* ------------------------------------------------------------ */
477         @Override
478         public void expired()
479         {
480             // Connection idle, close it
481             if (_idle.compareAndSet(true, false))
482             {
483                 _destination.returnIdleConnection(AbstractHttpConnection.this);
484             }
485         }
486     }
487 
488 
489     /* ------------------------------------------------------------ */
490     private class NonFinalResponseListener implements HttpEventListener
491     {
492         final HttpExchange _exchange;
493         final HttpEventListener _next;
494 
495         /* ------------------------------------------------------------ */
496         public NonFinalResponseListener(HttpExchange exchange)
497         {
498             _exchange=exchange;
499             _next=exchange.getEventListener();
500         }
501 
502         /* ------------------------------------------------------------ */
503         public void onRequestCommitted() throws IOException
504         {
505         }
506 
507         /* ------------------------------------------------------------ */
508         public void onRequestComplete() throws IOException
509         {
510         }
511 
512         /* ------------------------------------------------------------ */
513         public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
514         {
515         }
516 
517         /* ------------------------------------------------------------ */
518         public void onResponseHeader(Buffer name, Buffer value) throws IOException
519         {
520             _next.onResponseHeader(name,value);
521         }
522 
523         /* ------------------------------------------------------------ */
524         public void onResponseHeaderComplete() throws IOException
525         {
526             _next.onResponseHeaderComplete();
527         }
528 
529         /* ------------------------------------------------------------ */
530         public void onResponseContent(Buffer content) throws IOException
531         {
532         }
533 
534         /* ------------------------------------------------------------ */
535         public void onResponseComplete() throws IOException
536         {
537             _exchange.setEventListener(_next);
538             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
539             _parser.reset();
540         }
541 
542         /* ------------------------------------------------------------ */
543         public void onConnectionFailed(Throwable ex)
544         {
545             _exchange.setEventListener(_next);
546             _next.onConnectionFailed(ex);
547         }
548 
549         /* ------------------------------------------------------------ */
550         public void onException(Throwable ex)
551         {
552             _exchange.setEventListener(_next);
553             _next.onException(ex);
554         }
555 
556         /* ------------------------------------------------------------ */
557         public void onExpire()
558         {
559             _exchange.setEventListener(_next);
560             _next.onExpire();
561         }
562 
563         /* ------------------------------------------------------------ */
564         public void onRetry()
565         {
566             _exchange.setEventListener(_next);
567             _next.onRetry();
568         }
569     }
570 }