View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.io.InterruptedIOException;
19  
20  import org.eclipse.jetty.client.security.Authorization;
21  import org.eclipse.jetty.http.HttpGenerator;
22  import org.eclipse.jetty.http.HttpHeaderValues;
23  import org.eclipse.jetty.http.HttpHeaders;
24  import org.eclipse.jetty.http.HttpParser;
25  import org.eclipse.jetty.http.HttpSchemes;
26  import org.eclipse.jetty.http.HttpVersions;
27  import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
28  import org.eclipse.jetty.io.Buffer;
29  import org.eclipse.jetty.io.Buffers;
30  import org.eclipse.jetty.io.ByteArrayBuffer;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.io.EndPoint;
33  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.thread.Timeout;
36  
37  
38  /**
39   * 
40   * 
41   * 
42   */
43  public class HttpConnection implements Connection
44  {
45      HttpDestination _destination;
46      EndPoint _endp;
47      HttpGenerator _generator;
48      HttpParser _parser;
49      boolean _http11 = true;
50      Buffer _connectionHeader;
51      Buffer _requestContentChunk;
52      long _last;
53      boolean _requestComplete;
54      public String _message;
55      public Throwable _throwable;
56      public boolean _reserved;
57  
58      /* The current exchange waiting for a response */
59      volatile HttpExchange _exchange;
60      HttpExchange _pipeline;
61  
62      public void dump() throws IOException
63      {
64          Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65          Log.info("generator=" + _generator);
66          Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67          Log.info("exchange=" + _exchange);
68          if (_endp instanceof SslSelectChannelEndPoint)
69              ((SslSelectChannelEndPoint)_endp).dump();
70      }
71  
72      Timeout.Task _timeout = new Timeout.Task()
73      {
74          public void expired()
75          {
76              HttpExchange ex = null;
77              try
78              {
79                  synchronized (HttpConnection.this)
80                  {
81                      ex = _exchange;
82                      _exchange = null;
83                      if (ex != null)
84                          _destination.returnConnection(HttpConnection.this,true);
85                  }
86              }
87              catch (Exception e)
88              {
89                  Log.debug(e);
90              }
91              finally
92              {
93                  try
94                  {
95                      _endp.close();
96                  }
97                  catch (IOException e)
98                  {
99                      Log.ignore(e);
100                 }
101 
102                 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
103                 {
104                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
105                 }
106             }
107         }
108     };
109 
110     /* ------------------------------------------------------------ */
111     HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
112     {
113         _endp = endp;
114         _generator = new HttpGenerator(requestBuffers,endp);
115         _parser = new HttpParser(responseBuffers,endp,new Handler());
116     }
117 
118     public void setReserved (boolean reserved)
119     {
120         _reserved = reserved;
121     }
122     
123     public boolean isReserved()
124     {
125         return _reserved;
126     }
127     
128     /* ------------------------------------------------------------ */
129     public HttpDestination getDestination()
130     {
131         return _destination;
132     }
133 
134     /* ------------------------------------------------------------ */
135     public void setDestination(HttpDestination destination)
136     {
137         _destination = destination;
138     }
139 
140     /* ------------------------------------------------------------ */
141     public boolean send(HttpExchange ex) throws IOException
142     {
143         // _message =
144         // Thread.currentThread().getName()+": Generator instance="+_generator
145         // .hashCode()+" state= "+_generator.getState()+" _exchange="+_exchange;
146         _throwable = new Throwable();
147         synchronized (this)
148         {
149             if (_exchange != null)
150             {
151                 if (_pipeline != null)
152                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
153                 _pipeline = ex;
154                 return true;
155             }
156 
157             if (!_endp.isOpen())
158                 return false;
159 
160             ex.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
161             _exchange = ex;
162 
163             if (_endp.isBlocking())
164                 this.notify();
165             else
166             {
167                 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
168                 scep.scheduleWrite();
169             }
170 
171             if (!_endp.isBlocking())
172                 _destination.getHttpClient().schedule(_timeout);
173 
174             return true;
175         }
176     }
177 
178     /* ------------------------------------------------------------ */
179     public void handle() throws IOException
180     {
181         int no_progress = 0;
182         long flushed = 0;
183 
184         boolean failed = false;
185         while (_endp.isBufferingInput() || _endp.isOpen())
186         {
187             synchronized (this)
188             {
189                 while (_exchange == null)
190                 {
191                     if (_endp.isBlocking())
192                     {
193                         try
194                         {
195                             this.wait();
196                         }
197                         catch (InterruptedException e)
198                         {
199                             throw new InterruptedIOException();
200                         }
201                     }
202                     else
203                     {
204                         // Hopefully just space?
205                         _parser.fill();
206                         _parser.skipCRLF();
207                         if (_parser.isMoreInBuffer())
208                         {
209                             Log.warn("unexpected data");
210                             _endp.close();
211                         }
212 
213                         return;
214                     }
215                 }
216             }
217             if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
218             {
219                 no_progress = 0;
220                 commitRequest();
221             }
222 
223             try
224             {
225                 long io = 0;
226                 _endp.flush();
227 
228                 if (_generator.isComplete())
229                 {
230                     if (!_requestComplete)
231                     {
232                         _requestComplete = true;
233                         _exchange.getEventListener().onRequestComplete();
234                     }
235                 }
236                 else
237                 {
238                     // Write as much of the request as possible
239                     synchronized (this)
240                     {
241                         if (_exchange == null)
242                             continue;
243                         flushed = _generator.flushBuffer();
244                         io += flushed;
245                     }
246 
247                     if (!_generator.isComplete())
248                     {
249                         InputStream in = _exchange.getRequestContentSource();
250                         if (in != null)
251                         {
252                             if (_requestContentChunk == null || _requestContentChunk.length() == 0)
253                             {
254                                 _requestContentChunk = _exchange.getRequestContentChunk();
255                                 if (_requestContentChunk != null)
256                                     _generator.addContent(_requestContentChunk,false);
257                                 else
258                                     _generator.complete();
259                                 io += _generator.flushBuffer();
260                             }
261                         }
262                         else
263                             _generator.complete();
264                     }
265                 }
266                 
267 
268                 // If we are not ended then parse available
269                 if (!_parser.isComplete() && _generator.isCommitted())
270                 {
271                     long filled = _parser.parseAvailable();
272                     io += filled;
273                 }
274 
275                 if (io > 0)
276                     no_progress = 0;
277                 else if (no_progress++ >= 2 && !_endp.isBlocking())
278                 {
279                     // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
280                     if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
281                     {
282                         if (_generator.flushBuffer()>0)
283                             continue;
284                     }
285                     return;
286                 }
287             }
288             catch (Throwable e)
289             {                
290                 if (e instanceof ThreadDeath)
291                     throw (ThreadDeath)e;
292                 
293                 synchronized (this)
294                 {
295                     if (_exchange != null)
296                     {
297                         _exchange.getEventListener().onException(e);
298                         _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
299                     }
300                 }
301                 Log.warn("IOE on "+_exchange);
302                 failed = true;
303                 if (e instanceof IOException)
304                     throw (IOException)e;
305  
306                 if (e instanceof Error)
307                     throw (Error)e;
308                 
309                 if (e instanceof RuntimeException)
310                     throw (RuntimeException)e;
311                 
312                throw new RuntimeException(e);
313             }
314             finally
315             {
316                 boolean complete = false;
317                 boolean close = failed; // always close the connection on error
318                 if (!failed)
319                 {
320                     // are we complete?
321                     if (_generator.isComplete())
322                     {
323                         if (!_requestComplete)
324                         {
325                             _requestComplete = true;
326                             _exchange.getEventListener().onRequestComplete();
327                         }
328 
329                         // we need to return the HttpConnection to a state that
330                         // it can be reused or closed out
331                         if (_parser.isComplete())
332                         {
333                             _destination.getHttpClient().cancel(_timeout);
334                             complete = true;
335                         }
336                     }
337                 }
338 
339                 if (complete || failed)
340                 {
341                     synchronized (this)
342                     {
343                         if (!close)
344                             close = shouldClose();
345                             
346                         reset(true);
347                         no_progress = 0;
348                         flushed = -1;
349                         if (_exchange != null)
350                         {
351                             _exchange = null;
352 
353                             if (_pipeline == null)
354                             {
355                                 if (!isReserved())
356                                     _destination.returnConnection(this,close);
357                                 if (close)
358                                     return;
359                             }
360                             else
361                             {
362                                 if (close)
363                                 {
364                                     if (!isReserved())
365                                         _destination.returnConnection(this,close);
366                                     _destination.send(_pipeline);
367                                     _pipeline = null;
368                                     return;
369                                 }
370 
371                                 HttpExchange ex = _pipeline;
372                                 _pipeline = null;
373 
374                                 send(ex);
375                             }
376                         }
377                     }
378                 }
379             }
380         }
381     }
382 
383     /* ------------------------------------------------------------ */
384     public boolean isIdle()
385     {
386         synchronized (this)
387         {
388             return _exchange == null;
389         }
390     }
391     
392     /* ------------------------------------------------------------ */
393     /**
394      * @see org.eclipse.jetty.io.Connection#isSuspended()
395      */
396     public boolean isSuspended()
397     {
398         return false;
399     }
400 
401     /* ------------------------------------------------------------ */
402     public EndPoint getEndPoint()
403     {
404         return _endp;
405     }
406 
407     /* ------------------------------------------------------------ */
408     private void commitRequest() throws IOException
409     {
410         synchronized (this)
411         {
412             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
413                 throw new IllegalStateException();
414 
415             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
416             _generator.setVersion(_exchange._version);
417 
418             String uri = _exchange._uri;
419             if (_destination.isProxied() && uri.startsWith("/"))
420             {
421                 // TODO suppress port 80 or 443
422                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
423                         + _destination.getAddress().getPort() + uri;
424                 Authorization auth = _destination.getProxyAuthentication();
425                 if (auth != null)
426                     auth.setCredentials(_exchange);
427             }
428 
429             _generator.setRequest(_exchange._method,uri);
430 
431             if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
432             {
433                 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
434                     _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
435             }
436 
437             if (_exchange._requestContent != null)
438             {
439                 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
440                 _generator.completeHeader(_exchange._requestFields,false);
441                 _generator.addContent(_exchange._requestContent,true);
442             }
443             else if (_exchange._requestContentSource != null)
444             {
445                 _generator.completeHeader(_exchange._requestFields,false);
446                 int available = _exchange._requestContentSource.available();
447                 if (available > 0)
448                 {
449                     // TODO deal with any known content length
450 
451                     // TODO reuse this buffer!
452                     byte[] buf = new byte[available];
453                     int length = _exchange._requestContentSource.read(buf);
454                     _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
455                 }
456             }
457             else
458             {
459                 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO
460                 // :
461                 // should
462                 // not
463                 // be
464                 // needed
465                 _generator.completeHeader(_exchange._requestFields,true);
466             }
467 
468             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
469         }
470     }
471 
472     /* ------------------------------------------------------------ */
473     protected void reset(boolean returnBuffers) throws IOException
474     {
475         _requestComplete = false;
476         _connectionHeader = null;
477         _parser.reset(returnBuffers);
478         _generator.reset(returnBuffers);
479         _http11 = true;
480     }
481 
482     /* ------------------------------------------------------------ */
483     private boolean shouldClose()
484     {
485         if (_connectionHeader!=null)
486         {
487             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
488                 return true;
489             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
490                 return false;
491         }
492         return !_http11;
493     }
494 
495     /* ------------------------------------------------------------ */
496     private class Handler extends HttpParser.EventHandler
497     {
498         @Override
499         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
500         {
501             // System.out.println( method.toString() + "///" + url.toString() +
502             // "///" + version.toString() );
503             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
504             // out here
505             // throw new IllegalStateException();
506         }
507 
508         @Override
509         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
510         {
511             HttpExchange exchange = _exchange;
512             if (exchange!=null)
513             {
514                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
515                 exchange.getEventListener().onResponseStatus(version,status,reason);
516                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
517             }
518         }
519 
520         @Override
521         public void parsedHeader(Buffer name, Buffer value) throws IOException
522         {
523             HttpExchange exchange = _exchange;
524             if (exchange!=null)
525             {
526                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
527                 {
528                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
529                 }
530                 exchange.getEventListener().onResponseHeader(name,value);
531             }
532         }
533 
534         @Override
535         public void headerComplete() throws IOException
536         {
537             HttpExchange exchange = _exchange;
538             if (exchange!=null)
539                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
540         }
541 
542         @Override
543         public void content(Buffer ref) throws IOException
544         {
545             HttpExchange exchange = _exchange;
546             if (exchange!=null)
547                 exchange.getEventListener().onResponseContent(ref);
548         }
549 
550         @Override
551         public void messageComplete(long contextLength) throws IOException
552         {
553             HttpExchange exchange = _exchange;
554             if (exchange!=null)
555                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
556         }
557     }
558 
559     /* ------------------------------------------------------------ */
560     public String toString()
561     {
562         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
563     }
564 
565     /* ------------------------------------------------------------ */
566     public String toDetailString()
567     {
568         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
569     }
570 
571     /* ------------------------------------------------------------ */
572     /**
573      * @return the last
574      */
575     public long getLast()
576     {
577         return _last;
578     }
579 
580     /* ------------------------------------------------------------ */
581     /**
582      * @param last
583      *            the last to set
584      */
585     public void setLast(long last)
586     {
587         _last = last;
588     }
589 
590     /* ------------------------------------------------------------ */
591     public void close() throws IOException
592     {
593         _endp.close();
594     }
595 
596 }