View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2011 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.util.Collections;
19  import java.util.concurrent.atomic.AtomicBoolean;
20  
21  import org.eclipse.jetty.client.security.Authentication;
22  import org.eclipse.jetty.http.HttpFields;
23  import org.eclipse.jetty.http.HttpGenerator;
24  import org.eclipse.jetty.http.HttpHeaderValues;
25  import org.eclipse.jetty.http.HttpHeaders;
26  import org.eclipse.jetty.http.HttpMethods;
27  import org.eclipse.jetty.http.HttpParser;
28  import org.eclipse.jetty.http.HttpSchemes;
29  import org.eclipse.jetty.http.HttpStatus;
30  import org.eclipse.jetty.http.HttpVersions;
31  import org.eclipse.jetty.io.AbstractConnection;
32  import org.eclipse.jetty.io.Buffer;
33  import org.eclipse.jetty.io.Buffers;
34  import org.eclipse.jetty.io.ByteArrayBuffer;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.io.EofException;
38  import org.eclipse.jetty.io.View;
39  import org.eclipse.jetty.util.component.AggregateLifeCycle;
40  import org.eclipse.jetty.util.component.Dumpable;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  import org.eclipse.jetty.util.thread.Timeout;
44  
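/**
 * Base class for the HTTP client's connections: it couples an {@link HttpGenerator}
 * (used to write requests) and an {@link HttpParser} (used to read responses) on a
 * single {@link EndPoint}, and tracks the {@link HttpExchange} currently in progress.
 *
 * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
 */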
49  public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable
50  {
51      private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class);
52  
53      protected HttpDestination _destination;
54      protected HttpGenerator _generator;
55      protected HttpParser _parser;
56      protected boolean _http11 = true;
57      protected int _status;
58      protected Buffer _connectionHeader;
59      protected boolean _reserved;
60  
61      // The current exchange waiting for a response
62      protected volatile HttpExchange _exchange;
63      protected HttpExchange _pipeline;
64      private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
65      private AtomicBoolean _idle = new AtomicBoolean(false);
66  
67  
68      AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
69      {
70          super(endp);
71  
72          _generator = new HttpGenerator(requestBuffers,endp);
73          _parser = new HttpParser(responseBuffers,endp,new Handler());
74      }
75  
76      public void setReserved (boolean reserved)
77      {
78          _reserved = reserved;
79      }
80  
81      public boolean isReserved()
82      {
83          return _reserved;
84      }
85  
86      public HttpDestination getDestination()
87      {
88          return _destination;
89      }
90  
91      public void setDestination(HttpDestination destination)
92      {
93          _destination = destination;
94      }
95  
96      public boolean send(HttpExchange ex) throws IOException
97      {
98          LOG.debug("Send {} on {}",ex,this);
99          synchronized (this)
100         {
101             if (_exchange != null)
102             {
103                 if (_pipeline != null)
104                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
105                 _pipeline = ex;
106                 return true;
107             }
108 
109             _exchange = ex;
110             _exchange.associate(this);
111 
112             // The call to associate() may have closed the connection, check if it's the case
113             if (!_endp.isOpen())
114             {
115                 _exchange.disassociate();
116                 _exchange = null;
117                 return false;
118             }
119 
120             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
121 
122             adjustIdleTimeout();
123 
124             return true;
125         }
126     }
127 
128     private void adjustIdleTimeout() throws IOException
129     {
130         // Adjusts the idle timeout in case the default or exchange timeout
131         // are greater. This is needed for long polls, where one wants an
132         // aggressive releasing of idle connections (so idle timeout is small)
133         // but still allow long polls to complete normally
134 
135         long timeout = _exchange.getTimeout();
136         if (timeout <= 0)
137             timeout = _destination.getHttpClient().getTimeout();
138 
139         long endPointTimeout = _endp.getMaxIdleTime();
140 
141         if (timeout > 0 && timeout > endPointTimeout)
142         {
143             // Make it larger than the exchange timeout so that there are
144             // no races between the idle timeout and the exchange timeout
145             // when trying to close the endpoint
146             _endp.setMaxIdleTime(2 * (int)timeout);
147         }
148     }
149 
    /**
     * Handles pending activity on this connection; the behaviour is supplied by the
     * concrete subclasses.
     *
     * @return a {@link Connection} - NOTE(review): presumably the connection that should
     *         handle subsequent events; confirm against the {@link Connection} contract
     * @throws IOException if handling fails with an I/O error
     */
    public abstract Connection handle() throws IOException;
151 
152 
153     public boolean isIdle()
154     {
155         synchronized (this)
156         {
157             return _exchange == null;
158         }
159     }
160 
161     public boolean isSuspended()
162     {
163         return false;
164     }
165 
166     public void onClose()
167     {
168     }
169 
170     protected void commitRequest() throws IOException
171     {
172         synchronized (this)
173         {
174             _status=0;
175             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
176                 throw new IllegalStateException();
177 
178             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
179             _generator.setVersion(_exchange.getVersion());
180 
181             String method=_exchange.getMethod();
182             String uri = _exchange.getRequestURI();
183             if (_destination.isProxied())
184             {
185                 if (!HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
186                 {
187                     boolean secure = _destination.isSecure();
188                     String host = _destination.getAddress().getHost();
189                     int port = _destination.getAddress().getPort();
190                     StringBuilder absoluteURI = new StringBuilder();
191                     absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
192                     absoluteURI.append("://");
193                     absoluteURI.append(host);
194                     // Avoid adding default ports
195                     if (!(secure && port == 443 || !secure && port == 80))
196                         absoluteURI.append(":").append(port);
197                     absoluteURI.append(uri);
198                     uri = absoluteURI.toString();
199                 }
200                 Authentication auth = _destination.getProxyAuthentication();
201                 if (auth != null)
202                     auth.setCredentials(_exchange);
203             }
204 
205             _generator.setRequest(method, uri);
206             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
207 
208             HttpFields requestHeaders = _exchange.getRequestFields();
209             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
210             {
211                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
212                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
213             }
214 
215             Buffer requestContent = _exchange.getRequestContent();
216             if (requestContent != null)
217             {
218                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
219                 _generator.completeHeader(requestHeaders,false);
220                 _generator.addContent(new View(requestContent),true);
221             }
222             else
223             {
224                 InputStream requestContentStream = _exchange.getRequestContentSource();
225                 if (requestContentStream != null)
226                 {
227                     _generator.completeHeader(requestHeaders, false);
228                     int available = requestContentStream.available();
229                     if (available > 0)
230                     {
231                         // TODO deal with any known content length
232                         // TODO reuse this buffer!
233                         byte[] buf = new byte[available];
234                         int length = requestContentStream.read(buf);
235                         _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
236                     }
237                 }
238                 else
239                 {
240                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
241                     _generator.completeHeader(requestHeaders, true);
242                 }
243             }
244 
245             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
246         }
247     }
248 
249     protected void reset() throws IOException
250     {
251         _connectionHeader = null;
252         _parser.reset();
253         _generator.reset();
254         _http11 = true;
255     }
256 
257 
258     private class Handler extends HttpParser.EventHandler
259     {
260         @Override
261         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
262         {
263             // System.out.println( method.toString() + "///" + url.toString() +
264             // "///" + version.toString() );
265             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
266             // out here
267             // throw new IllegalStateException();
268         }
269 
270         @Override
271         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
272         {
273             HttpExchange exchange = _exchange;
274             if (exchange==null)
275             {
276                 LOG.warn("No exchange for response");
277                 _endp.close();
278                 return;
279             }
280 
281             switch(status)
282             {
283                 case HttpStatus.CONTINUE_100:
284                 case HttpStatus.PROCESSING_102:
285                     // TODO check if appropriate expect was sent in the request.
286                     exchange.setEventListener(new NonFinalResponseListener(exchange));
287                     break;
288 
289                 case HttpStatus.OK_200:
290                     // handle special case for CONNECT 200 responses
291                     if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
292                         _parser.setHeadResponse(true);
293                     break;
294             }
295 
296             _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
297             _status=status;
298             exchange.getEventListener().onResponseStatus(version,status,reason);
299             exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
300 
301         }
302 
303         @Override
304         public void parsedHeader(Buffer name, Buffer value) throws IOException
305         {
306             HttpExchange exchange = _exchange;
307             if (exchange!=null)
308             {
309                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
310                 {
311                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
312                 }
313                 exchange.getEventListener().onResponseHeader(name,value);
314             }
315         }
316 
317         @Override
318         public void headerComplete() throws IOException
319         {
320             HttpExchange exchange = _exchange;
321             if (exchange!=null)
322                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
323         }
324 
325         @Override
326         public void content(Buffer ref) throws IOException
327         {
328             HttpExchange exchange = _exchange;
329             if (exchange!=null)
330                 exchange.getEventListener().onResponseContent(ref);
331         }
332 
333         @Override
334         public void messageComplete(long contextLength) throws IOException
335         {
336             HttpExchange exchange = _exchange;
337             if (exchange!=null)
338                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
339         }
340 
341         @Override
342         public void earlyEOF()
343         {
344             HttpExchange exchange = _exchange;
345             if (exchange!=null)
346             {
347                 if (!exchange.isDone())
348                 {
349                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
350                         exchange.getEventListener().onException(new EofException("early EOF"));
351                 }
352             }
353         }
354 
355 
356     }
357 
358     @Override
359     public String toString()
360     {
361         return String.format("%s %s g=%s p=%s",
362                 super.toString(),
363                 _destination == null ? "?.?.?.?:??" : _destination.getAddress(),
364                 _generator,
365                 _parser);
366     }
367 
368     public String toDetailString()
369     {
370         return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
371     }
372 
373     public void close() throws IOException
374     {
375         //if there is a live, unfinished exchange, set its status to be
376         //excepted and wake up anyone waiting on waitForDone()
377 
378         HttpExchange exchange = _exchange;
379         if (exchange != null && !exchange.isDone())
380         {
381             switch (exchange.getStatus())
382             {
383                 case HttpExchange.STATUS_CANCELLED:
384                 case HttpExchange.STATUS_CANCELLING:
385                 case HttpExchange.STATUS_COMPLETED:
386                 case HttpExchange.STATUS_EXCEPTED:
387                 case HttpExchange.STATUS_EXPIRED:
388                     break;
389                 case HttpExchange.STATUS_PARSING_CONTENT:
390                     if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
391                         break;
392                 default:
393                     String exch= exchange.toString();
394                     String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
395                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
396                         exchange.getEventListener().onException(new EofException(reason+exch));
397             }
398         }
399 
400         if (_endp.isOpen())
401         {
402             _endp.close();
403             _destination.returnConnection(this, true);
404         }
405     }
406 
407     public void setIdleTimeout()
408     {
409         synchronized (this)
410         {
411             if (_idle.compareAndSet(false, true))
412                 _destination.getHttpClient().scheduleIdle(_idleTimeout);
413             else
414                 throw new IllegalStateException();
415         }
416     }
417 
418     public boolean cancelIdleTimeout()
419     {
420         synchronized (this)
421         {
422             if (_idle.compareAndSet(true, false))
423             {
424                 _destination.getHttpClient().cancel(_idleTimeout);
425                 return true;
426             }
427         }
428 
429         return false;
430     }
431 
432     protected void exchangeExpired(HttpExchange exchange)
433     {
434         synchronized (this)
435         {
436             // We are expiring an exchange, but the exchange is pending
437             // Cannot reuse the connection because the reply may arrive, so close it
438             if (_exchange == exchange)
439             {
440                 try
441                 {
442                     _destination.returnConnection(this, true);
443                 }
444                 catch (IOException x)
445                 {
446                     LOG.ignore(x);
447                 }
448             }
449         }
450     }
451 
452     /* ------------------------------------------------------------ */
453     /**
454      * @see org.eclipse.jetty.util.component.Dumpable#dump()
455      */
456     public String dump()
457     {
458         return AggregateLifeCycle.dump(this);
459     }
460 
461     /* ------------------------------------------------------------ */
462     /**
463      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
464      */
465     public void dump(Appendable out, String indent) throws IOException
466     {
467         synchronized (this)
468         {
469             out.append(String.valueOf(this)).append("\n");
470             AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
471         }
472     }
473 
474     /* ------------------------------------------------------------ */
475     private class ConnectionIdleTask extends Timeout.Task
476     {
477         /* ------------------------------------------------------------ */
478         @Override
479         public void expired()
480         {
481             // Connection idle, close it
482             if (_idle.compareAndSet(true, false))
483             {
484                 _destination.returnIdleConnection(AbstractHttpConnection.this);
485             }
486         }
487     }
488 
489 
490     /* ------------------------------------------------------------ */
491     private class NonFinalResponseListener implements HttpEventListener
492     {
493         final HttpExchange _exchange;
494         final HttpEventListener _next;
495 
496         /* ------------------------------------------------------------ */
497         public NonFinalResponseListener(HttpExchange exchange)
498         {
499             _exchange=exchange;
500             _next=exchange.getEventListener();
501         }
502 
503         /* ------------------------------------------------------------ */
504         public void onRequestCommitted() throws IOException
505         {
506         }
507 
508         /* ------------------------------------------------------------ */
509         public void onRequestComplete() throws IOException
510         {
511         }
512 
513         /* ------------------------------------------------------------ */
514         public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
515         {
516         }
517 
518         /* ------------------------------------------------------------ */
519         public void onResponseHeader(Buffer name, Buffer value) throws IOException
520         {
521             _next.onResponseHeader(name,value);
522         }
523 
524         /* ------------------------------------------------------------ */
525         public void onResponseHeaderComplete() throws IOException
526         {
527             _next.onResponseHeaderComplete();
528         }
529 
530         /* ------------------------------------------------------------ */
531         public void onResponseContent(Buffer content) throws IOException
532         {
533         }
534 
535         /* ------------------------------------------------------------ */
536         public void onResponseComplete() throws IOException
537         {
538             _exchange.setEventListener(_next);
539             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
540             _parser.reset();
541         }
542 
543         /* ------------------------------------------------------------ */
544         public void onConnectionFailed(Throwable ex)
545         {
546             _exchange.setEventListener(_next);
547             _next.onConnectionFailed(ex);
548         }
549 
550         /* ------------------------------------------------------------ */
551         public void onException(Throwable ex)
552         {
553             _exchange.setEventListener(_next);
554             _next.onException(ex);
555         }
556 
557         /* ------------------------------------------------------------ */
558         public void onExpire()
559         {
560             _exchange.setEventListener(_next);
561             _next.onExpire();
562         }
563 
564         /* ------------------------------------------------------------ */
565         public void onRetry()
566         {
567             _exchange.setEventListener(_next);
568             _next.onRetry();
569         }
570     }
571 }