View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2011 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.util.Collections;
19  import java.util.concurrent.atomic.AtomicBoolean;
20  
21  import org.eclipse.jetty.client.security.Authentication;
22  import org.eclipse.jetty.http.HttpFields;
23  import org.eclipse.jetty.http.HttpGenerator;
24  import org.eclipse.jetty.http.HttpHeaderValues;
25  import org.eclipse.jetty.http.HttpHeaders;
26  import org.eclipse.jetty.http.HttpMethods;
27  import org.eclipse.jetty.http.HttpParser;
28  import org.eclipse.jetty.http.HttpSchemes;
29  import org.eclipse.jetty.http.HttpStatus;
30  import org.eclipse.jetty.http.HttpVersions;
31  import org.eclipse.jetty.io.AbstractConnection;
32  import org.eclipse.jetty.io.Buffer;
33  import org.eclipse.jetty.io.Buffers;
34  import org.eclipse.jetty.io.ByteArrayBuffer;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.io.EofException;
38  import org.eclipse.jetty.io.View;
39  import org.eclipse.jetty.util.component.AggregateLifeCycle;
40  import org.eclipse.jetty.util.component.Dumpable;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  import org.eclipse.jetty.util.thread.Timeout;
44  
45  /**
46   *
47   * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
48   */
49  public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable
50  {
51      private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class);
52  
53      protected HttpDestination _destination;
54      protected HttpGenerator _generator;
55      protected HttpParser _parser;
56      protected boolean _http11 = true;
57      protected int _status;
58      protected Buffer _connectionHeader;
59      protected boolean _reserved;
60  
61      // The current exchange waiting for a response
62      protected volatile HttpExchange _exchange;
63      protected HttpExchange _pipeline;
64      private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
65      private AtomicBoolean _idle = new AtomicBoolean(false);
66  
67  
68      AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
69      {
70          super(endp);
71  
72          _generator = new HttpGenerator(requestBuffers,endp);
73          _parser = new HttpParser(responseBuffers,endp,new Handler());
74      }
75  
76      public void setReserved (boolean reserved)
77      {
78          _reserved = reserved;
79      }
80  
81      public boolean isReserved()
82      {
83          return _reserved;
84      }
85  
86      public HttpDestination getDestination()
87      {
88          return _destination;
89      }
90  
91      public void setDestination(HttpDestination destination)
92      {
93          _destination = destination;
94      }
95  
96      public boolean send(HttpExchange ex) throws IOException
97      {
98          LOG.debug("Send {} on {}",ex,this);
99          synchronized (this)
100         {
101             if (_exchange != null)
102             {
103                 if (_pipeline != null)
104                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
105                 _pipeline = ex;
106                 return true;
107             }
108 
109             _exchange = ex;
110             _exchange.associate(this);
111 
112             // The call to associate() may have closed the connection, check if it's the case
113             if (!_endp.isOpen())
114             {
115                 _exchange.disassociate();
116                 _exchange = null;
117                 return false;
118             }
119 
120             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
121 
122             adjustIdleTimeout();
123 
124             return true;
125         }
126     }
127 
128     private void adjustIdleTimeout() throws IOException
129     {
130         // Adjusts the idle timeout in case the default or exchange timeout
131         // are greater. This is needed for long polls, where one wants an
132         // aggressive releasing of idle connections (so idle timeout is small)
133         // but still allow long polls to complete normally
134 
135         long timeout = _exchange.getTimeout();
136         if (timeout <= 0)
137             timeout = _destination.getHttpClient().getTimeout();
138 
139         long endPointTimeout = _endp.getMaxIdleTime();
140 
141         if (timeout > 0 && timeout > endPointTimeout)
142         {
143             // Make it larger than the exchange timeout so that there are
144             // no races between the idle timeout and the exchange timeout
145             // when trying to close the endpoint
146             _endp.setMaxIdleTime(2 * (int)timeout);
147         }
148     }
149 
150     public abstract Connection handle() throws IOException;
151 
152 
153     public boolean isIdle()
154     {
155         synchronized (this)
156         {
157             return _exchange == null;
158         }
159     }
160 
161     public boolean isSuspended()
162     {
163         return false;
164     }
165 
166     public void onClose()
167     {
168     }
169 
170     protected void commitRequest() throws IOException
171     {
172         synchronized (this)
173         {
174             _status=0;
175             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
176                 throw new IllegalStateException();
177 
178             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
179             _generator.setVersion(_exchange.getVersion());
180 
181             String method=_exchange.getMethod();
182             String uri = _exchange.getRequestURI();
183             if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
184             {
185                 boolean secure = _destination.isSecure();
186                 String host = _destination.getAddress().getHost();
187                 int port = _destination.getAddress().getPort();
188                 StringBuilder absoluteURI = new StringBuilder();
189                 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
190                 absoluteURI.append("://");
191                 absoluteURI.append(host);
192                 // Avoid adding default ports
193                 if (!(secure && port == 443 || !secure && port == 80))
194                     absoluteURI.append(":").append(port);
195                 absoluteURI.append(uri);
196                 uri = absoluteURI.toString();
197                 Authentication auth = _destination.getProxyAuthentication();
198                 if (auth != null)
199                     auth.setCredentials(_exchange);
200             }
201 
202             _generator.setRequest(method, uri);
203             _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
204 
205             HttpFields requestHeaders = _exchange.getRequestFields();
206             if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
207             {
208                 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
209                     requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
210             }
211 
212             Buffer requestContent = _exchange.getRequestContent();
213             if (requestContent != null)
214             {
215                 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
216                 _generator.completeHeader(requestHeaders,false);
217                 _generator.addContent(new View(requestContent),true);
218             }
219             else
220             {
221                 InputStream requestContentStream = _exchange.getRequestContentSource();
222                 if (requestContentStream != null)
223                 {
224                     _generator.completeHeader(requestHeaders, false);
225                     int available = requestContentStream.available();
226                     if (available > 0)
227                     {
228                         // TODO deal with any known content length
229                         // TODO reuse this buffer!
230                         byte[] buf = new byte[available];
231                         int length = requestContentStream.read(buf);
232                         _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
233                     }
234                 }
235                 else
236                 {
237                     requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
238                     _generator.completeHeader(requestHeaders, true);
239                 }
240             }
241 
242             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
243         }
244     }
245 
246     protected void reset() throws IOException
247     {
248         _connectionHeader = null;
249         _parser.reset();
250         _generator.reset();
251         _http11 = true;
252     }
253 
254 
255     private class Handler extends HttpParser.EventHandler
256     {
257         @Override
258         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
259         {
260             // System.out.println( method.toString() + "///" + url.toString() +
261             // "///" + version.toString() );
262             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
263             // out here
264             // throw new IllegalStateException();
265         }
266 
267         @Override
268         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
269         {
270             HttpExchange exchange = _exchange;
271             if (exchange==null)
272             {
273                 LOG.warn("No exchange for response");
274                 _endp.close();
275                 return;
276             }
277 
278             switch(status)
279             {
280                 case HttpStatus.CONTINUE_100:
281                 case HttpStatus.PROCESSING_102:
282                     // TODO check if appropriate expect was sent in the request.
283                     exchange.setEventListener(new NonFinalResponseListener(exchange));
284                     break;
285 
286                 case HttpStatus.OK_200:
287                     // handle special case for CONNECT 200 responses
288                     if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
289                         _parser.setHeadResponse(true);
290                     break;
291             }
292 
293             _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
294             _status=status;
295             exchange.getEventListener().onResponseStatus(version,status,reason);
296             exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
297 
298         }
299 
300         @Override
301         public void parsedHeader(Buffer name, Buffer value) throws IOException
302         {
303             HttpExchange exchange = _exchange;
304             if (exchange!=null)
305             {
306                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
307                 {
308                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
309                 }
310                 exchange.getEventListener().onResponseHeader(name,value);
311             }
312         }
313 
314         @Override
315         public void headerComplete() throws IOException
316         {
317             HttpExchange exchange = _exchange;
318             if (exchange!=null)
319                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
320         }
321 
322         @Override
323         public void content(Buffer ref) throws IOException
324         {
325             HttpExchange exchange = _exchange;
326             if (exchange!=null)
327                 exchange.getEventListener().onResponseContent(ref);
328         }
329 
330         @Override
331         public void messageComplete(long contextLength) throws IOException
332         {
333             HttpExchange exchange = _exchange;
334             if (exchange!=null)
335                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
336         }
337 
338         @Override
339         public void earlyEOF()
340         {
341             HttpExchange exchange = _exchange;
342             if (exchange!=null)
343             {
344                 if (!exchange.isDone())
345                 {
346                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
347                         exchange.getEventListener().onException(new EofException("early EOF"));
348                 }
349             }
350         }
351 
352 
353     }
354 
355     @Override
356     public String toString()
357     {
358         return String.format("%s %s g=%s p=%s",
359                 super.toString(),
360                 _destination == null ? "?.?.?.?:??" : _destination.getAddress(),
361                 _generator,
362                 _parser);
363     }
364 
365     public String toDetailString()
366     {
367         return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
368     }
369 
370     public void close() throws IOException
371     {
372         //if there is a live, unfinished exchange, set its status to be
373         //excepted and wake up anyone waiting on waitForDone()
374 
375         HttpExchange exchange = _exchange;
376         if (exchange != null && !exchange.isDone())
377         {
378             switch (exchange.getStatus())
379             {
380                 case HttpExchange.STATUS_CANCELLED:
381                 case HttpExchange.STATUS_CANCELLING:
382                 case HttpExchange.STATUS_COMPLETED:
383                 case HttpExchange.STATUS_EXCEPTED:
384                 case HttpExchange.STATUS_EXPIRED:
385                     break;
386                 case HttpExchange.STATUS_PARSING_CONTENT:
387                     if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
388                         break;
389                 default:
390                     String exch= exchange.toString();
391                     String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
392                     if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
393                         exchange.getEventListener().onException(new EofException(reason+exch));
394             }
395         }
396 
397         if (_endp.isOpen())
398         {
399             _endp.close();
400             _destination.returnConnection(this, true);
401         }
402     }
403 
404     public void setIdleTimeout()
405     {
406         synchronized (this)
407         {
408             if (_idle.compareAndSet(false, true))
409                 _destination.getHttpClient().scheduleIdle(_idleTimeout);
410             else
411                 throw new IllegalStateException();
412         }
413     }
414 
415     public boolean cancelIdleTimeout()
416     {
417         synchronized (this)
418         {
419             if (_idle.compareAndSet(true, false))
420             {
421                 _destination.getHttpClient().cancel(_idleTimeout);
422                 return true;
423             }
424         }
425 
426         return false;
427     }
428 
429     protected void exchangeExpired(HttpExchange exchange)
430     {
431         synchronized (this)
432         {
433             // We are expiring an exchange, but the exchange is pending
434             // Cannot reuse the connection because the reply may arrive, so close it
435             if (_exchange == exchange)
436             {
437                 try
438                 {
439                     _destination.returnConnection(this, true);
440                 }
441                 catch (IOException x)
442                 {
443                     LOG.ignore(x);
444                 }
445             }
446         }
447     }
448 
449     /* ------------------------------------------------------------ */
450     /**
451      * @see org.eclipse.jetty.util.component.Dumpable#dump()
452      */
453     public String dump()
454     {
455         return AggregateLifeCycle.dump(this);
456     }
457 
458     /* ------------------------------------------------------------ */
459     /**
460      * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String)
461      */
462     public void dump(Appendable out, String indent) throws IOException
463     {
464         synchronized (this)
465         {
466             out.append(String.valueOf(this)).append("\n");
467             AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
468         }
469     }
470 
471     /* ------------------------------------------------------------ */
472     private class ConnectionIdleTask extends Timeout.Task
473     {
474         /* ------------------------------------------------------------ */
475         @Override
476         public void expired()
477         {
478             // Connection idle, close it
479             if (_idle.compareAndSet(true, false))
480             {
481                 _destination.returnIdleConnection(AbstractHttpConnection.this);
482             }
483         }
484     }
485 
486 
487     /* ------------------------------------------------------------ */
488     private class NonFinalResponseListener implements HttpEventListener
489     {
490         final HttpExchange _exchange;
491         final HttpEventListener _next;
492 
493         /* ------------------------------------------------------------ */
494         public NonFinalResponseListener(HttpExchange exchange)
495         {
496             _exchange=exchange;
497             _next=exchange.getEventListener();
498         }
499 
500         /* ------------------------------------------------------------ */
501         public void onRequestCommitted() throws IOException
502         {
503         }
504 
505         /* ------------------------------------------------------------ */
506         public void onRequestComplete() throws IOException
507         {
508         }
509 
510         /* ------------------------------------------------------------ */
511         public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
512         {
513         }
514 
515         /* ------------------------------------------------------------ */
516         public void onResponseHeader(Buffer name, Buffer value) throws IOException
517         {
518             _next.onResponseHeader(name,value);
519         }
520 
521         /* ------------------------------------------------------------ */
522         public void onResponseHeaderComplete() throws IOException
523         {
524             _next.onResponseHeaderComplete();
525         }
526 
527         /* ------------------------------------------------------------ */
528         public void onResponseContent(Buffer content) throws IOException
529         {
530         }
531 
532         /* ------------------------------------------------------------ */
533         public void onResponseComplete() throws IOException
534         {
535             _exchange.setEventListener(_next);
536             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
537             _parser.reset();
538         }
539 
540         /* ------------------------------------------------------------ */
541         public void onConnectionFailed(Throwable ex)
542         {
543             _exchange.setEventListener(_next);
544             _next.onConnectionFailed(ex);
545         }
546 
547         /* ------------------------------------------------------------ */
548         public void onException(Throwable ex)
549         {
550             _exchange.setEventListener(_next);
551             _next.onException(ex);
552         }
553 
554         /* ------------------------------------------------------------ */
555         public void onExpire()
556         {
557             _exchange.setEventListener(_next);
558             _next.onExpire();
559         }
560 
561         /* ------------------------------------------------------------ */
562         public void onRetry()
563         {
564             _exchange.setEventListener(_next);
565             _next.onRetry();
566         }
567     }
568 }