View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.server.http;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.Set;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.TimeoutException;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.eclipse.jetty.http.HttpField;
29  import org.eclipse.jetty.http.HttpFields;
30  import org.eclipse.jetty.http.HttpGenerator;
31  import org.eclipse.jetty.http.HttpStatus;
32  import org.eclipse.jetty.http.HttpVersion;
33  import org.eclipse.jetty.io.EndPoint;
34  import org.eclipse.jetty.io.EofException;
35  import org.eclipse.jetty.server.Connector;
36  import org.eclipse.jetty.server.HttpConfiguration;
37  import org.eclipse.jetty.server.HttpTransport;
38  import org.eclipse.jetty.spdy.StreamException;
39  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
40  import org.eclipse.jetty.spdy.api.PushInfo;
41  import org.eclipse.jetty.spdy.api.ReplyInfo;
42  import org.eclipse.jetty.spdy.api.SPDY;
43  import org.eclipse.jetty.spdy.api.Stream;
44  import org.eclipse.jetty.spdy.api.StreamStatus;
45  import org.eclipse.jetty.util.BlockingCallback;
46  import org.eclipse.jetty.util.BufferUtil;
47  import org.eclipse.jetty.util.Callback;
48  import org.eclipse.jetty.util.Fields;
49  import org.eclipse.jetty.util.Promise;
50  import org.eclipse.jetty.util.log.Log;
51  import org.eclipse.jetty.util.log.Logger;
52  
53  public class HttpTransportOverSPDY implements HttpTransport
54  {
55      private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
56  
57      private final Connector connector;
58      private final HttpConfiguration configuration;
59      private final EndPoint endPoint;
60      private final PushStrategy pushStrategy;
61      private final Stream stream;
62      private final Fields requestHeaders;
63      private final BlockingCallback streamBlocker = new BlockingCallback();
64      private final AtomicBoolean committed = new AtomicBoolean();
65  
66      public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
67      {
68          this.connector = connector;
69          this.configuration = configuration;
70          this.endPoint = endPoint;
71          this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
72          this.stream = stream;
73          this.requestHeaders = requestHeaders;
74      }
75  
76      @Override
77      public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
78      {
79          if (LOG.isDebugEnabled())
80              LOG.debug("send  {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
81  
82          if (stream.isClosed() || stream.isReset())
83          {
84              EofException exception = new EofException("stream closed");
85              callback.failed(exception);
86              return;
87          }
88          // new Throwable().printStackTrace();
89  
90          // info==null content==null lastContent==false          should not happen
91          // info==null content==null lastContent==true           signals no more content - complete
92          // info==null content!=null lastContent==false          send data on committed response
93          // info==null content!=null lastContent==true           send last data on committed response - complete
94          // info!=null content==null lastContent==false          reply, commit
95          // info!=null content==null lastContent==true           reply, commit and complete
96          // info!=null content!=null lastContent==false          reply, commit with content
97          // info!=null content!=null lastContent==true           reply, commit with content and complete
98  
99          boolean hasContent = BufferUtil.hasContent(content);
100 
101         if (info != null)
102         {
103             if (!committed.compareAndSet(false, true))
104             {
105                 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
106                         "Stream already committed!");
107                 callback.failed(exception);
108                 LOG.warn("Committed response twice.", exception);
109                 return;
110             }
111             short version = stream.getSession().getVersion();
112             Fields headers = new Fields();
113 
114             HttpVersion httpVersion = HttpVersion.HTTP_1_1;
115             headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
116 
117             int status = info.getStatus();
118             StringBuilder httpStatus = new StringBuilder().append(status);
119             String reason = info.getReason();
120             if (reason == null)
121                 reason = HttpStatus.getMessage(status);
122             if (reason != null)
123                 httpStatus.append(" ").append(reason);
124             headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
125             LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
126 
127             // TODO merge the two Field classes into one
128             HttpFields fields = info.getHttpFields();
129             if (fields != null)
130             {
131                 for (int i = 0; i < fields.size(); ++i)
132                 {
133                     HttpField field = fields.getField(i);
134                     String name = field.getName();
135                     String value = field.getValue();
136                     headers.put(name, value);
137                     LOG.debug("HTTP < {}: {}", name, value);
138                 }
139             }
140 
141             boolean close = !hasContent && lastContent;
142             ReplyInfo reply = new ReplyInfo(headers, close);
143             reply(stream, reply);
144         }
145 
146         // Do we have some content to send as well
147         if (hasContent)
148         {
149             // Is the stream still open?
150             if (stream.isClosed() || stream.isReset())
151                 // tell the callback about the EOF 
152                 callback.failed(new EofException("stream closed"));
153             else
154                 // send the data and let it call the callback
155                 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
156                 ), callback);
157         }
158         // else do we need to close
159         else if (lastContent)
160         {
161             // Are we closed ?
162             if (stream.isClosed() || stream.isReset())
163                 // already closed by reply, so just tell callback we are complete
164                 callback.succeeded();
165             else
166                 // send empty data to close and let the send call the callback
167                 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
168                         BufferUtil.EMPTY_BUFFER, lastContent), callback);
169         }
170         else
171             // No data and no close so tell callback we are completed
172             callback.succeeded();
173 
174     }
175 
176     @Override
177     public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws EofException
178     {
179         send(info, content, lastContent, streamBlocker);
180         try
181         {
182             streamBlocker.block();
183         }
184         catch (InterruptedException | TimeoutException | IOException e)
185         {
186             LOG.debug(e);
187         }
188     }
189 
190 
191     @Override
192     public void completed()
193     {
194         LOG.debug("completed");
195     }
196 
197     private void reply(Stream stream, ReplyInfo replyInfo)
198     {
199         if (!stream.isUnidirectional())
200             stream.reply(replyInfo, new Callback.Adapter());
201 
202         Fields responseHeaders = replyInfo.getHeaders();
203         short version = stream.getSession().getVersion();
204         if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") && !stream.isClosed())
205         {
206             // We have a 200 OK with some content to send, check the push strategy
207             Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
208             Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
209             Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
210             Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
211 
212             for (String pushResource : pushResources)
213             {
214                 Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
215                 final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
216 
217                 // TODO: handle the timeout better
218                 stream.push(new PushInfo(0, TimeUnit.MILLISECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
219                 {
220                     @Override
221                     public void succeeded(Stream pushStream)
222                     {
223                         HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushStream, pushRequestHeaders);
224                         pushChannel.requestStart(pushRequestHeaders, true);
225                     }
226                 });
227             }
228         }
229     }
230 
231     private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
232     {
233         final Fields requestHeaders = new Fields();
234         short version = stream.getSession().getVersion();
235         requestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
236         requestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
237         requestHeaders.put(scheme);
238         requestHeaders.put(host);
239         requestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
240         String referrer = scheme.value() + "://" + host.value() + uri.value();
241         requestHeaders.put("referer", referrer);
242         // Remember support for gzip encoding
243         requestHeaders.put(requestHeaders.get("accept-encoding"));
244         requestHeaders.put("x-spdy-push", "true");
245         return requestHeaders;
246     }
247 
248     private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
249     {
250         final Fields pushHeaders = new Fields();
251         short version = stream.getSession().getVersion();
252         if (version == SPDY.V2)
253             pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
254         else
255         {
256             pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
257             pushHeaders.put(scheme);
258             pushHeaders.put(host);
259         }
260         pushHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200");
261         pushHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
262         return pushHeaders;
263     }
264 
265     private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
266     {
267         HttpTransport transport = new HttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy, pushStream, pushRequestHeaders);
268         HttpInputOverSPDY input = new HttpInputOverSPDY();
269         return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
270     }
271 }