View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.server.http;
20  
21  import java.nio.ByteBuffer;
22  import java.util.Queue;
23  import java.util.Set;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.eclipse.jetty.http.HttpField;
28  import org.eclipse.jetty.http.HttpFields;
29  import org.eclipse.jetty.http.HttpGenerator;
30  import org.eclipse.jetty.http.HttpHeader;
31  import org.eclipse.jetty.http.HttpMethod;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.io.EofException;
36  import org.eclipse.jetty.server.Connector;
37  import org.eclipse.jetty.server.HttpConfiguration;
38  import org.eclipse.jetty.server.HttpTransport;
39  import org.eclipse.jetty.spdy.StreamException;
40  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
41  import org.eclipse.jetty.spdy.api.HeadersInfo;
42  import org.eclipse.jetty.spdy.api.PushInfo;
43  import org.eclipse.jetty.spdy.api.ReplyInfo;
44  import org.eclipse.jetty.spdy.api.SPDY;
45  import org.eclipse.jetty.spdy.api.Session;
46  import org.eclipse.jetty.spdy.api.Stream;
47  import org.eclipse.jetty.spdy.api.StreamStatus;
48  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
49  import org.eclipse.jetty.util.BufferUtil;
50  import org.eclipse.jetty.util.Callback;
51  import org.eclipse.jetty.util.ConcurrentArrayQueue;
52  import org.eclipse.jetty.util.Fields;
53  import org.eclipse.jetty.util.Promise;
54  import org.eclipse.jetty.util.log.Log;
55  import org.eclipse.jetty.util.log.Logger;
56  
57  public class HttpTransportOverSPDY implements HttpTransport
58  {
59      private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
60  
61      private final Connector connector;
62      private final HttpConfiguration configuration;
63      private final EndPoint endPoint;
64      private final PushStrategy pushStrategy;
65      private final Stream stream;
66      private final short version;
67      private final Fields requestHeaders;
68      private final AtomicBoolean committed = new AtomicBoolean();
69  
70      public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
71      {
72          this.connector = connector;
73          this.configuration = configuration;
74          this.endPoint = endPoint;
75          this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
76          this.stream = stream;
77          this.requestHeaders = requestHeaders;
78          Session session = stream.getSession();
79          this.version = session.getVersion();
80      }
81  
82      protected Stream getStream()
83      {
84          return stream;
85      }
86  
87      protected Fields getRequestHeaders()
88      {
89          return requestHeaders;
90      }
91  
92  
93      @Override
94      public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
95      {
96          // TODO can this be more efficient?
97          send(null, responseBodyContent, lastContent, callback);
98      }
99  
100     @Override
101     public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, final Callback callback)
102     {
103         if (LOG.isDebugEnabled())
104             LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
105 
106         if (stream.isClosed() || stream.isReset())
107         {
108             EofException exception = new EofException("stream closed");
109             callback.failed(exception);
110             return;
111         }
112 
113         // info==null content==null lastContent==false          should not happen
114         // info==null content==null lastContent==true           signals no more content - complete
115         // info==null content!=null lastContent==false          send data on committed response
116         // info==null content!=null lastContent==true           send last data on committed response - complete
117         // info!=null content==null lastContent==false          reply, commit
118         // info!=null content==null lastContent==true           reply, commit and complete
119         // info!=null content!=null lastContent==false          reply, commit with content
120         // info!=null content!=null lastContent==true           reply, commit with content and complete
121 
122         boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).getValue());
123         boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
124         boolean close = !hasContent && lastContent;
125 
126         if (info != null)
127         {
128             if (!committed.compareAndSet(false, true))
129             {
130                 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
131                         "Stream already committed!");
132                 callback.failed(exception);
133                 if (LOG.isDebugEnabled())
134                     LOG.debug("Committed response twice.", exception);
135                 return;
136             }
137             sendReply(info, !hasContent ? callback : new Callback.Adapter()
138             {
139                 @Override
140                 public void failed(Throwable x)
141                 {
142                     callback.failed(x);
143                 }
144             }, close);
145         }
146 
147         // Do we have some content to send as well
148         if (hasContent)
149         {
150             // send the data and let it call the callback
151             if (LOG.isDebugEnabled())
152                 LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream,
153                     lastContent);
154             stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
155             ), callback);
156         }
157         // else do we need to close
158         else if (lastContent && info == null)
159         {
160             // send empty data to close and let the send call the callback
161             if (LOG.isDebugEnabled())
162                 LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream);
163             stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
164                     BufferUtil.EMPTY_BUFFER, lastContent), callback);
165         }
166         else if (!lastContent && !hasContent && info == null)
167             throw new IllegalStateException("not lastContent, no content and no responseInfo!");
168 
169     }
170 
171     private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close)
172     {
173         Fields headers = new Fields();
174 
175         HttpVersion httpVersion = HttpVersion.HTTP_1_1;
176         headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
177 
178         int status = info.getStatus();
179         StringBuilder httpStatus = new StringBuilder().append(status);
180         String reason = info.getReason();
181         if (reason == null)
182             reason = HttpStatus.getMessage(status);
183         if (reason != null)
184             httpStatus.append(" ").append(reason);
185         headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
186         if (LOG.isDebugEnabled())
187             LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
188 
189         // TODO merge the two Field classes into one
190         HttpFields fields = info.getHttpFields();
191         if (fields != null)
192         {
193             for (int i = 0; i < fields.size(); ++i)
194             {
195                 HttpField field = fields.getField(i);
196                 String name = field.getName();
197                 String value = field.getValue();
198                 headers.add(name, value);
199                 if (LOG.isDebugEnabled())
200                     LOG.debug("HTTP < {}: {}", name, value);
201             }
202         }
203 
204         if (configuration.getSendServerVersion())
205             headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION);
206         if (configuration.getSendXPoweredBy())
207             headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION);
208 
209         ReplyInfo reply = new ReplyInfo(headers, close);
210         if (LOG.isDebugEnabled())
211             LOG.debug("Sending reply: {} on stream: {}", reply, stream);
212         reply(stream, reply, callback);
213     }
214 
215     @Override
216     public void completed()
217     {
218         if (LOG.isDebugEnabled())
219             LOG.debug("Completed {}", this);
220     }
221 
222     private void reply(Stream stream, ReplyInfo replyInfo, Callback callback)
223     {
224         if (!stream.isUnidirectional())
225             stream.reply(replyInfo, callback);
226         else
227             stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback);
228 
229         Fields responseHeaders = replyInfo.getHeaders();
230         if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).getValue().startsWith("200") && !stream.isClosed())
231         {
232             Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
233             if (pushResources.size() > 0)
234             {
235                 PushResourceCoordinator pushResourceCoordinator = new PushResourceCoordinator(pushResources);
236                 pushResourceCoordinator.coordinate();
237             }
238         }
239     }
240 
241     private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
242     {
243         private final PushResourceCoordinator coordinator;
244         private final short version;
245 
246         private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
247                                           PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
248                                           PushResourceCoordinator coordinator, short version)
249         {
250             super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
251             this.coordinator = coordinator;
252             this.version = version;
253         }
254 
255         @Override
256         public void completed()
257         {
258             Stream stream = getStream();
259             if (LOG.isDebugEnabled())
260                 LOG.debug("Resource pushed for {} on {}",
261                     getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream);
262             coordinator.complete();
263         }
264     }
265 
266     private class PushResourceCoordinator
267     {
268         private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
269         private final Set<String> resources;
270         private AtomicBoolean active = new AtomicBoolean(false);
271 
272         private PushResourceCoordinator(Set<String> resources)
273         {
274             this.resources = resources;
275         }
276 
277         private void coordinate()
278         {
279             if (LOG.isDebugEnabled())
280                 LOG.debug("Pushing resources: {}", resources);
281             // Must send all push frames to the client at once before we
282             // return from this method and send the main resource data
283             for (String pushResource : resources)
284                 pushResource(pushResource);
285         }
286 
287         private void sendNextResourceData()
288         {
289             if (LOG.isDebugEnabled())
290                 LOG.debug("{} sendNextResourceData active: {}", hashCode(), active.get());
291             if (active.compareAndSet(false, true))
292             {
293                 PushResource resource = queue.poll();
294                 if (resource != null)
295                 {
296                     if (LOG.isDebugEnabled())
297                         LOG.debug("Opening new push channel for: {}", resource);
298                     HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
299                     pushChannel.requestStart(resource.getPushRequestHeaders(), true);
300                     return;
301                 }
302 
303                 if (active.compareAndSet(true, false))
304                 {
305                     if (queue.peek() != null)
306                         sendNextResourceData();
307                 }
308                 else
309                 {
310                     throw new IllegalStateException("active must not be false here! Concurrency bug!");
311                 }
312             }
313         }
314 
315         private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
316         {
317             HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy,
318                     pushStream, pushRequestHeaders, this, version);
319             HttpInputOverSPDY input = new HttpInputOverSPDY();
320             return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
321         }
322 
323         private void pushResource(String pushResource)
324         {
325             Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
326             Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
327             Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
328             final Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
329             final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
330 
331             stream.push(new PushInfo(pushHeaders, false), new Promise<Stream>()
332             {
333                 @Override
334                 public void succeeded(Stream pushStream)
335                 {
336                     if (LOG.isDebugEnabled())
337                         LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream);
338                     queue.offer(new PushResource(pushStream, pushRequestHeaders));
339                     sendNextResourceData();
340                 }
341 
342                 @Override
343                 public void failed(Throwable x)
344                 {
345                     LOG.debug("Creating push stream failed.", x);
346                     sendNextResourceData();
347                 }
348             });
349         }
350 
351         private void complete()
352         {
353             if (!active.compareAndSet(true, false))
354                 throw new IllegalStateException();
355             sendNextResourceData();
356         }
357 
358         private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
359         {
360             final Fields newRequestHeaders = new Fields(requestHeaders, false);
361             newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
362             newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
363             newRequestHeaders.put(scheme);
364             newRequestHeaders.put(host);
365             newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
366             String referrer = scheme.getValue() + "://" + host.getValue() + uri.getValue();
367             newRequestHeaders.put("referer", referrer);
368             newRequestHeaders.put("x-spdy-push", "true");
369             return newRequestHeaders;
370         }
371 
372         private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
373         {
374             final Fields pushHeaders = new Fields();
375             if (version == SPDY.V2)
376                 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.getValue() + "://" + host.getValue() + pushResourcePath);
377             else
378             {
379                 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
380                 pushHeaders.put(scheme);
381                 pushHeaders.put(host);
382             }
383             return pushHeaders;
384         }
385     }
386 
387     private static class PushResource
388     {
389         private final Stream pushStream;
390         private final Fields pushRequestHeaders;
391 
392         public PushResource(Stream pushStream, Fields pushRequestHeaders)
393         {
394             this.pushStream = pushStream;
395             this.pushRequestHeaders = pushRequestHeaders;
396         }
397 
398         public Stream getPushStream()
399         {
400             return pushStream;
401         }
402 
403         public Fields getPushRequestHeaders()
404         {
405             return pushRequestHeaders;
406         }
407 
408         @Override
409         public String toString()
410         {
411             return "PushResource{" +
412                     "pushStream=" + pushStream +
413                     ", pushRequestHeaders=" + pushRequestHeaders +
414                     '}';
415         }
416     }
417 
418     @Override
419     public void abort()
420     {
421         // TODO close the stream in a way to indicate an incomplete response?
422     }
423 }