View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.EOFException;
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  import java.util.Collections;
21  import java.util.concurrent.atomic.AtomicBoolean;
22  
23  import org.eclipse.jetty.client.security.Authentication;
24  import org.eclipse.jetty.http.HttpFields;
25  import org.eclipse.jetty.http.HttpGenerator;
26  import org.eclipse.jetty.http.HttpHeaderValues;
27  import org.eclipse.jetty.http.HttpHeaders;
28  import org.eclipse.jetty.http.HttpMethods;
29  import org.eclipse.jetty.http.HttpParser;
30  import org.eclipse.jetty.http.HttpSchemes;
31  import org.eclipse.jetty.http.HttpStatus;
32  import org.eclipse.jetty.http.HttpVersions;
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.AsyncEndPoint;
35  import org.eclipse.jetty.io.Buffer;
36  import org.eclipse.jetty.io.Buffers;
37  import org.eclipse.jetty.io.ByteArrayBuffer;
38  import org.eclipse.jetty.io.Connection;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.View;
41  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42  import org.eclipse.jetty.util.component.AggregateLifeCycle;
43  import org.eclipse.jetty.util.component.Dumpable;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  import org.eclipse.jetty.util.thread.Timeout;
47  
48  /**
49   *
50   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
51   */
52  public class HttpConnection extends AbstractConnection implements Dumpable
53  {
54      private static final Logger LOG = Log.getLogger(HttpConnection.class);
55  
56      private HttpDestination _destination;
57      private HttpGenerator _generator;
58      private HttpParser _parser;
59      private boolean _http11 = true;
60      private int _status;
61      private Buffer _connectionHeader;
62      private Buffer _requestContentChunk;
63      private boolean _requestComplete;
64      private boolean _reserved;
65  
66      // The current exchange waiting for a response
67      private volatile HttpExchange _exchange;
68      private HttpExchange _pipeline;
69      private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
70      private AtomicBoolean _idle = new AtomicBoolean(false);
71  
72  
73      HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
74      {
75          super(endp);
76          
77          _generator = new HttpGenerator(requestBuffers,endp);
78          _parser = new HttpParser(responseBuffers,endp,new Handler());
79      }
80  
81      public void setReserved (boolean reserved)
82      {
83          _reserved = reserved;
84      }
85  
86      public boolean isReserved()
87      {
88          return _reserved;
89      }
90  
91      public HttpDestination getDestination()
92      {
93          return _destination;
94      }
95  
96      public void setDestination(HttpDestination destination)
97      {
98          _destination = destination;
99      }
100 
101     public boolean send(HttpExchange ex) throws IOException
102     {
103         synchronized (this)
104         {
105             if (_exchange != null)
106             {
107                 if (_pipeline != null)
108                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
109                 _pipeline = ex;
110                 return true;
111             }
112 
113             _exchange = ex;
114             _exchange.associate(this);
115 
116             // The call to associate() may have closed the connection, check if it's the case
117             if (!_endp.isOpen())
118             {
119                 _exchange.disassociate();
120                 _exchange = null;
121                 return false;
122             }
123 
124             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
125 
126             if (_endp.isBlocking())
127             {
128                 this.notify();
129             }
130             else
131             {
132                 AsyncEndPoint scep = (AsyncEndPoint)_endp;
133                 scep.scheduleWrite();
134             }
135 
136             adjustIdleTimeout();
137 
138             return true;
139         }
140     }
141 
142     private void adjustIdleTimeout() throws IOException
143     {
144         // Adjusts the idle timeout in case the default or exchange timeout
145         // are greater. This is needed for long polls, where one wants an
146         // aggressive releasing of idle connections (so idle timeout is small)
147         // but still allow long polls to complete normally
148 
149         long timeout = _exchange.getTimeout();
150         if (timeout <= 0)
151             timeout = _destination.getHttpClient().getTimeout();
152 
153         long endPointTimeout = _endp.getMaxIdleTime();
154 
155         if (timeout > 0 && timeout > endPointTimeout)
156         {
157             // Make it larger than the exchange timeout so that there are
158             // no races between the idle timeout and the exchange timeout
159             // when trying to close the endpoint
160             _endp.setMaxIdleTime(2 * (int)timeout);
161         }
162     }
163 
164     public Connection handle() throws IOException
165     {
166         try
167         {
168             int no_progress = 0;
169 
170             boolean failed = false;
171             while (_endp.isBufferingInput() || _endp.isOpen())
172             {
173                 synchronized (this)
174                 {
175                     while (_exchange == null)
176                     {
177                         if (_endp.isBlocking())
178                         {
179                             try
180                             {
181                                 this.wait();
182                             }
183                             catch (InterruptedException e)
184                             {
185                                 throw new InterruptedIOException();
186                             }
187                         }
188                         else
189                         {
190                             long filled = _parser.fill();
191                             if (filled < 0)
192                             {
193                                 close();
194                             }
195                             else
196                             {
197                                 // Hopefully just space?
198                                 _parser.skipCRLF();
199                                 if (_parser.isMoreInBuffer())
200                                 {
201                                     LOG.warn("Unexpected data received but no request sent");
202                                     close();
203                                 }
204                             }
205                             return this;
206                         }
207                     }
208                 }
209 
210                 try
211                 {
212                     if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
213                     {
214                         no_progress = 0;
215                         commitRequest();
216                     }
217 
218                     long io = 0;
219                     _endp.flush();
220 
221                     if (_generator.isComplete())
222                     {
223                         if (!_requestComplete)
224                         {
225                             _requestComplete = true;
226                             _exchange.getEventListener().onRequestComplete();
227                         }
228                     }
229                     else
230                     {
231                         // Write as much of the request as possible
232                         synchronized (this)
233                         {
234                             if (_exchange == null)
235                                 continue;
236                         }
237 
238                         long flushed = _generator.flushBuffer();
239                         io += flushed;
240 
241                         if (!_generator.isComplete())
242                         {
243                             if (_exchange!=null)
244                             {
245                                 InputStream in = _exchange.getRequestContentSource();
246                                 if (in != null)
247                                 {
248                                     if (_requestContentChunk == null || _requestContentChunk.length() == 0)
249                                     {
250                                         _requestContentChunk = _exchange.getRequestContentChunk();
251 
252                                         if (_requestContentChunk != null)
253                                             _generator.addContent(_requestContentChunk,false);
254                                         else
255                                             _generator.complete();
256 
257                                         flushed = _generator.flushBuffer();
258                                         io += flushed;
259                                     }
260                                 }
261                                 else
262                                     _generator.complete();
263                             }
264                             else
265                                 _generator.complete();
266                         }
267                     }
268 
269                     if (_generator.isComplete() && !_requestComplete)
270                     {
271                         _requestComplete = true;
272                         _exchange.getEventListener().onRequestComplete();
273                     }
274 
275                     // If we are not ended then parse available
276                     if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
277                     {
278                         long filled = _parser.parseAvailable();
279                         io += filled;
280                     }
281 
282                     if (io > 0)
283                         no_progress = 0;
284                     else if (no_progress++ >= 1 && !_endp.isBlocking())
285                     {
286                         // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
287                         if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
288                         {
289                             long flushed = _generator.flushBuffer();
290                             if (flushed>0)
291                                 continue;
292                         }
293                         return this;
294                     }
295                 }
296                 catch (Throwable e)
297                 {
298                     LOG.debug("Failure on " + _exchange, e);
299 
300                     if (e instanceof ThreadDeath)
301                         throw (ThreadDeath)e;
302 
303                     failed = true;
304 
305                     synchronized (this)
306                     {
307                         if (_exchange != null)
308                         {
309                             // Cancelling the exchange causes an exception as we close the connection,
310                             // but we don't report it as it is normal cancelling operation
311                             if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
312                                     _exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
313                             {
314                                 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
315                                 _exchange.getEventListener().onException(e);
316                             }
317                         }
318                         else
319                         {
320                             if (e instanceof IOException)
321                                 throw (IOException)e;
322 
323                             if (e instanceof Error)
324                                 throw (Error)e;
325 
326                             if (e instanceof RuntimeException)
327                                 throw (RuntimeException)e;
328 
329                             throw new RuntimeException(e);
330                         }
331                     }
332                 }
333                 finally
334                 {
335                     boolean complete = false;
336                     boolean close = failed; // always close the connection on error
337                     if (!failed)
338                     {
339                         // are we complete?
340                         if (_generator.isComplete())
341                         {
342                             if (!_requestComplete)
343                             {
344                                 _requestComplete = true;
345                                 _exchange.getEventListener().onRequestComplete();
346                             }
347 
348                             // we need to return the HttpConnection to a state that
349                             // it can be reused or closed out
350                             if (_parser.isComplete())
351                             {
352                                 _exchange.cancelTimeout(_destination.getHttpClient());
353                                 complete = true;
354                             }
355                         }
356                     }
357 
358                     if (_generator.isComplete() && !_parser.isComplete())
359                     {
360                         if (!_endp.isOpen() || _endp.isInputShutdown())
361                         {
362                             complete=true;
363                             close=true;
364                             close();
365                         }
366                     }
367 
368                     if (complete || failed)
369                     {
370                         synchronized (this)
371                         {
372                             if (!close)
373                                 close = shouldClose();
374 
375                             reset(true);
376 
377                             no_progress = 0;
378                             if (_exchange != null)
379                             {
380                                 HttpExchange exchange=_exchange;
381                                 _exchange = null;
382 
383                                 // Reset the maxIdleTime because it may have been changed
384                                 if (!close)
385                                     _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
386 
387                                 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
388                                 {
389                                     Connection switched=exchange.onSwitchProtocol(_endp);
390                                     if (switched!=null)
391                                     {
392                                         // switched protocol!
393                                         exchange = _pipeline;
394                                         _pipeline = null;
395                                         if (exchange!=null)
396                                             _destination.send(exchange);
397 
398                                         return switched;
399                                     }
400                                 }
401 
402                                 if (_pipeline == null)
403                                 {
404                                     if (!isReserved())
405                                         _destination.returnConnection(this, close);
406                                 }
407                                 else
408                                 {
409                                     if (close)
410                                     {
411                                         if (!isReserved())
412                                             _destination.returnConnection(this,close);
413 
414                                         exchange = _pipeline;
415                                         _pipeline = null;
416                                         _destination.send(exchange);
417                                     }
418                                     else
419                                     {
420                                         exchange = _pipeline;
421                                         _pipeline = null;
422                                         send(exchange);
423                                     }
424                                 }
425                             }
426                         }
427                     }
428                 }
429             }
430         }
431         finally
432         {
433             _parser.returnBuffers();
434             
435             // Do we have more stuff to write?
436             if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
437             {
438                 // Assume we are write blocked!
439                 ((AsyncEndPoint)_endp).scheduleWrite();
440             }
441         }
442 
443         return this;
444     }
445 
446     public boolean isIdle()
447     {
448         synchronized (this)
449         {
450             return _exchange == null;
451         }
452     }
453 
454     public boolean isSuspended()
455     {
456         return false;
457     }
458 
459     public void closed()
460     {
461     }
462 
463     private void commitRequest() throws IOException
464     {
465         synchronized (this)
466         {
467             _status=0;
468             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
469                 throw new IllegalStateException();
470 
471             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
472             _generator.setVersion(_exchange.getVersion());
473 
474             String method=_exchange.getMethod();
475             String uri = _exchange.getURI();
476             if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
477             {
478                 boolean secure = _destination.isSecure();
479                 String host = _destination.getAddress().getHost();
480                 int port = _destination.getAddress().getPort();
481                 StringBuilder absoluteURI = new StringBuilder();
482                 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
483                 absoluteURI.append("://");
484                 absoluteURI.append(host);
485                 // Avoid adding default ports
486                 if (!(secure && port == 443 || !secure && port == 80))
487                     absoluteURI.append(":").append(port);
488                 absoluteURI.append(uri);
489                 uri = absoluteURI.toString();
490                 Authentication auth = _destination.getProxyAuthentication();
491                 if (auth != null)
492                     auth.setCredentials(_exchange);
493             }
494 
495             _generator.setRequest(method, uri);
496             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
497 
498             HttpFields requestHeaders = _exchange.getRequestFields();
499             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
500             {
501                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
502                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
503             }
504 
505             Buffer requestContent = _exchange.getRequestContent();
506             if (requestContent != null)
507             {
508                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
509                 _generator.completeHeader(requestHeaders,false);
510                 _generator.addContent(new View(requestContent),true);
511             }
512             else
513             {
514                 InputStream requestContentStream = _exchange.getRequestContentSource();
515                 if (requestContentStream != null)
516                 {
517                     _generator.completeHeader(requestHeaders, false);
518                     int available = requestContentStream.available();
519                     if (available > 0)
520                     {
521                         // TODO deal with any known content length
522                         // TODO reuse this buffer!
523                         byte[] buf = new byte[available];
524                         int length = requestContentStream.read(buf);
525                         _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
526                     }
527                 }
528                 else
529                 {
530                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
531                     _generator.completeHeader(requestHeaders, true);
532                 }
533             }
534 
535             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
536         }
537     }
538 
539     protected void reset(boolean returnBuffers) throws IOException
540     {
541         _requestComplete = false;
542         _connectionHeader = null;
543         _parser.reset();
544         if (returnBuffers)
545             _parser.returnBuffers();
546         _generator.reset(returnBuffers);
547         _http11 = true;
548     }
549 
550     private boolean shouldClose()
551     {
552         if (_connectionHeader!=null)
553         {
554             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
555                 return true;
556             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
557                 return false;
558         }
559         return !_http11;
560     }
561 
562     private class Handler extends HttpParser.EventHandler
563     {
564         @Override
565         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
566         {
567             // System.out.println( method.toString() + "///" + url.toString() +
568             // "///" + version.toString() );
569             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
570             // out here
571             // throw new IllegalStateException();
572         }
573 
574         @Override
575         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
576         {
577             HttpExchange exchange = _exchange;
578             if (exchange!=null)
579             {
580                 switch(status)
581                 {
582                     case HttpStatus.CONTINUE_100:
583                     case HttpStatus.PROCESSING_102:
584                         // TODO check if appropriate expect was sent in the request.
585                         exchange.setEventListener(new NonFinalResponseListener(exchange));
586                         break;
587 
588                     case HttpStatus.OK_200:
589                         // handle special case for CONNECT 200 responses
590                         if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
591                             _parser.setHeadResponse(true);
592                         break;
593                 }
594 
595                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
596                 _status=status;
597                 exchange.getEventListener().onResponseStatus(version,status,reason);
598                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
599             }
600         }
601 
602         @Override
603         public void parsedHeader(Buffer name, Buffer value) throws IOException
604         {
605             HttpExchange exchange = _exchange;
606             if (exchange!=null)
607             {
608                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
609                 {
610                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
611                 }
612                 exchange.getEventListener().onResponseHeader(name,value);
613             }
614         }
615 
616         @Override
617         public void headerComplete() throws IOException
618         {
619             if (_endp instanceof AsyncEndPoint)
620                 ((AsyncEndPoint)_endp).scheduleIdle();
621             HttpExchange exchange = _exchange;
622             if (exchange!=null)
623                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
624         }
625 
626         @Override
627         public void content(Buffer ref) throws IOException
628         {
629             if (_endp instanceof AsyncEndPoint)
630                 ((AsyncEndPoint)_endp).scheduleIdle();
631             HttpExchange exchange = _exchange;
632             if (exchange!=null)
633                 exchange.getEventListener().onResponseContent(ref);
634         }
635 
636         @Override
637         public void messageComplete(long contextLength) throws IOException
638         {
639             HttpExchange exchange = _exchange;
640             if (exchange!=null)
641                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
642         }
643     }
644 
645     @Override
646     public String toString()
647     {
648         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
649     }
650 
651     public String toDetailString()
652     {
653         return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
654     }
655 
656     public void close() throws IOException
657     {
658         //if there is a live, unfinished exchange, set its status to be
659         //excepted and wake up anyone waiting on waitForDone()
660 
661         HttpExchange exchange = _exchange;
662         if (exchange != null && !exchange.isDone())
663         {
664             switch (exchange.getStatus())
665             {
666                 case HttpExchange.STATUS_CANCELLED:
667                 case HttpExchange.STATUS_CANCELLING:
668                 case HttpExchange.STATUS_COMPLETED:
669                 case HttpExchange.STATUS_EXCEPTED:
670                 case HttpExchange.STATUS_EXPIRED:
671                     break;
672                 default:
673                     String exch= exchange.toString();
674                     String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
675                     exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
676                     exchange.getEventListener().onException(new EOFException(reason+exch));
677             }
678         }
679 
680         _endp.close();
681     }
682 
683     public void setIdleTimeout()
684     {
685         synchronized (this)
686         {
687             if (_idle.compareAndSet(false, true))
688                 _destination.getHttpClient().scheduleIdle(_idleTimeout);
689             else
690                 throw new IllegalStateException();
691         }
692     }
693 
694     public boolean cancelIdleTimeout()
695     {
696         synchronized (this)
697         {
698             if (_idle.compareAndSet(true, false))
699             {
700                 _destination.getHttpClient().cancel(_idleTimeout);
701                 return true;
702             }
703         }
704 
705         return false;
706     }
707 
708     protected void exchangeExpired(HttpExchange exchange)
709     {
710         synchronized (this)
711         {
712             // We are expiring an exchange, but the exchange is pending
713             // Cannot reuse the connection because the reply may arrive, so close it
714             if (_exchange == exchange)
715             {
716                 try
717                 {
718                     _destination.returnConnection(this, true);
719                 }
720                 catch (IOException x)
721                 {
722                     LOG.ignore(x);
723                 }
724             }
725         }
726     }
727     
728     /* ------------------------------------------------------------ */
729     /**
730      * @see org.eclipse.jetty.util.component.Dumpable#dump()
731      */
732     public String dump()
733     {
734         return AggregateLifeCycle.dump(this);
735     }
736 
737     /* ------------------------------------------------------------ */
738     /**
739      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
740      */
741     public void dump(Appendable out, String indent) throws IOException
742     {
743         synchronized (this)
744         {
745             out.append(String.valueOf(this)).append("\n");
746             AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
747         }
748     }
749     
750     /* ------------------------------------------------------------ */
751     private class ConnectionIdleTask extends Timeout.Task
752     {
753         /* ------------------------------------------------------------ */
754         @Override
755         public void expired()
756         {
757             // Connection idle, close it
758             if (_idle.compareAndSet(true, false))
759             {
760                 _destination.returnIdleConnection(HttpConnection.this);
761             }
762         }
763     }
764     
765     
766     /* ------------------------------------------------------------ */
767     private class NonFinalResponseListener implements HttpEventListener
768     {
769         final HttpExchange _exchange;
770         final HttpEventListener _next;
771         
772         /* ------------------------------------------------------------ */
773         public NonFinalResponseListener(HttpExchange exchange)
774         {
775             _exchange=exchange;
776             _next=exchange.getEventListener();
777         }
778 
779         /* ------------------------------------------------------------ */
780         public void onRequestCommitted() throws IOException
781         {
782         }
783 
784         /* ------------------------------------------------------------ */
785         public void onRequestComplete() throws IOException
786         {
787         }
788 
789         /* ------------------------------------------------------------ */
790         public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
791         {
792         }
793 
794         /* ------------------------------------------------------------ */
795         public void onResponseHeader(Buffer name, Buffer value) throws IOException
796         {
797             _next.onResponseHeader(name,value);
798         }
799 
800         /* ------------------------------------------------------------ */
801         public void onResponseHeaderComplete() throws IOException
802         {
803             _next.onResponseHeaderComplete();
804         }
805 
806         /* ------------------------------------------------------------ */
807         public void onResponseContent(Buffer content) throws IOException
808         {
809         }
810 
811         /* ------------------------------------------------------------ */
812         public void onResponseComplete() throws IOException
813         {
814             _exchange.setEventListener(_next);
815             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
816             _parser.reset();            
817         }
818 
819         /* ------------------------------------------------------------ */
820         public void onConnectionFailed(Throwable ex)
821         {
822             _exchange.setEventListener(_next);
823             _next.onConnectionFailed(ex);
824         }
825 
826         /* ------------------------------------------------------------ */
827         public void onException(Throwable ex)
828         {
829             _exchange.setEventListener(_next);
830             _next.onException(ex);
831         }
832 
833         /* ------------------------------------------------------------ */
834         public void onExpire()
835         {
836             _exchange.setEventListener(_next);
837             _next.onExpire();
838         }
839 
840         /* ------------------------------------------------------------ */
841         public void onRetry()
842         {
843             _exchange.setEventListener(_next);
844             _next.onRetry();
845         }
846     }
847 }