View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2011 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  
18  import org.eclipse.jetty.http.AbstractGenerator;
19  import org.eclipse.jetty.http.HttpStatus;
20  import org.eclipse.jetty.io.AsyncEndPoint;
21  import org.eclipse.jetty.io.Buffer;
22  import org.eclipse.jetty.io.Buffers;
23  import org.eclipse.jetty.io.Connection;
24  import org.eclipse.jetty.io.EndPoint;
25  import org.eclipse.jetty.io.nio.AsyncConnection;
26  import org.eclipse.jetty.util.log.Log;
27  import org.eclipse.jetty.util.log.Logger;
28  
29  
30  
31  /* ------------------------------------------------------------ */
32  /** Asynchronous Client HTTP Connection
33   */
34  public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
35  {
36      private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
37  
38      private boolean _requestComplete;
39      private Buffer _requestContentChunk;
40      private final AsyncEndPoint _asyncEndp;
41  
42      AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
43      {
44          super(requestBuffers,responseBuffers,endp);
45          _asyncEndp=(AsyncEndPoint)endp;
46      }
47  
48      protected void reset() throws IOException
49      {
50          _requestComplete = false;
51          super.reset();
52      }
53  
54      public Connection handle() throws IOException
55      {
56          Connection connection = this;
57          boolean progress=true;
58  
59          try
60          {
61              boolean failed = false;
62  
63              // While we are making progress and have not changed connection
64              while (progress && connection==this)
65              {
66                  LOG.debug("while open={} more={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),progress);
67  
68                  progress=false;
69                  HttpExchange exchange=_exchange;
70  
71                  LOG.debug("exchange {} on {}",exchange,this);
72  
73                  try
74                  {
75                      // Should we commit the request?
76                      if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
77                      {
78                          LOG.debug("commit {}",exchange);
79                          progress=true;
80                          commitRequest();
81                      }
82  
83                      // Generate output
84                      if (_generator.isCommitted() && !_generator.isComplete())
85                      {
86                          if (_generator.flushBuffer()>0)
87                          {
88                              LOG.debug("flushed");
89                              progress=true;
90                          }
91  
92                          // Is there more content to send or should we complete the generator
93                          if (_generator.isState(AbstractGenerator.STATE_CONTENT))
94                          {
95                              // Look for more content to send.
96                              if (_requestContentChunk==null)
97                                  _requestContentChunk = exchange.getRequestContentChunk(null);
98  
99                              if (_requestContentChunk==null)
100                             {
101                                 LOG.debug("complete {}",exchange);
102                                 progress=true;
103                                 _generator.complete();
104                             }
105                             else if (_generator.isEmpty())
106                             {
107                                 LOG.debug("addChunk");
108                                 progress=true;
109                                 Buffer chunk=_requestContentChunk;
110                                 _requestContentChunk=exchange.getRequestContentChunk(null);
111                                 _generator.addContent(chunk,_requestContentChunk==null);
112                             }
113                         }
114                     }
115 
116                     // Signal request completion
117                     if (_generator.isComplete() && !_requestComplete)
118                     {
119                         LOG.debug("requestComplete {}",exchange);
120                         progress=true;
121                         _requestComplete = true;
122                         exchange.getEventListener().onRequestComplete();
123                     }
124 
125                     // Read any input that is available
126                     if (!_parser.isComplete() && _parser.parseAvailable())
127                     {
128                         LOG.debug("parsed {}",exchange);
129                         progress=true;
130                     }
131 
132                     // Flush output
133                     _endp.flush();
134 
135                     // Has any IO been done by the endpoint itself since last loop
136                     if (_asyncEndp.hasProgressed())
137                     {
138                         LOG.debug("hasProgressed {}",exchange);
139                         progress=true;
140                     }
141                 }
142                 catch (Throwable e)
143                 {
144                     LOG.debug("Failure on " + _exchange, e);
145 
146                     failed = true;
147 
148                     synchronized (this)
149                     {
150                         if (exchange != null)
151                         {
152                             // Cancelling the exchange causes an exception as we close the connection,
153                             // but we don't report it as it is normal cancelling operation
154                             if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
155                                     exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
156                                     !exchange.isDone())
157                             {
158                                 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
159                                     exchange.getEventListener().onException(e);
160                             }
161                         }
162                         else
163                         {
164                             if (e instanceof IOException)
165                                 throw (IOException)e;
166                             if (e instanceof Error)
167                                 throw (Error)e;
168                             if (e instanceof RuntimeException)
169                                 throw (RuntimeException)e;
170                             throw new RuntimeException(e);
171                         }
172                     }
173                 }
174                 finally
175                 {
176                     LOG.debug("finally {} on {} progress={} {}",exchange,this,progress,_endp);
177 
178                     boolean complete = failed || _generator.isComplete() && _parser.isComplete();
179 
180                     if (complete)
181                     {
182                         boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
183                         _generator.setPersistent(persistent);
184                         reset();
185                         if (persistent)
186                             _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
187 
188                         synchronized (this)
189                         {
190                             exchange=_exchange;
191                             _exchange = null;
192 
193                             // Cancel the exchange
194                             if (exchange!=null)
195                             {
196                                 exchange.cancelTimeout(_destination.getHttpClient());
197 
198                                 // TODO should we check the exchange is done?
199                             }
200 
201                             // handle switched protocols
202                             if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
203                             {
204                                 Connection switched=exchange.onSwitchProtocol(_endp);
205                                 if (switched!=null)
206                                     connection=switched;
207                                 {
208                                     // switched protocol!
209                                     _pipeline = null;
210                                     if (_pipeline!=null)
211                                         _destination.send(_pipeline);
212                                     _pipeline = null;
213 
214                                     connection=switched;
215                                 }
216                             }
217 
218                             // handle pipelined requests
219                             if (_pipeline!=null)
220                             {
221                                 if (!persistent || connection!=this)
222                                     _destination.send(_pipeline);
223                                 else
224                                     _exchange=_pipeline;
225                                 _pipeline=null;
226                             }
227 
228                             if (_exchange==null && !isReserved())  // TODO how do we return switched connections?
229                                 _destination.returnConnection(this, !persistent);
230                         }
231 
232                     }
233                 }
234             }
235         }
236         finally
237         {
238             _parser.returnBuffers();
239             _generator.returnBuffers();
240             LOG.debug("unhandle {} on {}",_exchange,_endp);
241         }
242 
243         return connection;
244     }
245 
246     public void onInputShutdown() throws IOException
247     {
248         if (_generator.isIdle())
249             _endp.shutdownOutput();
250     }
251 
252     @Override
253     public boolean send(HttpExchange ex) throws IOException
254     {
255         boolean sent=super.send(ex);
256         if (sent)
257             _asyncEndp.asyncDispatch();
258         return sent;
259     }
260 }