View Javadoc

// ========================================================================
// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.EOFException;
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  
22  import org.eclipse.jetty.client.security.Authentication;
23  import org.eclipse.jetty.http.HttpFields;
24  import org.eclipse.jetty.http.HttpGenerator;
25  import org.eclipse.jetty.http.HttpHeaderValues;
26  import org.eclipse.jetty.http.HttpHeaders;
27  import org.eclipse.jetty.http.HttpMethods;
28  import org.eclipse.jetty.http.HttpParser;
29  import org.eclipse.jetty.http.HttpSchemes;
30  import org.eclipse.jetty.http.HttpStatus;
31  import org.eclipse.jetty.http.HttpVersions;
32  import org.eclipse.jetty.io.AsyncEndPoint;
33  import org.eclipse.jetty.io.Buffer;
34  import org.eclipse.jetty.io.Buffers;
35  import org.eclipse.jetty.io.ByteArrayBuffer;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.io.View;
39  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.thread.Timeout;
42  
43  /**
44   *
45   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
46   */
47  public class HttpConnection implements Connection
48  {
49      private HttpDestination _destination;
50      private EndPoint _endp;
51      private HttpGenerator _generator;
52      private HttpParser _parser;
53      private boolean _http11 = true;
54      private int _status;
55      private Buffer _connectionHeader;
56      private Buffer _requestContentChunk;
57      private boolean _requestComplete;
58      private boolean _reserved;
59      // The current exchange waiting for a response
60      private volatile HttpExchange _exchange;
61      private HttpExchange _pipeline;
62      private final Timeout.Task _timeout = new TimeoutTask();
63      private AtomicBoolean _idle = new AtomicBoolean(false);
64  
65      public void dump() throws IOException
66      {
67          Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
68          Log.info("generator=" + _generator);
69          Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
70          Log.info("exchange=" + _exchange);
71          if (_endp instanceof SslSelectChannelEndPoint)
72              ((SslSelectChannelEndPoint)_endp).dump();
73      }
74  
75      HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
76      {
77          _endp = endp;
78          _generator = new HttpGenerator(requestBuffers,endp);
79          _parser = new HttpParser(responseBuffers,endp,new Handler());
80      }
81  
82      public long getTimeStamp()
83      {
84          return -1;
85      }
86  
87      public void setReserved (boolean reserved)
88      {
89          _reserved = reserved;
90      }
91  
92      public boolean isReserved()
93      {
94          return _reserved;
95      }
96  
97      public HttpDestination getDestination()
98      {
99          return _destination;
100     }
101 
102     public void setDestination(HttpDestination destination)
103     {
104         _destination = destination;
105     }
106 
107     public boolean send(HttpExchange ex) throws IOException
108     {
109         synchronized (this)
110         {
111             if (_exchange != null)
112             {
113                 if (_pipeline != null)
114                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
115                 _pipeline = ex;
116                 return true;
117             }
118 
119             if (!_endp.isOpen())
120                 return false;
121 
122             _exchange = ex;
123             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
124 
125             if (_endp.isBlocking())
126             {
127                 this.notify();
128             }
129             else
130             {
131                 AsyncEndPoint scep = (AsyncEndPoint)_endp;
132                 scep.scheduleWrite();
133             }
134 
135             long exchTimeout = _exchange.getTimeout();
136             if (exchTimeout > 0)
137             {
138                 _destination.getHttpClient().schedule(_timeout, exchTimeout);
139             }
140             else
141             {
142                 _destination.getHttpClient().schedule(_timeout);
143             }
144 
145             return true;
146         }
147     }
148 
149     public Connection handle() throws IOException
150     {
151         if (_exchange != null)
152             _exchange.associate(this);
153 
154         try
155         {
156             int no_progress = 0;
157 
158             boolean failed = false;
159             while (_endp.isBufferingInput() || _endp.isOpen())
160             {
161                 synchronized (this)
162                 {
163                     while (_exchange == null)
164                     {
165                         if (_endp.isBlocking())
166                         {
167                             try
168                             {
169                                 this.wait();
170                             }
171                             catch (InterruptedException e)
172                             {
173                                 throw new InterruptedIOException();
174                             }
175                         }
176                         else
177                         {
178                             // Hopefully just space?
179                             _parser.fill();
180                             _parser.skipCRLF();
181                             if (_parser.isMoreInBuffer())
182                             {
183                                 Log.warn("Unexpected data received but no request sent");
184                                 close();
185                             }
186                             return this;
187                         }
188                     }
189                     if (!_exchange.isAssociated())
190                         _exchange.associate(this);
191                 }
192 
193                 try
194                 {
195                     if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
196                     {
197                         no_progress = 0;
198                         commitRequest();
199                     }
200                     
201                     long io = 0;
202                     _endp.flush();
203 
204                     if (_generator.isComplete())
205                     {
206                         if (!_requestComplete)
207                         {
208                             _requestComplete = true;
209                             _exchange.getEventListener().onRequestComplete();
210                         }
211                     }
212                     else
213                     {
214                         // Write as much of the request as possible
215                         synchronized (this)
216                         {
217                             if (_exchange == null)
218                                 continue;
219                         }
220 
221                         long flushed = _generator.flushBuffer();
222                         io += flushed;
223 
224                         if (!_generator.isComplete())
225                         {
226                             if (_exchange!=null)
227                             {
228                                 InputStream in = _exchange.getRequestContentSource();
229                                 if (in != null)
230                                 {
231                                     if (_requestContentChunk == null || _requestContentChunk.length() == 0)
232                                     {
233                                         _requestContentChunk = _exchange.getRequestContentChunk();
234                                         _destination.getHttpClient().schedule(_timeout);
235 
236                                         if (_requestContentChunk != null)
237                                             _generator.addContent(_requestContentChunk,false);
238                                         else
239                                             _generator.complete();
240 
241                                         flushed = _generator.flushBuffer();
242                                         io += flushed;
243                                     }
244                                 }
245                                 else
246                                     _generator.complete();
247                             }
248                             else
249                                 _generator.complete();
250                         }
251                     }
252 
253                     if (_generator.isComplete() && !_requestComplete)
254                     {
255                         _requestComplete = true;
256                         _exchange.getEventListener().onRequestComplete();
257                     }
258 
259                     // If we are not ended then parse available
260                     if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
261                     {
262                         long filled = _parser.parseAvailable();
263                         io += filled;
264                     }
265 
266                     if (io > 0)
267                         no_progress = 0;
268                     else if (no_progress++ >= 1 && !_endp.isBlocking())
269                     {
270                         // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
271                         if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
272                         {
273                             long flushed = _generator.flushBuffer();
274                             if (flushed>0)
275                                 continue;
276                         }
277                         return this;
278                     }
279                 }
280                 catch (Throwable e)
281                 {
282                     Log.debug("Failure on " + _exchange, e);
283 
284                     if (e instanceof ThreadDeath)
285                         throw (ThreadDeath)e;
286 
287                     failed = true;
288                     
289                     synchronized (this)
290                     {
291                         if (_exchange != null)
292                         {
293                             // Cancelling the exchange causes an exception as we close the connection,
294                             // but we don't report it as it is normal cancelling operation
295                             if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING)
296                             {
297                                 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
298                                 _exchange.getEventListener().onException(e);
299                             }
300                         }
301                         else
302                         {
303                             if (e instanceof IOException)
304                                 throw (IOException)e;
305 
306                             if (e instanceof Error)
307                                 throw (Error)e;
308 
309                             if (e instanceof RuntimeException)
310                                 throw (RuntimeException)e;
311 
312                             throw new RuntimeException(e);
313                         }
314                     }
315                 }
316                 finally
317                 {
318                     boolean complete = false;
319                     boolean close = failed; // always close the connection on error
320                     if (!failed)
321                     {
322                         // are we complete?
323                         if (_generator.isComplete())
324                         {
325                             if (!_requestComplete)
326                             {
327                                 _requestComplete = true;
328                                 _exchange.getEventListener().onRequestComplete();
329                             }
330 
331                             // we need to return the HttpConnection to a state that
332                             // it can be reused or closed out
333                             if (_parser.isComplete())
334                             {
335                                 _destination.getHttpClient().cancel(_timeout);
336                                 complete = true;
337                             }
338                         }
339                     }
340 
341                     if (complete || failed)
342                     {
343                         synchronized (this)
344                         {
345                             if (!close)
346                                 close = shouldClose();
347 
348                             reset(true);
349 
350                             no_progress = 0;
351                             if (_exchange != null)
352                             {
353                                 HttpExchange exchange=_exchange;
354                                 _exchange.disassociate();
355                                 _exchange = null;
356 
357                                 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
358                                 {
359                                     Connection switched=exchange.onSwitchProtocol(_endp);
360                                     if (switched!=null)
361                                     {
362                                         // switched protocol!
363                                         exchange = _pipeline;
364                                         _pipeline = null;
365                                         if (exchange!=null)
366                                             _destination.send(exchange);
367 
368                                         return switched;
369                                     }
370                                 }
371 
372                                 if (_pipeline == null)
373                                 {
374                                     if (!isReserved())
375                                         _destination.returnConnection(this, close);
376                                 }
377                                 else
378                                 {
379                                     if (close)
380                                     {
381                                         if (!isReserved())
382                                             _destination.returnConnection(this,close);
383 
384                                         exchange = _pipeline;
385                                         _pipeline = null;
386                                         _destination.send(exchange);
387                                     }
388                                     else
389                                     {
390                                         exchange = _pipeline;
391                                         _pipeline = null;
392                                         send(exchange);
393                                     }
394                                 }
395 
396                             }
397                         }
398                     }
399                 }
400             }
401         }
402         finally
403         {
404             if (_exchange != null && _exchange.isAssociated())
405             {
406                 _exchange.disassociate();
407             }
408 
409             // Do we have more stuff to write?
410             if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
411             {
412                 // Assume we are write blocked!
413                 ((AsyncEndPoint)_endp).setWritable(false);
414             }
415         }
416 
417         return this;
418     }
419 
420     public boolean isIdle()
421     {
422         synchronized (this)
423         {
424             return _exchange == null;
425         }
426     }
427 
428     /**
429      * @see org.eclipse.jetty.io.Connection#isSuspended()
430      */
431     public boolean isSuspended()
432     {
433         return false;
434     }
435 
436     public void closed()
437     {
438     }
439 
440     public EndPoint getEndPoint()
441     {
442         return _endp;
443     }
444 
445     private void commitRequest() throws IOException
446     {
447         synchronized (this)
448         {
449             _status=0;
450             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
451                 throw new IllegalStateException();
452 
453             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
454             _generator.setVersion(_exchange.getVersion());
455 
456             String method=_exchange.getMethod();
457             String uri = _exchange.getURI();
458             if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
459             {
460                 boolean secure = _destination.isSecure();
461                 String host = _destination.getAddress().getHost();
462                 int port = _destination.getAddress().getPort();
463                 StringBuilder absoluteURI = new StringBuilder();
464                 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
465                 absoluteURI.append("://");
466                 absoluteURI.append(host);
467                 // Avoid adding default ports
468                 if (!(secure && port == 443 || !secure && port == 80))
469                     absoluteURI.append(":").append(port);
470                 absoluteURI.append(uri);
471                 uri = absoluteURI.toString();
472                 Authentication auth = _destination.getProxyAuthentication();
473                 if (auth != null)
474                     auth.setCredentials(_exchange);
475             }
476 
477             _generator.setRequest(method, uri);
478             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
479 
480             HttpFields requestHeaders = _exchange.getRequestFields();
481             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
482             {
483                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
484                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
485             }
486 
487             Buffer requestContent = _exchange.getRequestContent();
488             if (requestContent != null)
489             {
490                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
491                 _generator.completeHeader(requestHeaders,false);
492                 _generator.addContent(new View(requestContent),true);
493             }
494             else
495             {
496                 InputStream requestContentStream = _exchange.getRequestContentSource();
497                 if (requestContentStream != null)
498                 {
499                     _generator.completeHeader(requestHeaders, false);
500                     int available = requestContentStream.available();
501                     if (available > 0)
502                     {
503                         // TODO deal with any known content length
504                         // TODO reuse this buffer!
505                         byte[] buf = new byte[available];
506                         int length = requestContentStream.read(buf);
507                         _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
508                     }
509                 }
510                 else
511                 {
512                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
513                     _generator.completeHeader(requestHeaders, true);
514                 }
515             }
516 
517             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
518         }
519     }
520 
521     protected void reset(boolean returnBuffers) throws IOException
522     {
523         _requestComplete = false;
524         _connectionHeader = null;
525         _parser.reset(returnBuffers);
526         _generator.reset(returnBuffers);
527         _http11 = true;
528     }
529 
530     private boolean shouldClose()
531     {
532         if (_connectionHeader!=null)
533         {
534             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
535                 return true;
536             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
537                 return false;
538         }
539         return !_http11;
540     }
541 
542     private class Handler extends HttpParser.EventHandler
543     {
544         @Override
545         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
546         {
547             // System.out.println( method.toString() + "///" + url.toString() +
548             // "///" + version.toString() );
549             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
550             // out here
551             // throw new IllegalStateException();
552         }
553 
554         @Override
555         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
556         {
557             HttpExchange exchange = _exchange;
558             if (exchange!=null)
559             {
560                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
561                 _status=status;
562                 exchange.getEventListener().onResponseStatus(version,status,reason);
563                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
564             }
565         }
566 
567         @Override
568         public void parsedHeader(Buffer name, Buffer value) throws IOException
569         {
570             HttpExchange exchange = _exchange;
571             if (exchange!=null)
572             {
573                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
574                 {
575                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
576                 }
577                 exchange.getEventListener().onResponseHeader(name,value);
578             }
579         }
580 
581         @Override
582         public void headerComplete() throws IOException
583         {
584             if (_endp instanceof AsyncEndPoint)
585                 ((AsyncEndPoint)_endp).scheduleIdle();
586             HttpExchange exchange = _exchange;
587             if (exchange!=null)
588                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
589         }
590 
591         @Override
592         public void content(Buffer ref) throws IOException
593         {
594             if (_endp instanceof AsyncEndPoint)
595                 ((AsyncEndPoint)_endp).scheduleIdle();
596             HttpExchange exchange = _exchange;
597             if (exchange!=null)
598                 exchange.getEventListener().onResponseContent(ref);
599         }
600 
601         @Override
602         public void messageComplete(long contextLength) throws IOException
603         {
604             HttpExchange exchange = _exchange;
605             if (exchange!=null)
606                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
607         }
608     }
609 
610     @Override
611     public String toString()
612     {
613         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
614     }
615 
616     public String toDetailString()
617     {
618         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
619     }
620 
621     public void close() throws IOException
622     {
623         //if there is a live, unfinished exchange, set its status to be
624         //excepted and wake up anyone waiting on waitForDone()
625 
626         if (_exchange != null && !_exchange.isDone())
627         {
628             switch (_exchange.getStatus())
629             {
630                 case HttpExchange.STATUS_CANCELLED:
631                 case HttpExchange.STATUS_CANCELLING:
632                 case HttpExchange.STATUS_COMPLETED:
633                 case HttpExchange.STATUS_EXCEPTED:
634                 case HttpExchange.STATUS_EXPIRED:
635                     break;
636                 default:
637                     _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
638                     _exchange.getEventListener().onException(new EOFException("local close"));
639             }
640         }
641 
642         _endp.close();
643     }
644 
645     public void setIdleTimeout()
646     {
647         synchronized (this)
648         {
649             if (_idle.compareAndSet(false,true))
650                 _destination.getHttpClient().scheduleIdle(_timeout);
651             else
652                 throw new IllegalStateException();
653         }
654     }
655 
656     public boolean cancelIdleTimeout()
657     {
658         synchronized (this)
659         {
660             if (_idle.compareAndSet(true,false))
661             {
662                 _destination.getHttpClient().cancel(_timeout);
663                 return true;
664             }
665         }
666 
667         return false;
668     }
669 
670     private class TimeoutTask extends Timeout.Task
671     {
672         @Override
673         public void expired()
674         {
675             HttpExchange ex = null;
676             try
677             {
678                 synchronized (HttpConnection.this)
679                 {
680                     ex = _exchange;
681                     _exchange = null;
682                     if (ex != null)
683                     {
684                         ex.disassociate();
685                         _destination.returnConnection(HttpConnection.this, true);
686                     }
687                     else if (_idle.compareAndSet(true,false))
688                     {
689                         _destination.returnIdleConnection(HttpConnection.this);
690                     }
691                 }
692             }
693             catch (Exception e)
694             {
695                 Log.debug(e);
696             }
697             finally
698             {
699                 try
700                 {
701                     close();
702                 }
703                 catch (IOException e)
704                 {
705                     Log.ignore(e);
706                 }
707 
708                 if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
709                 {
710                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
711                 }
712             }
713         }
714     }
715 }