View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.io.InterruptedIOException;
19  import java.util.concurrent.atomic.AtomicBoolean;
20  
21  import org.eclipse.jetty.client.security.Authentication;
22  import org.eclipse.jetty.http.HttpFields;
23  import org.eclipse.jetty.http.HttpGenerator;
24  import org.eclipse.jetty.http.HttpHeaderValues;
25  import org.eclipse.jetty.http.HttpHeaders;
26  import org.eclipse.jetty.http.HttpMethods;
27  import org.eclipse.jetty.http.HttpParser;
28  import org.eclipse.jetty.http.HttpSchemes;
29  import org.eclipse.jetty.http.HttpStatus;
30  import org.eclipse.jetty.http.HttpVersions;
31  import org.eclipse.jetty.io.AsyncEndPoint;
32  import org.eclipse.jetty.io.Buffer;
33  import org.eclipse.jetty.io.Buffers;
34  import org.eclipse.jetty.io.ByteArrayBuffer;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.io.View;
38  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
39  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.thread.Timeout;
42  
43  /**
44   *
45   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
46   */
47  public class HttpConnection implements Connection
48  {
49      private HttpDestination _destination;
50      private EndPoint _endp;
51      private HttpGenerator _generator;
52      private HttpParser _parser;
53      private boolean _http11 = true;
54      private int _status;
55      private Buffer _connectionHeader;
56      private Buffer _requestContentChunk;
57      private boolean _requestComplete;
58      private boolean _reserved;
59      // The current exchange waiting for a response
60      private volatile HttpExchange _exchange;
61      private HttpExchange _pipeline;
62      private final Timeout.Task _timeout = new TimeoutTask();
63      private AtomicBoolean _idle = new AtomicBoolean(false);
64  
65      public void dump() throws IOException
66      {
67          Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
68          Log.info("generator=" + _generator);
69          Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
70          Log.info("exchange=" + _exchange);
71          if (_endp instanceof SslSelectChannelEndPoint)
72              ((SslSelectChannelEndPoint)_endp).dump();
73      }
74  
75      HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
76      {
77          _endp = endp;
78          _generator = new HttpGenerator(requestBuffers,endp);
79          _parser = new HttpParser(responseBuffers,endp,new Handler());
80      }
81  
82      public long getTimeStamp()
83      {
84          return -1;
85      }
86  
87      public void setReserved (boolean reserved)
88      {
89          _reserved = reserved;
90      }
91  
92      public boolean isReserved()
93      {
94          return _reserved;
95      }
96  
97      public HttpDestination getDestination()
98      {
99          return _destination;
100     }
101 
102     public void setDestination(HttpDestination destination)
103     {
104         _destination = destination;
105     }
106 
107     public boolean send(HttpExchange ex) throws IOException
108     {
109         synchronized (this)
110         {
111             if (_exchange != null)
112             {
113                 if (_pipeline != null)
114                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
115                 _pipeline = ex;
116                 return true;
117             }
118 
119             if (!_endp.isOpen())
120                 return false;
121 
122             _exchange = ex;
123             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
124 
125             if (_endp.isBlocking())
126             {
127                 this.notify();
128             }
129             else
130             {
131                 AsyncEndPoint scep = (AsyncEndPoint)_endp;
132                 scep.scheduleWrite();
133             }
134             _destination.getHttpClient().schedule(_timeout);
135 
136             return true;
137         }
138     }
139 
140     public Connection handle() throws IOException
141     {
142         if (_exchange != null)
143             _exchange.associate(this);
144 
145         try
146         {
147             int no_progress = 0;
148 
149             boolean failed = false;
150             while (_endp.isBufferingInput() || _endp.isOpen())
151             {
152                 synchronized (this)
153                 {
154                     while (_exchange == null)
155                     {
156                         if (_endp.isBlocking())
157                         {
158                             try
159                             {
160                                 this.wait();
161                             }
162                             catch (InterruptedException e)
163                             {
164                                 throw new InterruptedIOException();
165                             }
166                         }
167                         else
168                         {
169                             // Hopefully just space?
170                             _parser.fill();
171                             _parser.skipCRLF();
172                             if (_parser.isMoreInBuffer())
173                             {
174                                 Log.warn("Unexpected data received but no request sent");
175                                 close();
176                             }
177                             return this;
178                         }
179                     }
180                     if (!_exchange.isAssociated())
181                         _exchange.associate(this);
182                 }
183 
184                 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
185                 {
186                     no_progress = 0;
187                     commitRequest();
188                 }
189 
190                 try
191                 {
192                     long io = 0;
193                     _endp.flush();
194 
195                     if (_generator.isComplete())
196                     {
197                         if (!_requestComplete)
198                         {
199                             _requestComplete = true;
200                             _exchange.getEventListener().onRequestComplete();
201                         }
202                     }
203                     else
204                     {
205                         // Write as much of the request as possible
206                         synchronized (this)
207                         {
208                             if (_exchange == null)
209                                 continue;
210                         }
211                         
212                         long flushed = _generator.flushBuffer();
213                         io += flushed;
214 
215                         if (!_generator.isComplete())
216                         {
217                             if (_exchange!=null)
218                             {
219                                 InputStream in = _exchange.getRequestContentSource();
220                                 if (in != null)
221                                 {
222                                     if (_requestContentChunk == null || _requestContentChunk.length() == 0)
223                                     {
224                                         _requestContentChunk = _exchange.getRequestContentChunk();
225                                         _destination.getHttpClient().schedule(_timeout);
226 
227                                         if (_requestContentChunk != null)
228                                             _generator.addContent(_requestContentChunk,false);
229                                         else
230                                             _generator.complete();
231 
232                                         flushed = _generator.flushBuffer();
233                                         io += flushed;
234                                     }
235                                 }
236                                 else
237                                     _generator.complete();
238                             }                            
239                             else
240                                 _generator.complete();
241                         }
242                     }
243 
244                     if (_generator.isComplete() && !_requestComplete)
245                     {
246                         _requestComplete = true;
247                         _exchange.getEventListener().onRequestComplete();
248                     }
249 
250                     // If we are not ended then parse available
251                     if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
252                     {
253                         long filled = _parser.parseAvailable();
254                         io += filled;
255                     }
256                     
257                     if (io > 0)
258                         no_progress = 0;
259                     else if (no_progress++ >= 2 && !_endp.isBlocking())
260                     {
261                         // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
262                         if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
263                         {
264                             long flushed = _generator.flushBuffer();
265                             if (flushed>0)
266                                 continue;
267                         }
268                         return this;
269                     }
270                 }
271                 catch (Throwable e)
272                 {
273                     Log.debug("Failure on " + _exchange, e);
274 
275                     if (e instanceof ThreadDeath)
276                         throw (ThreadDeath)e;
277 
278                     synchronized (this)
279                     {
280                         if (_exchange != null)
281                         {
282                             // Cancelling the exchange causes an exception as we close the connection,
283                             // but we don't report it as it is normal cancelling operation
284                             if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING)
285                             {
286                                 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
287                                 _exchange.getEventListener().onException(e);
288                             }
289                         }
290                     }
291 
292                     failed = true;
293                     if (e instanceof IOException)
294                         throw (IOException)e;
295 
296                     if (e instanceof Error)
297                         throw (Error)e;
298 
299                     if (e instanceof RuntimeException)
300                         throw (RuntimeException)e;
301 
302                     throw new RuntimeException(e);
303                 }
304                 finally
305                 {
306                     boolean complete = false;
307                     boolean close = failed; // always close the connection on error
308                     if (!failed)
309                     {
310                         // are we complete?
311                         if (_generator.isComplete())
312                         {
313                             if (!_requestComplete)
314                             {
315                                 _requestComplete = true;
316                                 _exchange.getEventListener().onRequestComplete();
317                             }
318 
319                             // we need to return the HttpConnection to a state that
320                             // it can be reused or closed out
321                             if (_parser.isComplete())
322                             {
323                                 _destination.getHttpClient().cancel(_timeout);
324                                 complete = true;
325                             }
326                         }
327                     }
328 
329                     if (complete || failed)
330                     {
331                         synchronized (this)
332                         {
333                             if (!close)
334                                 close = shouldClose();
335 
336                             reset(true);
337 
338                             no_progress = 0;
339                             if (_exchange != null)
340                             {
341                                 HttpExchange exchange=_exchange;
342                                 _exchange.disassociate();
343                                 _exchange = null;
344                                 
345                                 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
346                                 {
347                                     Connection switched=exchange.onSwitchProtocol(_endp);
348                                     if (switched!=null)
349                                     {    
350                                         // switched protocol!
351                                         exchange = _pipeline;
352                                         _pipeline = null;
353                                         if (exchange!=null)
354                                             _destination.send(exchange);
355 
356                                         return switched;
357                                     }
358                                 }
359 
360                                 if (_pipeline == null)
361                                 {
362                                     if (!isReserved())
363                                         _destination.returnConnection(this, close);
364                                 }
365                                 else
366                                 {
367                                     if (close)
368                                     {
369                                         if (!isReserved())
370                                             _destination.returnConnection(this,close);
371 
372                                         exchange = _pipeline;
373                                         _pipeline = null;
374                                         _destination.send(exchange);
375                                     }
376                                     else
377                                     {
378                                         exchange = _pipeline;
379                                         _pipeline = null;
380                                         send(exchange);
381                                     }
382                                 }
383 
384                             }
385                         }
386                     }
387                 }
388             }
389         }
390         finally
391         {
392             if (_exchange != null && _exchange.isAssociated())
393             {
394                 _exchange.disassociate();
395             }
396             
397             if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
398             {                        
399                 ((AsyncEndPoint)_endp).setWritable(false);
400             }
401         }
402         
403         return this;
404     }
405 
406     public boolean isIdle()
407     {
408         synchronized (this)
409         {
410             return _exchange == null;
411         }
412     }
413 
414     /**
415      * @see org.eclipse.jetty.io.Connection#isSuspended()
416      */
417     public boolean isSuspended()
418     {
419         return false;
420     }
421 
422     public EndPoint getEndPoint()
423     {
424         return _endp;
425     }
426 
427     private void commitRequest() throws IOException
428     {
429         synchronized (this)
430         {
431             _status=0;
432             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
433                 throw new IllegalStateException();
434 
435             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
436             _generator.setVersion(_exchange.getVersion());
437 
438             String uri = _exchange.getURI();
439             if (_destination.isProxied() && uri.startsWith("/"))
440             {
441                 // TODO suppress port 80 or 443
442                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
443                         + _destination.getAddress().getPort() + uri;
444                 Authentication auth = _destination.getProxyAuthentication();
445                 if (auth != null)
446                     auth.setCredentials(_exchange);
447             }
448 
449             String method=_exchange.getMethod();
450             _generator.setRequest(method, uri);
451             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
452 
453             HttpFields requestHeaders = _exchange.getRequestFields();
454             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
455             {
456                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
457                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
458             }
459 
460             Buffer requestContent = _exchange.getRequestContent();
461             if (requestContent != null)
462             {
463                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
464                 _generator.completeHeader(requestHeaders,false);
465                 _generator.addContent(new View(requestContent),true);
466             }
467             else
468             {
469                 InputStream requestContentStream = _exchange.getRequestContentSource();
470                 if (requestContentStream != null)
471                 {
472                     _generator.completeHeader(requestHeaders, false);
473                     int available = requestContentStream.available();
474                     if (available > 0)
475                     {
476                         // TODO deal with any known content length
477                         // TODO reuse this buffer!
478                         byte[] buf = new byte[available];
479                         int length = requestContentStream.read(buf);
480                         _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
481                     }
482                 }
483                 else
484                 {
485                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
486                     _generator.completeHeader(requestHeaders, true);
487                 }
488             }
489 
490             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
491         }
492     }
493 
494     protected void reset(boolean returnBuffers) throws IOException
495     {
496         _requestComplete = false;
497         _connectionHeader = null;
498         _parser.reset(returnBuffers);
499         _generator.reset(returnBuffers);
500         _http11 = true;
501     }
502 
503     private boolean shouldClose()
504     {
505         if (_connectionHeader!=null)
506         {
507             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
508                 return true;
509             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
510                 return false;
511         }
512         return !_http11;
513     }
514 
515     private class Handler extends HttpParser.EventHandler
516     {
517         @Override
518         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
519         {
520             // System.out.println( method.toString() + "///" + url.toString() +
521             // "///" + version.toString() );
522             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
523             // out here
524             // throw new IllegalStateException();
525         }
526 
527         @Override
528         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
529         {
530             HttpExchange exchange = _exchange;
531             if (exchange!=null)
532             {
533                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
534                 _status=status;
535                 exchange.getEventListener().onResponseStatus(version,status,reason);
536                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
537             }
538         }
539 
540         @Override
541         public void parsedHeader(Buffer name, Buffer value) throws IOException
542         {
543             HttpExchange exchange = _exchange;
544             if (exchange!=null)
545             {
546                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
547                 {
548                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
549                 }
550                 exchange.getEventListener().onResponseHeader(name,value);
551             }
552         }
553 
554         @Override
555         public void headerComplete() throws IOException
556         {
557             if (_endp instanceof AsyncEndPoint)
558                 ((AsyncEndPoint)_endp).scheduleIdle();
559             HttpExchange exchange = _exchange;
560             if (exchange!=null)
561                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
562         }
563 
564         @Override
565         public void content(Buffer ref) throws IOException
566         {
567             if (_endp instanceof AsyncEndPoint)
568                 ((AsyncEndPoint)_endp).scheduleIdle();
569             HttpExchange exchange = _exchange;
570             if (exchange!=null)
571                 exchange.getEventListener().onResponseContent(ref);
572         }
573 
574         @Override
575         public void messageComplete(long contextLength) throws IOException
576         {
577             HttpExchange exchange = _exchange;
578             if (exchange!=null)
579                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
580         }
581     }
582 
583     @Override
584     public String toString()
585     {
586         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
587     }
588 
589     public String toDetailString()
590     {
591         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
592     }
593 
594     public void close() throws IOException
595     {
596         _endp.close();
597     }
598 
599     public void setIdleTimeout()
600     {
601         synchronized (this)
602         {
603             if (_idle.compareAndSet(false,true))
604                 _destination.getHttpClient().scheduleIdle(_timeout);
605             else
606                 throw new IllegalStateException();
607         }
608     }
609 
610     public boolean cancelIdleTimeout()
611     {
612         synchronized (this)
613         {
614             if (_idle.compareAndSet(true,false))
615             {
616                 _destination.getHttpClient().cancel(_timeout);
617                 return true;
618             }
619         }
620 
621         return false;
622     }
623 
624     private class TimeoutTask extends Timeout.Task
625     {
626         @Override
627         public void expired()
628         {
629             HttpExchange ex = null;
630             try
631             {
632                 synchronized (HttpConnection.this)
633                 {
634                     ex = _exchange;
635                     _exchange = null;
636                     if (ex != null)
637                     {
638                         ex.disassociate();
639                         _destination.returnConnection(HttpConnection.this, true);
640                     }
641                     else if (_idle.compareAndSet(true,false))
642                     {
643                         _destination.returnIdleConnection(HttpConnection.this);
644                     }
645                 }
646             }
647             catch (Exception e)
648             {
649                 Log.debug(e);
650             }
651             finally
652             {
653                 try
654                 {
655                     close();
656                 }
657                 catch (IOException e)
658                 {
659                     Log.ignore(e);
660                 }
661 
662                 if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
663                 {
664                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
665                 }
666             }
667         }
668     }
669 }