View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2011 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.io.InterruptedIOException;
18  
19  import org.eclipse.jetty.http.AbstractGenerator;
20  import org.eclipse.jetty.http.HttpStatus;
21  import org.eclipse.jetty.io.Buffer;
22  import org.eclipse.jetty.io.Buffers;
23  import org.eclipse.jetty.io.Connection;
24  import org.eclipse.jetty.io.EndPoint;
25  import org.eclipse.jetty.util.log.Log;
26  import org.eclipse.jetty.util.log.Logger;
27  
28  
29  /* ------------------------------------------------------------ */
30  /** Blocking HTTP Connection
31   */
32  public class BlockingHttpConnection extends AbstractHttpConnection
33  {
34      private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
35  
36      private boolean _requestComplete;
37      private Buffer _requestContentChunk;
38  
39      BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)
40      {
41          super(requestBuffers, responseBuffers, endPoint);
42      }
43  
44      protected void reset() throws IOException
45      {
46          _requestComplete = false;
47          super.reset();
48      }
49  
50      @Override
51      public Connection handle() throws IOException
52      {
53          Connection connection = this;
54  
55          try
56          {
57              boolean failed = false;
58  
59  
60              // While we are making progress and have not changed connection
61              while (_endp.isOpen() && connection==this)
62              {
63                  LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer());
64  
65                  HttpExchange exchange;
66                  synchronized (this)
67                  {
68                      exchange=_exchange;
69  
70                      while (exchange == null)
71                      {
72                          try
73                          {
74                              this.wait();
75                              exchange=_exchange;
76                          }
77                          catch (InterruptedException e)
78                          {
79                              throw new InterruptedIOException();
80                          }
81                      }
82                  }
83                  LOG.debug("exchange {}",exchange);
84  
85                  try
86                  {
87                      // Should we commit the request?
88                      if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
89                      {
90                          LOG.debug("commit");
91                          commitRequest();
92                      }
93  
94                      // Generate output
95                      while (_generator.isCommitted() && !_generator.isComplete())
96                      {
97                          if (_generator.flushBuffer()>0)
98                          {
99                              LOG.debug("flushed");
100                         }
101 
102                         // Is there more content to send or should we complete the generator
103                         if (_generator.isState(AbstractGenerator.STATE_CONTENT))
104                         {
105                             // Look for more content to send.
106                             if (_requestContentChunk==null)
107                                 _requestContentChunk = exchange.getRequestContentChunk(null);
108 
109                             if (_requestContentChunk==null)
110                             {
111                                 LOG.debug("complete");
112                                 _generator.complete();
113                             }
114                             else if (_generator.isEmpty())
115                             {
116                                 LOG.debug("addChunk");
117                                 Buffer chunk=_requestContentChunk;
118                                 _requestContentChunk=exchange.getRequestContentChunk(null);
119                                 _generator.addContent(chunk,_requestContentChunk==null);
120                             }
121                         }
122                     }
123 
124                     // Signal request completion
125                     if (_generator.isComplete() && !_requestComplete)
126                     {
127                         LOG.debug("requestComplete");
128                         _requestComplete = true;
129                         exchange.getEventListener().onRequestComplete();
130                     }
131 
132                     // Read any input that is available
133                     if (!_parser.isComplete() && _parser.parseAvailable())
134                     {
135                         LOG.debug("parsed");
136                     }
137 
138                     // Flush output
139                     _endp.flush();
140                 }
141                 catch (Throwable e)
142                 {
143                     LOG.debug("Failure on " + _exchange, e);
144 
145                     failed = true;
146 
147                     synchronized (this)
148                     {
149                         if (exchange != null)
150                         {
151                             // Cancelling the exchange causes an exception as we close the connection,
152                             // but we don't report it as it is normal cancelling operation
153                             if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
154                                     exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
155                                     !exchange.isDone())
156                             {
157                                 if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
158                                     exchange.getEventListener().onException(e);
159                             }
160                         }
161                         else
162                         {
163                             if (e instanceof IOException)
164                                 throw (IOException)e;
165                             if (e instanceof Error)
166                                 throw (Error)e;
167                             if (e instanceof RuntimeException)
168                                 throw (RuntimeException)e;
169                             throw new RuntimeException(e);
170                         }
171                     }
172                 }
173                 finally
174                 {
175                     LOG.debug("{} {}",_generator, _parser);
176                     LOG.debug("{}",_endp);
177 
178                     boolean complete = failed || _generator.isComplete() && _parser.isComplete();
179 
180                     if (complete)
181                     {
182                         boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
183                         _generator.setPersistent(persistent);
184                         reset();
185                         if (persistent)
186                             _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
187 
188                         synchronized (this)
189                         {
190                             exchange=_exchange;
191                             _exchange = null;
192 
193                             // Cancel the exchange
194                             if (exchange!=null)
195                             {
196                                 exchange.cancelTimeout(_destination.getHttpClient());
197 
198                                 // TODO should we check the exchange is done?
199                             }
200 
201                             // handle switched protocols
202                             if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
203                             {
204                                 Connection switched=exchange.onSwitchProtocol(_endp);
205                                 if (switched!=null)
206                                     connection=switched;
207                                 {
208                                     // switched protocol!
209                                     _pipeline = null;
210                                     if (_pipeline!=null)
211                                         _destination.send(_pipeline);
212                                     _pipeline = null;
213 
214                                     connection=switched;
215                                 }
216                             }
217 
218                             // handle pipelined requests
219                             if (_pipeline!=null)
220                             {
221                                 if (!persistent || connection!=this)
222                                     _destination.send(_pipeline);
223                                 else
224                                     _exchange=_pipeline;
225                                 _pipeline=null;
226                             }
227 
228                             if (_exchange==null && !isReserved())  // TODO how do we return switched connections?
229                                 _destination.returnConnection(this, !persistent);
230                         }
231                     }
232                 }
233             }
234         }
235         finally
236         {
237             _parser.returnBuffers();
238             _generator.returnBuffers();
239         }
240 
241         return connection;
242     }
243 
244     @Override
245     public boolean send(HttpExchange ex) throws IOException
246     {
247         boolean sent=super.send(ex);
248         if (sent)
249         {
250             synchronized (this)
251             {
252                 notifyAll();
253             }
254         }
255         return sent;
256     }
257 }