View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.EOFException;
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  import java.util.Collections;
21  import java.util.concurrent.atomic.AtomicBoolean;
22  
23  import org.eclipse.jetty.client.security.Authentication;
24  import org.eclipse.jetty.http.HttpFields;
25  import org.eclipse.jetty.http.HttpGenerator;
26  import org.eclipse.jetty.http.HttpHeaderValues;
27  import org.eclipse.jetty.http.HttpHeaders;
28  import org.eclipse.jetty.http.HttpMethods;
29  import org.eclipse.jetty.http.HttpParser;
30  import org.eclipse.jetty.http.HttpSchemes;
31  import org.eclipse.jetty.http.HttpStatus;
32  import org.eclipse.jetty.http.HttpVersions;
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.AsyncEndPoint;
35  import org.eclipse.jetty.io.Buffer;
36  import org.eclipse.jetty.io.Buffers;
37  import org.eclipse.jetty.io.ByteArrayBuffer;
38  import org.eclipse.jetty.io.Connection;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.View;
41  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42  import org.eclipse.jetty.util.component.AggregateLifeCycle;
43  import org.eclipse.jetty.util.component.Dumpable;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.thread.Timeout;
46  
47  /**
48   *
49   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
50   */
51  public class HttpConnection extends AbstractConnection implements Dumpable
52  {
53      private HttpDestination _destination;
54      private HttpGenerator _generator;
55      private HttpParser _parser;
56      private boolean _http11 = true;
57      private int _status;
58      private Buffer _connectionHeader;
59      private Buffer _requestContentChunk;
60      private boolean _requestComplete;
61      private boolean _reserved;
62  
63      // The current exchange waiting for a response
64      private volatile HttpExchange _exchange;
65      private HttpExchange _pipeline;
66      private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
67      private AtomicBoolean _idle = new AtomicBoolean(false);
68  
69  
70      HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
71      {
72          super(endp);
73  
74          _generator = new HttpGenerator(requestBuffers,endp);
75          _parser = new HttpParser(responseBuffers,endp,new Handler());
76      }
77  
78      public void setReserved (boolean reserved)
79      {
80          _reserved = reserved;
81      }
82  
83      public boolean isReserved()
84      {
85          return _reserved;
86      }
87  
88      public HttpDestination getDestination()
89      {
90          return _destination;
91      }
92  
93      public void setDestination(HttpDestination destination)
94      {
95          _destination = destination;
96      }
97  
98      public boolean send(HttpExchange ex) throws IOException
99      {
100         synchronized (this)
101         {
102             if (_exchange != null)
103             {
104                 if (_pipeline != null)
105                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
106                 _pipeline = ex;
107                 return true;
108             }
109 
110             _exchange = ex;
111             _exchange.associate(this);
112 
113             // The call to associate() may have closed the connection, check if it's the case
114             if (!_endp.isOpen())
115             {
116                 _exchange.disassociate();
117                 _exchange = null;
118                 return false;
119             }
120 
121             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
122 
123             if (_endp.isBlocking())
124             {
125                 this.notify();
126             }
127             else
128             {
129                 AsyncEndPoint scep = (AsyncEndPoint)_endp;
130                 scep.scheduleWrite();
131             }
132 
133             adjustIdleTimeout();
134 
135             return true;
136         }
137     }
138 
139     private void adjustIdleTimeout() throws IOException
140     {
141         // Adjusts the idle timeout in case the default or exchange timeout
142         // are greater. This is needed for long polls, where one wants an
143         // aggressive releasing of idle connections (so idle timeout is small)
144         // but still allow long polls to complete normally
145 
146         long timeout = _exchange.getTimeout();
147         if (timeout <= 0)
148             timeout = _destination.getHttpClient().getTimeout();
149 
150         long endPointTimeout = _endp.getMaxIdleTime();
151 
152         if (timeout > 0 && timeout > endPointTimeout)
153         {
154             // Make it larger than the exchange timeout so that there are
155             // no races between the idle timeout and the exchange timeout
156             // when trying to close the endpoint
157             _endp.setMaxIdleTime(2 * (int)timeout);
158         }
159     }
160 
161     public Connection handle() throws IOException
162     {
163         try
164         {
165             int no_progress = 0;
166 
167             boolean failed = false;
168             while (_endp.isBufferingInput() || _endp.isOpen())
169             {
170                 synchronized (this)
171                 {
172                     while (_exchange == null)
173                     {
174                         if (_endp.isBlocking())
175                         {
176                             try
177                             {
178                                 this.wait();
179                             }
180                             catch (InterruptedException e)
181                             {
182                                 throw new InterruptedIOException();
183                             }
184                         }
185                         else
186                         {
187                             long filled = _parser.fill();
188                             if (filled < 0)
189                             {
190                                 close();
191                             }
192                             else
193                             {
194                                 // Hopefully just space?
195                                 _parser.skipCRLF();
196                                 if (_parser.isMoreInBuffer())
197                                 {
198                                     Log.warn("Unexpected data received but no request sent");
199                                     close();
200                                 }
201                             }
202                             return this;
203                         }
204                     }
205                 }
206 
207                 try
208                 {
209                     if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
210                     {
211                         no_progress = 0;
212                         commitRequest();
213                     }
214 
215                     long io = 0;
216                     _endp.flush();
217 
218                     if (_generator.isComplete())
219                     {
220                         if (!_requestComplete)
221                         {
222                             _requestComplete = true;
223                             _exchange.getEventListener().onRequestComplete();
224                         }
225                     }
226                     else
227                     {
228                         // Write as much of the request as possible
229                         synchronized (this)
230                         {
231                             if (_exchange == null)
232                                 continue;
233                         }
234 
235                         long flushed = _generator.flushBuffer();
236                         io += flushed;
237 
238                         if (!_generator.isComplete())
239                         {
240                             if (_exchange!=null)
241                             {
242                                 InputStream in = _exchange.getRequestContentSource();
243                                 if (in != null)
244                                 {
245                                     if (_requestContentChunk == null || _requestContentChunk.length() == 0)
246                                     {
247                                         _requestContentChunk = _exchange.getRequestContentChunk();
248 
249                                         if (_requestContentChunk != null)
250                                             _generator.addContent(_requestContentChunk,false);
251                                         else
252                                             _generator.complete();
253 
254                                         flushed = _generator.flushBuffer();
255                                         io += flushed;
256                                     }
257                                 }
258                                 else
259                                     _generator.complete();
260                             }
261                             else
262                                 _generator.complete();
263                         }
264                     }
265 
266                     if (_generator.isComplete() && !_requestComplete)
267                     {
268                         _requestComplete = true;
269                         _exchange.getEventListener().onRequestComplete();
270                     }
271 
272                     // If we are not ended then parse available
273                     if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
274                     {
275                         long filled = _parser.parseAvailable();
276                         io += filled;
277                     }
278 
279                     if (io > 0)
280                         no_progress = 0;
281                     else if (no_progress++ >= 1 && !_endp.isBlocking())
282                     {
283                         // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
284                         if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
285                         {
286                             long flushed = _generator.flushBuffer();
287                             if (flushed>0)
288                                 continue;
289                         }
290                         return this;
291                     }
292                 }
293                 catch (Throwable e)
294                 {
295                     Log.debug("Failure on " + _exchange, e);
296 
297                     if (e instanceof ThreadDeath)
298                         throw (ThreadDeath)e;
299 
300                     failed = true;
301 
302                     synchronized (this)
303                     {
304                         if (_exchange != null)
305                         {
306                             // Cancelling the exchange causes an exception as we close the connection,
307                             // but we don't report it as it is normal cancelling operation
308                             if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
309                                     _exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
310                             {
311                                 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
312                                 _exchange.getEventListener().onException(e);
313                             }
314                         }
315                         else
316                         {
317                             if (e instanceof IOException)
318                                 throw (IOException)e;
319 
320                             if (e instanceof Error)
321                                 throw (Error)e;
322 
323                             if (e instanceof RuntimeException)
324                                 throw (RuntimeException)e;
325 
326                             throw new RuntimeException(e);
327                         }
328                     }
329                 }
330                 finally
331                 {
332                     boolean complete = false;
333                     boolean close = failed; // always close the connection on error
334                     if (!failed)
335                     {
336                         // are we complete?
337                         if (_generator.isComplete())
338                         {
339                             if (!_requestComplete)
340                             {
341                                 _requestComplete = true;
342                                 _exchange.getEventListener().onRequestComplete();
343                             }
344 
345                             // we need to return the HttpConnection to a state that
346                             // it can be reused or closed out
347                             if (_parser.isComplete())
348                             {
349                                 _exchange.cancelTimeout(_destination.getHttpClient());
350                                 complete = true;
351                             }
352                         }
353                     }
354 
355                     if (_generator.isComplete() && !_parser.isComplete())
356                     {
357                         if (!_endp.isOpen() || _endp.isInputShutdown())
358                         {
359                             complete=true;
360                             close=true;
361                             close();
362                         }
363                     }
364 
365                     if (complete || failed)
366                     {
367                         synchronized (this)
368                         {
369                             if (!close)
370                                 close = shouldClose();
371 
372                             reset(true);
373 
374                             no_progress = 0;
375                             if (_exchange != null)
376                             {
377                                 HttpExchange exchange=_exchange;
378                                 _exchange = null;
379 
380                                 // Reset the maxIdleTime because it may have been changed
381                                 if (!close)
382                                     _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
383 
384                                 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
385                                 {
386                                     Connection switched=exchange.onSwitchProtocol(_endp);
387                                     if (switched!=null)
388                                     {
389                                         // switched protocol!
390                                         exchange = _pipeline;
391                                         _pipeline = null;
392                                         if (exchange!=null)
393                                             _destination.send(exchange);
394 
395                                         return switched;
396                                     }
397                                 }
398 
399                                 if (_pipeline == null)
400                                 {
401                                     if (!isReserved())
402                                         _destination.returnConnection(this, close);
403                                 }
404                                 else
405                                 {
406                                     if (close)
407                                     {
408                                         if (!isReserved())
409                                             _destination.returnConnection(this,close);
410 
411                                         exchange = _pipeline;
412                                         _pipeline = null;
413                                         _destination.send(exchange);
414                                     }
415                                     else
416                                     {
417                                         exchange = _pipeline;
418                                         _pipeline = null;
419                                         send(exchange);
420                                     }
421                                 }
422                             }
423                         }
424                     }
425                 }
426             }
427         }
428         finally
429         {
430             _parser.returnBuffers();
431             
432             // Do we have more stuff to write?
433             if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
434             {
435                 // Assume we are write blocked!
436                 ((AsyncEndPoint)_endp).scheduleWrite();
437             }
438         }
439 
440         return this;
441     }
442 
443     public boolean isIdle()
444     {
445         synchronized (this)
446         {
447             return _exchange == null;
448         }
449     }
450 
451     public boolean isSuspended()
452     {
453         return false;
454     }
455 
456     public void closed()
457     {
458     }
459 
460     private void commitRequest() throws IOException
461     {
462         synchronized (this)
463         {
464             _status=0;
465             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
466                 throw new IllegalStateException();
467 
468             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
469             _generator.setVersion(_exchange.getVersion());
470 
471             String method=_exchange.getMethod();
472             String uri = _exchange.getURI();
473             if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
474             {
475                 boolean secure = _destination.isSecure();
476                 String host = _destination.getAddress().getHost();
477                 int port = _destination.getAddress().getPort();
478                 StringBuilder absoluteURI = new StringBuilder();
479                 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
480                 absoluteURI.append("://");
481                 absoluteURI.append(host);
482                 // Avoid adding default ports
483                 if (!(secure && port == 443 || !secure && port == 80))
484                     absoluteURI.append(":").append(port);
485                 absoluteURI.append(uri);
486                 uri = absoluteURI.toString();
487                 Authentication auth = _destination.getProxyAuthentication();
488                 if (auth != null)
489                     auth.setCredentials(_exchange);
490             }
491 
492             _generator.setRequest(method, uri);
493             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
494 
495             HttpFields requestHeaders = _exchange.getRequestFields();
496             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
497             {
498                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
499                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
500             }
501 
502             Buffer requestContent = _exchange.getRequestContent();
503             if (requestContent != null)
504             {
505                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
506                 _generator.completeHeader(requestHeaders,false);
507                 _generator.addContent(new View(requestContent),true);
508             }
509             else
510             {
511                 InputStream requestContentStream = _exchange.getRequestContentSource();
512                 if (requestContentStream != null)
513                 {
514                     _generator.completeHeader(requestHeaders, false);
515                     int available = requestContentStream.available();
516                     if (available > 0)
517                     {
518                         // TODO deal with any known content length
519                         // TODO reuse this buffer!
520                         byte[] buf = new byte[available];
521                         int length = requestContentStream.read(buf);
522                         _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
523                     }
524                 }
525                 else
526                 {
527                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
528                     _generator.completeHeader(requestHeaders, true);
529                 }
530             }
531 
532             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
533         }
534     }
535 
536     protected void reset(boolean returnBuffers) throws IOException
537     {
538         _requestComplete = false;
539         _connectionHeader = null;
540         _parser.reset();
541         if (returnBuffers)
542             _parser.returnBuffers();
543         _generator.reset(returnBuffers);
544         _http11 = true;
545     }
546 
547     private boolean shouldClose()
548     {
549         if (_connectionHeader!=null)
550         {
551             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
552                 return true;
553             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
554                 return false;
555         }
556         return !_http11;
557     }
558 
559     private class Handler extends HttpParser.EventHandler
560     {
561         @Override
562         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
563         {
564             // System.out.println( method.toString() + "///" + url.toString() +
565             // "///" + version.toString() );
566             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
567             // out here
568             // throw new IllegalStateException();
569         }
570 
571         @Override
572         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
573         {
574             
575             HttpExchange exchange = _exchange;
576             if (exchange!=null)
577             {
578                 // handle special case for CONNECT 200 responses
579                 if (status==HttpStatus.OK_200 && HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
580                     _parser.setHeadResponse(true);
581                 
582                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
583                 _status=status;
584                 exchange.getEventListener().onResponseStatus(version,status,reason);
585                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
586             }
587         }
588 
589         @Override
590         public void parsedHeader(Buffer name, Buffer value) throws IOException
591         {
592             HttpExchange exchange = _exchange;
593             if (exchange!=null)
594             {
595                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
596                 {
597                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
598                 }
599                 exchange.getEventListener().onResponseHeader(name,value);
600             }
601         }
602 
603         @Override
604         public void headerComplete() throws IOException
605         {
606             if (_endp instanceof AsyncEndPoint)
607                 ((AsyncEndPoint)_endp).scheduleIdle();
608             HttpExchange exchange = _exchange;
609             if (exchange!=null)
610                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
611         }
612 
613         @Override
614         public void content(Buffer ref) throws IOException
615         {
616             if (_endp instanceof AsyncEndPoint)
617                 ((AsyncEndPoint)_endp).scheduleIdle();
618             HttpExchange exchange = _exchange;
619             if (exchange!=null)
620                 exchange.getEventListener().onResponseContent(ref);
621         }
622 
623         @Override
624         public void messageComplete(long contextLength) throws IOException
625         {
626             HttpExchange exchange = _exchange;
627             if (exchange!=null)
628                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
629         }
630     }
631 
632     @Override
633     public String toString()
634     {
635         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
636     }
637 
638     public String toDetailString()
639     {
640         return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
641     }
642 
643     public void close() throws IOException
644     {
645         //if there is a live, unfinished exchange, set its status to be
646         //excepted and wake up anyone waiting on waitForDone()
647 
648         HttpExchange exchange = _exchange;
649         if (exchange != null && !exchange.isDone())
650         {
651             switch (exchange.getStatus())
652             {
653                 case HttpExchange.STATUS_CANCELLED:
654                 case HttpExchange.STATUS_CANCELLING:
655                 case HttpExchange.STATUS_COMPLETED:
656                 case HttpExchange.STATUS_EXCEPTED:
657                 case HttpExchange.STATUS_EXPIRED:
658                     break;
659                 default:
660                     String exch= exchange.toString();
661                     String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
662                     exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
663                     exchange.getEventListener().onException(new EOFException(reason+exch));
664             }
665         }
666 
667         _endp.close();
668     }
669 
670     public void setIdleTimeout()
671     {
672         synchronized (this)
673         {
674             if (_idle.compareAndSet(false, true))
675                 _destination.getHttpClient().scheduleIdle(_idleTimeout);
676             else
677                 throw new IllegalStateException();
678         }
679     }
680 
681     public boolean cancelIdleTimeout()
682     {
683         synchronized (this)
684         {
685             if (_idle.compareAndSet(true, false))
686             {
687                 _destination.getHttpClient().cancel(_idleTimeout);
688                 return true;
689             }
690         }
691 
692         return false;
693     }
694 
695     protected void exchangeExpired(HttpExchange exchange)
696     {
697         synchronized (this)
698         {
699             // We are expiring an exchange, but the exchange is pending
700             // Cannot reuse the connection because the reply may arrive, so close it
701             if (_exchange == exchange)
702             {
703                 try
704                 {
705                     _destination.returnConnection(this, true);
706                 }
707                 catch (IOException x)
708                 {
709                     Log.ignore(x);
710                 }
711             }
712         }
713     }
714     
715     /* ------------------------------------------------------------ */
716     /**
717      * @see org.eclipse.jetty.util.component.Dumpable#dump()
718      */
719     public String dump()
720     {
721         return AggregateLifeCycle.dump(this);
722     }
723 
724     /* ------------------------------------------------------------ */
725     /**
726      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
727      */
728     public void dump(Appendable out, String indent) throws IOException
729     {
730         synchronized (this)
731         {
732             out.append(String.valueOf(this)).append("\n");
733             AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
734         }
735     }
736     
737     private class ConnectionIdleTask extends Timeout.Task
738     {
739         @Override
740         public void expired()
741         {
742             // Connection idle, close it
743             if (_idle.compareAndSet(true, false))
744             {
745                 _destination.returnIdleConnection(HttpConnection.this);
746             }
747         }
748     }
749 }