View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  
23  import org.eclipse.jetty.http.AbstractGenerator;
24  import org.eclipse.jetty.http.HttpStatus;
25  import org.eclipse.jetty.io.AsyncEndPoint;
26  import org.eclipse.jetty.io.Buffer;
27  import org.eclipse.jetty.io.Buffers;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.io.nio.AsyncConnection;
31  import org.eclipse.jetty.util.log.Log;
32  import org.eclipse.jetty.util.log.Logger;
33  
34  
35  
36  /* ------------------------------------------------------------ */
37  /** Asynchronous Client HTTP Connection
38   */
39  public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
40  {
41      private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
42  
43      private boolean _requestComplete;
44      private Buffer _requestContentChunk;
45      private final AsyncEndPoint _asyncEndp;
46  
47      AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
48      {
49          super(requestBuffers,responseBuffers,endp);
50          _asyncEndp=(AsyncEndPoint)endp;
51      }
52  
53      protected void reset() throws IOException
54      {
55          _requestComplete = false;
56          super.reset();
57      }
58  
59      public Connection handle() throws IOException
60      {
61          Connection connection = this;
62          boolean progress=true;
63  
64          try
65          {
66              boolean failed = false;
67  
68              // While we are making progress and have not changed connection
69              while (progress && connection==this)
70              {
71                  LOG.debug("while open={} more={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),progress);
72  
73                  progress=false;
74                  HttpExchange exchange=_exchange;
75  
76                  LOG.debug("exchange {} on {}",exchange,this);
77  
78                  try
79                  {
80                      // Should we commit the request?
81                      if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
82                      {
83                          LOG.debug("commit {}",exchange);
84                          progress=true;
85                          commitRequest();
86                      }
87  
88                      // Generate output
89                      if (_generator.isCommitted() && !_generator.isComplete())
90                      {
91                          if (_generator.flushBuffer()>0)
92                          {
93                              LOG.debug("flushed");
94                              progress=true;
95                          }
96  
97                          // Is there more content to send or should we complete the generator
98                          if (_generator.isState(AbstractGenerator.STATE_CONTENT))
99                          {
100                             // Look for more content to send.
101                             if (_requestContentChunk==null)
102                                 _requestContentChunk = exchange.getRequestContentChunk(null);
103 
104                             if (_requestContentChunk==null)
105                             {
106                                 LOG.debug("complete {}",exchange);
107                                 progress=true;
108                                 _generator.complete();
109                             }
110                             else if (_generator.isEmpty())
111                             {
112                                 LOG.debug("addChunk");
113                                 progress=true;
114                                 Buffer chunk=_requestContentChunk;
115                                 _requestContentChunk=exchange.getRequestContentChunk(null);
116                                 _generator.addContent(chunk,_requestContentChunk==null);
117                                 if (_requestContentChunk==null)
118                                     exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
119                             }
120                         }
121                     }
122 
123                     // Signal request completion
124                     if (_generator.isComplete() && !_requestComplete)
125                     {
126                         LOG.debug("requestComplete {}",exchange);
127                         progress=true;
128                         _requestComplete = true;
129                         exchange.getEventListener().onRequestComplete();
130                     }
131 
132                     // Read any input that is available
133                     if (!_parser.isComplete() && _parser.parseAvailable())
134                     {
135                         LOG.debug("parsed {}",exchange);
136                         progress=true;
137                     }
138 
139                     // Flush output
140                     _endp.flush();
141 
142                     // Has any IO been done by the endpoint itself since last loop
143                     if (_asyncEndp.hasProgressed())
144                     {
145                         LOG.debug("hasProgressed {}",exchange);
146                         progress=true;
147                     }
148                 }
149                 catch (Throwable e)
150                 {
151                     LOG.debug("Failure on " + _exchange, e);
152 
153                     failed = true;
154 
155                     synchronized (this)
156                     {
157                         if (exchange != null)
158                         {
159                             // Cancelling the exchange causes an exception as we close the connection,
160                             // but we don't report it as it is normal cancelling operation
161                             if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
162                                     exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
163                                     !exchange.isDone())
164                             {
165                                 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
166                                     exchange.getEventListener().onException(e);
167                             }
168                         }
169                         else
170                         {
171                             if (e instanceof IOException)
172                                 throw (IOException)e;
173                             if (e instanceof Error)
174                                 throw (Error)e;
175                             if (e instanceof RuntimeException)
176                                 throw (RuntimeException)e;
177                             throw new RuntimeException(e);
178                         }
179                     }
180                 }
181                 finally
182                 {
183                     LOG.debug("finally {} on {} progress={} {}",exchange,this,progress,_endp);
184 
185                     boolean complete = failed || _generator.isComplete() && _parser.isComplete();
186 
187                     if (complete)
188                     {
189                         boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
190                         _generator.setPersistent(persistent);
191                         reset();
192                         if (persistent)
193                             _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
194 
195                         synchronized (this)
196                         {
197                             exchange=_exchange;
198                             _exchange = null;
199 
200                             // Cancel the exchange
201                             if (exchange!=null)
202                             {
203                                 exchange.cancelTimeout(_destination.getHttpClient());
204 
205                                 // TODO should we check the exchange is done?
206                             }
207 
208                             // handle switched protocols
209                             if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
210                             {
211                                 Connection switched=exchange.onSwitchProtocol(_endp);
212                                 if (switched!=null)
213                                 {
214                                     // switched protocol!
215                                     if (_pipeline!=null)
216                                     {
217                                         _destination.send(_pipeline);
218                                     }
219                                     _pipeline = null;
220 
221                                     connection=switched;
222                                 }
223                             }
224 
225                             // handle pipelined requests
226                             if (_pipeline!=null)
227                             {
228                                 if (!persistent || connection!=this)
229                                     _destination.send(_pipeline);
230                                 else
231                                     _exchange=_pipeline;
232                                 _pipeline=null;
233                             }
234 
235                             if (_exchange==null && !isReserved())  // TODO how do we return switched connections?
236                                 _destination.returnConnection(this, !persistent);
237                         }
238 
239                     }
240                 }
241             }
242         }
243         finally
244         {
245             _parser.returnBuffers();
246             _generator.returnBuffers();
247             LOG.debug("unhandle {} on {}",_exchange,_endp);
248         }
249 
250         return connection;
251     }
252 
253     public void onInputShutdown() throws IOException
254     {
255         if (_generator.isIdle())
256             _endp.shutdownOutput();
257     }
258 
259     @Override
260     public boolean send(HttpExchange ex) throws IOException
261     {
262         boolean sent=super.send(ex);
263         if (sent)
264             _asyncEndp.asyncDispatch();
265         return sent;
266     }
267 }