View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.io.InterruptedIOException;
19  
20  import org.eclipse.jetty.client.security.Authorization;
21  import org.eclipse.jetty.http.HttpFields;
22  import org.eclipse.jetty.http.HttpGenerator;
23  import org.eclipse.jetty.http.HttpHeaderValues;
24  import org.eclipse.jetty.http.HttpHeaders;
25  import org.eclipse.jetty.http.HttpParser;
26  import org.eclipse.jetty.http.HttpSchemes;
27  import org.eclipse.jetty.http.HttpVersions;
28  import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.Buffers;
31  import org.eclipse.jetty.io.ByteArrayBuffer;
32  import org.eclipse.jetty.io.Connection;
33  import org.eclipse.jetty.io.EndPoint;
34  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.thread.Timeout;
37  
38  /**
39   * 
40   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
41   */
42  public class HttpConnection implements Connection
43  {
        // Destination this connection belongs to; assigned via setDestination(), not by the constructor.
44      private HttpDestination _destination;
        // Endpoint supplied to the constructor; all reads, writes and close() go through it.
45      private EndPoint _endp;
        // Generator for the outgoing request, built over the request buffers and the endpoint.
46      private HttpGenerator _generator;
        // Parser for the incoming response; its events are delivered to the inner Handler.
47      private HttpParser _parser;
        // Whether the response being parsed declared HTTP/1.1 (set in Handler.startResponse, restored to true by reset()).
48      private boolean _http11 = true;
        // Cached lookup of the response "Connection" header value, consulted by shouldClose().
49      private Buffer _connectionHeader;
        // Most recent chunk of streamed request content handed to the generator in handle().
50      private Buffer _requestContentChunk;
        // Value stored and returned by setLast()/getLast(); not otherwise used in this class.
51      private long _last;
        // True once onRequestComplete() has been fired for the current exchange.
52      private boolean _requestComplete;
        // When true, handle() does not hand the connection back to the destination on completion.
53      private boolean _reserved;
54      // The current exchange waiting for a response
55      private volatile HttpExchange _exchange;
        // A single exchange queued by send() while another exchange is still in progress.
56      private HttpExchange _pipeline;
        // Task scheduled in send() and cancelled in handle() once request and response are complete.
57      private final Timeout.Task _timeout = new TimeoutTask();
58  
59      public void dump() throws IOException
60      {
61          Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
62          Log.info("generator=" + _generator);
63          Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
64          Log.info("exchange=" + _exchange);
65          if (_endp instanceof SslSelectChannelEndPoint)
66              ((SslSelectChannelEndPoint)_endp).dump();
67      }
68  
69      HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
70      {
71          _endp = endp;
72          _generator = new HttpGenerator(requestBuffers,endp);
73          _parser = new HttpParser(responseBuffers,endp,new Handler());
74      }
75  
76      public void setReserved (boolean reserved)
77      {
78          _reserved = reserved;
79      }
80      
81      public boolean isReserved()
82      {
83          return _reserved;
84      }
85      
86      public HttpDestination getDestination()
87      {
88          return _destination;
89      }
90  
91      public void setDestination(HttpDestination destination)
92      {
93          _destination = destination;
94      }
95  
96      public boolean send(HttpExchange ex) throws IOException
97      {
98          synchronized (this)
99          {
100             if (_exchange != null)
101             {
102                 if (_pipeline != null)
103                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
104                 _pipeline = ex;
105                 return true;
106             }
107 
108             if (!_endp.isOpen())
109                 return false;
110 
111             _exchange = ex;
112             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
113 
114             if (_endp.isBlocking())
115             {
116                 this.notify();
117             }
118             else
119             {
120                 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
121                 scep.scheduleWrite();
122             }
123             _destination.getHttpClient().schedule(_timeout);
124 
125             return true;
126         }
127     }
128 
        /**
         * Drives the current exchange: commits and writes the request, then
         * parses the response, looping while the endpoint is buffering input or
         * is open.
         * <p>
         * With no current exchange, a blocking endpoint waits on this object
         * until {@link #send(HttpExchange)} notifies it; a non-blocking endpoint
         * fills the parser, skips CRLF, closes the connection if other data is
         * present, and returns.
         * <p>
         * On a non-blocking endpoint the method returns after several
         * consecutive passes that performed no I/O. When both generator and
         * parser are complete (or a failure occurred) the connection is reset
         * and either returned to the destination or used for the pipelined
         * exchange.
         *
         * @throws IOException if an I/O failure occurs; an
         *         {@link InterruptedIOException} is thrown if the wait for an
         *         exchange is interrupted
         */
129     public void handle() throws IOException
130     {
131         int no_progress = 0;
132         long flushed = 0;
133 
134         boolean failed = false;
135         while (_endp.isBufferingInput() || _endp.isOpen())
136         {
137             synchronized (this)
138             {
139                 while (_exchange == null)
140                 {
141                     if (_endp.isBlocking())
142                     {
143                         try
144                         {
145                             this.wait();
146                         }
147                         catch (InterruptedException e)
148                         {
149                             throw new InterruptedIOException();
150                         }
151                     }
152                     else
153                     {
154                         // Hopefully just space?
155                         _parser.fill();
156                         _parser.skipCRLF();
157                         if (_parser.isMoreInBuffer())
158                         {
159                             Log.warn("Unexpected data received but no request sent");
160                             close();
161                         }
162                         return;
163                     }
164                 }
165             }
                // NOTE(review): _exchange is read here outside the lock, while
                // TimeoutTask.expired() nulls it under the lock - looks like a
                // possible NullPointerException window; confirm.
166             if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
167             {
168                 no_progress = 0;
169                 commitRequest();
170             }
171 
172             try
173             {
174                 long io = 0;
175                 _endp.flush();
176 
177                 if (_generator.isComplete())
178                 {
179                     if (!_requestComplete)
180                     {
181                         _requestComplete = true;
182                         _exchange.getEventListener().onRequestComplete();
183                     }
184                 }
185                 else
186                 {
187                     // Write as much of the request as possible
188                     synchronized (this)
189                     {
190                         if (_exchange == null)
191                             continue;
192                         flushed = _generator.flushBuffer();
193                         io += flushed;
194                     }
195 
196                     if (!_generator.isComplete())
197                     {
                            // Streamed request content: pull the next chunk once the
                            // previous one is absent or has length 0; a null chunk
                            // completes the request.
198                         InputStream in = _exchange.getRequestContentSource();
199                         if (in != null)
200                         {
201                             if (_requestContentChunk == null || _requestContentChunk.length() == 0)
202                             {
203                                 _requestContentChunk = _exchange.getRequestContentChunk();
204                                 if (_requestContentChunk != null)
205                                     _generator.addContent(_requestContentChunk,false);
206                                 else
207                                     _generator.complete();
208                                 io += _generator.flushBuffer();
209                             }
210                         }
211                         else
212                             _generator.complete();
213                     }
214                 }
215                 
216                 if (_generator.isComplete() && !_requestComplete)
217                 {
218                     _requestComplete = true;
219                     _exchange.getEventListener().onRequestComplete();
220                 }
221 
222                 // If we are not ended then parse available
223                 if (!_parser.isComplete() && _generator.isCommitted())
224                 {
225                     long filled = _parser.parseAvailable();
226                     io += filled;
227                 }
228 
229                 if (io > 0)
230                     no_progress = 0;
231                 else if (no_progress++ >= 2 && !_endp.isBlocking())
232                 {
233                     // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
234                     if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
235                     {
236                         if (_generator.flushBuffer()>0)
237                             continue;
238                     }
239                     return;
240                 }
241             }
242             catch (Throwable e)
243             {                
244                 Log.debug("Failure on " + _exchange, e);
245 
                    // ThreadDeath is rethrown before the listener is notified and
                    // before 'failed' is set.
246                 if (e instanceof ThreadDeath)
247                     throw (ThreadDeath)e;
248                 
249                 synchronized (this)
250                 {
251                     if (_exchange != null)
252                     {
253                         _exchange.getEventListener().onException(e);
254                         _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
255                     }
256                 }
257 
258                 failed = true;
259                 if (e instanceof IOException)
260                     throw (IOException)e;
261  
262                 if (e instanceof Error)
263                     throw (Error)e;
264                 
265                 if (e instanceof RuntimeException)
266                     throw (RuntimeException)e;
267                 
                    // Any other checked Throwable is wrapped so it can propagate.
268                throw new RuntimeException(e);
269             }
270             finally
271             {
                    // Runs on every pass, including 'continue', 'return' and rethrow.
272                 boolean complete = false;
273                 boolean close = failed; // always close the connection on error
274                 if (!failed)
275                 {
276                     // are we complete?
277                     if (_generator.isComplete())
278                     {
279                         if (!_requestComplete)
280                         {
281                             _requestComplete = true;
                                // NOTE(review): unlocked _exchange dereference, same
                                // concern as at the top of the loop; confirm.
282                             _exchange.getEventListener().onRequestComplete();
283                         }
284 
285                         // we need to return the HttpConnection to a state that
286                         // it can be reused or closed out
287                         if (_parser.isComplete())
288                         {
289                             _destination.getHttpClient().cancel(_timeout);
290                             complete = true;
291                         }
292                     }
293                 }
294 
295                 if (complete || failed)
296                 {
297                     synchronized (this)
298                     {
299                         if (!close)
300                             close = shouldClose();
301                             
302                         reset(true);
303 
304                         no_progress = 0;
305                         if (_exchange != null)
306                         {
307                             _exchange = null;
308 
309                             if (_pipeline == null)
310                             {
311                                 if (!isReserved())
312                                     _destination.returnConnection(this,close);
313                             }
314                             else
315                             {
                                    // A pipelined exchange is waiting: when closing, hand it
                                    // to the destination; otherwise send it on this connection.
316                                 if (close)
317                                 {
318                                     if (!isReserved())
319                                         _destination.returnConnection(this,close);
320 
321                                     HttpExchange exchange = _pipeline;
322                                     _pipeline = null;
323                                     _destination.send(exchange);
324                                 }
325                                 else
326                                 {
327                                     HttpExchange exchange = _pipeline;
328                                     _pipeline = null;
329                                     send(exchange);
330                                 }
331                             }
332                         }
333                     }
334                 }
335             }
336         }
337     }
338 
339     public boolean isIdle()
340     {
341         synchronized (this)
342         {
343             return _exchange == null;
344         }
345     }
346     
347     /**
348      * @see org.eclipse.jetty.io.Connection#isSuspended()
349      */
350     public boolean isSuspended()
351     {
352         return false;
353     }
354 
355     public EndPoint getEndPoint()
356     {
357         return _endp;
358     }
359 
360     private void commitRequest() throws IOException
361     {
362         synchronized (this)
363         {
364             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
365                 throw new IllegalStateException();
366 
367             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
368             _generator.setVersion(_exchange.getVersion());
369 
370             String uri = _exchange.getURI();
371             if (_destination.isProxied() && uri.startsWith("/"))
372             {
373                 // TODO suppress port 80 or 443
374                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
375                         + _destination.getAddress().getPort() + uri;
376                 Authorization auth = _destination.getProxyAuthentication();
377                 if (auth != null)
378                     auth.setCredentials(_exchange);
379             }
380 
381             _generator.setRequest(_exchange.getMethod(), uri);
382 
383             HttpFields requestHeaders = _exchange.getRequestFields();
384             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
385             {
386                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
387                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
388             }
389 
390             Buffer requestContent = _exchange.getRequestContent();
391             if (requestContent != null)
392             {
393                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
394                 _generator.completeHeader(requestHeaders,false);
395                 _generator.addContent(requestContent,true);
396             }
397             else
398             {
399                 InputStream requestContentStream = _exchange.getRequestContentSource();
400                 if (requestContentStream != null)
401                 {
402                     _generator.completeHeader(requestHeaders, false);
403                     int available = requestContentStream.available();
404                     if (available > 0)
405                     {
406                         // TODO deal with any known content length
407                         // TODO reuse this buffer!
408                         byte[] buf = new byte[available];
409                         int length = requestContentStream.read(buf);
410                         _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
411                     }
412                 }
413                 else
414                 {
415                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
416                     _generator.completeHeader(requestHeaders, true);
417                 }
418             }
419 
420             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
421         }
422     }
423 
424     protected void reset(boolean returnBuffers) throws IOException
425     {
426         _requestComplete = false;
427         _connectionHeader = null;
428         _parser.reset(returnBuffers);
429         _generator.reset(returnBuffers);
430         _http11 = true;
431     }
432 
433     private boolean shouldClose()
434     {
435         if (_connectionHeader!=null)
436         {
437             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
438                 return true;
439             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
440                 return false;
441         }
442         return !_http11;
443     }
444 
445     private class Handler extends HttpParser.EventHandler
446     {
447         @Override
448         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
449         {
450             // System.out.println( method.toString() + "///" + url.toString() +
451             // "///" + version.toString() );
452             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
453             // out here
454             // throw new IllegalStateException();
455         }
456 
457         @Override
458         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
459         {
460             HttpExchange exchange = _exchange;
461             if (exchange!=null)
462             {
463                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
464                 exchange.getEventListener().onResponseStatus(version,status,reason);
465                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
466             }
467         }
468 
469         @Override
470         public void parsedHeader(Buffer name, Buffer value) throws IOException
471         {
472             HttpExchange exchange = _exchange;
473             if (exchange!=null)
474             {
475                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
476                 {
477                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
478                 }
479                 exchange.getEventListener().onResponseHeader(name,value);
480             }
481         }
482 
483         @Override
484         public void headerComplete() throws IOException
485         {
486             HttpExchange exchange = _exchange;
487             if (exchange!=null)
488                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
489         }
490 
491         @Override
492         public void content(Buffer ref) throws IOException
493         {
494             HttpExchange exchange = _exchange;
495             if (exchange!=null)
496                 exchange.getEventListener().onResponseContent(ref);
497         }
498 
499         @Override
500         public void messageComplete(long contextLength) throws IOException
501         {
502             HttpExchange exchange = _exchange;
503             if (exchange!=null)
504                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
505         }
506     }
507 
508     public String toString()
509     {
510         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
511     }
512 
513     public String toDetailString()
514     {
515         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
516     }
517 
518     /**
519      * @return the last
520      */
521     public long getLast()
522     {
523         return _last;
524     }
525 
526     /**
527      * @param last
528      *            the last to set
529      */
530     public void setLast(long last)
531     {
532         _last = last;
533     }
534 
535     public void close() throws IOException
536     {
537         _endp.close();
538     }
539 
540     private class TimeoutTask extends Timeout.Task
541     {
542         public void expired()
543         {
544             HttpExchange ex = null;
545             try
546             {
547                 synchronized (HttpConnection.this)
548                 {
549                     ex = _exchange;
550                     _exchange = null;
551                     if (ex != null)
552                     {
553                         _destination.returnConnection(HttpConnection.this, true);
554                     }
555                 }
556             }
557             catch (Exception e)
558             {
559                 Log.debug(e);
560             }
561             finally
562             {
563                 try
564                 {
565                     close();
566                 }
567                 catch (IOException e)
568                 {
569                     Log.ignore(e);
570                 }
571 
572                 if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
573                 {
574                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
575                 }
576             }
577         }
578     }
579 }