View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.server.http;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.Queue;
24  import java.util.Set;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.eclipse.jetty.http.HttpField;
29  import org.eclipse.jetty.http.HttpFields;
30  import org.eclipse.jetty.http.HttpGenerator;
31  import org.eclipse.jetty.http.HttpHeader;
32  import org.eclipse.jetty.http.HttpMethod;
33  import org.eclipse.jetty.http.HttpStatus;
34  import org.eclipse.jetty.http.HttpVersion;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.io.EofException;
37  import org.eclipse.jetty.server.Connector;
38  import org.eclipse.jetty.server.HttpConfiguration;
39  import org.eclipse.jetty.server.HttpTransport;
40  import org.eclipse.jetty.spdy.StreamException;
41  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
42  import org.eclipse.jetty.spdy.api.HeadersInfo;
43  import org.eclipse.jetty.spdy.api.PushInfo;
44  import org.eclipse.jetty.spdy.api.ReplyInfo;
45  import org.eclipse.jetty.spdy.api.SPDY;
46  import org.eclipse.jetty.spdy.api.Session;
47  import org.eclipse.jetty.spdy.api.Stream;
48  import org.eclipse.jetty.spdy.api.StreamStatus;
49  import org.eclipse.jetty.util.BlockingCallback;
50  import org.eclipse.jetty.util.BufferUtil;
51  import org.eclipse.jetty.util.Callback;
52  import org.eclipse.jetty.util.ConcurrentArrayQueue;
53  import org.eclipse.jetty.util.Fields;
54  import org.eclipse.jetty.util.Promise;
55  import org.eclipse.jetty.util.log.Log;
56  import org.eclipse.jetty.util.log.Logger;
57  
58  public class HttpTransportOverSPDY implements HttpTransport
59  {
60      private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
61  
62      private final Connector connector;
63      private final HttpConfiguration configuration;
64      private final EndPoint endPoint;
65      private final PushStrategy pushStrategy;
66      private final Stream stream;
67      private final short version;
68      private final Fields requestHeaders;
69      private final BlockingCallback streamBlocker = new BlockingCallback();
70      private final AtomicBoolean committed = new AtomicBoolean();
71  
72      public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
73      {
74          this.connector = connector;
75          this.configuration = configuration;
76          this.endPoint = endPoint;
77          this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
78          this.stream = stream;
79          this.requestHeaders = requestHeaders;
80          Session session = stream.getSession();
81          this.version = session.getVersion();
82      }
83  
84      protected Stream getStream()
85      {
86          return stream;
87      }
88  
89      protected Fields getRequestHeaders()
90      {
91          return requestHeaders;
92      }
93  
94  
95      @Override
96      public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
97      {
98          // TODO can this be more efficient?
99          send(null, responseBodyContent, lastContent, callback);
100     }
101 
102     @Override
103     public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, final Callback callback)
104     {
105         if (LOG.isDebugEnabled())
106             LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
107 
108         if (stream.isClosed() || stream.isReset())
109         {
110             EofException exception = new EofException("stream closed");
111             callback.failed(exception);
112             return;
113         }
114 
115         // info==null content==null lastContent==false          should not happen
116         // info==null content==null lastContent==true           signals no more content - complete
117         // info==null content!=null lastContent==false          send data on committed response
118         // info==null content!=null lastContent==true           send last data on committed response - complete
119         // info!=null content==null lastContent==false          reply, commit
120         // info!=null content==null lastContent==true           reply, commit and complete
121         // info!=null content!=null lastContent==false          reply, commit with content
122         // info!=null content!=null lastContent==true           reply, commit with content and complete
123 
124         boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).value());
125         boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
126         boolean close = !hasContent && lastContent;
127 
128         if (info != null)
129         {
130             if (!committed.compareAndSet(false, true))
131             {
132                 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
133                         "Stream already committed!");
134                 callback.failed(exception);
135                 LOG.warn("Committed response twice.", exception);
136                 return;
137             }
138             sendReply(info, !hasContent ? callback : new Callback.Adapter()
139             {
140                 @Override
141                 public void failed(Throwable x)
142                 {
143                     callback.failed(x);
144                 }
145             }, close);
146         }
147 
148         // Do we have some content to send as well
149         if (hasContent)
150         {
151             // send the data and let it call the callback
152             LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream,
153                     lastContent);
154             stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
155             ), callback);
156         }
157         // else do we need to close
158         else if (lastContent && info == null)
159         {
160             // send empty data to close and let the send call the callback
161             LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream);
162             stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
163                     BufferUtil.EMPTY_BUFFER, lastContent), callback);
164         }
165         else if (!lastContent && !hasContent && info == null)
166             throw new IllegalStateException("not lastContent, no content and no responseInfo!");
167 
168     }
169 
170     private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close)
171     {
172         Fields headers = new Fields();
173 
174         HttpVersion httpVersion = HttpVersion.HTTP_1_1;
175         headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
176 
177         int status = info.getStatus();
178         StringBuilder httpStatus = new StringBuilder().append(status);
179         String reason = info.getReason();
180         if (reason == null)
181             reason = HttpStatus.getMessage(status);
182         if (reason != null)
183             httpStatus.append(" ").append(reason);
184         headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
185         LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
186 
187         // TODO merge the two Field classes into one
188         HttpFields fields = info.getHttpFields();
189         if (fields != null)
190         {
191             for (int i = 0; i < fields.size(); ++i)
192             {
193                 HttpField field = fields.getField(i);
194                 String name = field.getName();
195                 String value = field.getValue();
196                 headers.add(name, value);
197                 LOG.debug("HTTP < {}: {}", name, value);
198             }
199         }
200 
201         if (configuration.getSendServerVersion())
202             headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION);
203         if (configuration.getSendXPoweredBy())
204             headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION);
205 
206         ReplyInfo reply = new ReplyInfo(headers, close);
207         LOG.debug("Sending reply: {} on stream: {}", reply, stream);
208         reply(stream, reply, callback);
209     }
210 
211     @Override
212     public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
213     {
214         send(info, content, lastContent, streamBlocker);
215         try
216         {
217             streamBlocker.block();
218         }
219         catch (Exception e)
220         {
221             LOG.debug(e);
222         }
223     }
224 
225     @Override
226     public void completed()
227     {
228         LOG.debug("Completed {}", this);
229     }
230 
231     private void reply(Stream stream, ReplyInfo replyInfo, Callback callback)
232     {
233         if (!stream.isUnidirectional())
234             stream.reply(replyInfo, callback);
235         else
236             stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback);
237 
238         Fields responseHeaders = replyInfo.getHeaders();
239         if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") && !stream.isClosed())
240         {
241             Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
242             if (pushResources.size() > 0)
243             {
244                 PushResourceCoordinator pushResourceCoordinator = new PushResourceCoordinator(pushResources);
245                 pushResourceCoordinator.coordinate();
246             }
247         }
248     }
249 
250     private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
251     {
252         private final PushResourceCoordinator coordinator;
253         private final short version;
254 
255         private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
256                                           PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
257                                           PushResourceCoordinator coordinator, short version)
258         {
259             super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
260             this.coordinator = coordinator;
261             this.version = version;
262         }
263 
264         @Override
265         public void completed()
266         {
267             Stream stream = getStream();
268             LOG.debug("Resource pushed for {} on {}",
269                     getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream);
270             coordinator.complete();
271         }
272     }
273 
274     private class PushResourceCoordinator
275     {
276         private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
277         private final Set<String> resources;
278         private AtomicBoolean active = new AtomicBoolean(false);
279 
280         private PushResourceCoordinator(Set<String> resources)
281         {
282             this.resources = resources;
283         }
284 
285         private void coordinate()
286         {
287             LOG.debug("Pushing resources: {}", resources);
288             // Must send all push frames to the client at once before we
289             // return from this method and send the main resource data
290             for (String pushResource : resources)
291                 pushResource(pushResource);
292         }
293 
294         private void sendNextResourceData()
295         {
296             LOG.debug("{} sendNextResourceData active: {}", hashCode(), active.get());
297             if (active.compareAndSet(false, true))
298             {
299                 PushResource resource = queue.poll();
300                 if (resource != null)
301                 {
302                     LOG.debug("Opening new push channel for: {}", resource);
303                     HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
304                     pushChannel.requestStart(resource.getPushRequestHeaders(), true);
305                     return;
306                 }
307 
308                 if (active.compareAndSet(true, false))
309                 {
310                     if (queue.peek() != null)
311                         sendNextResourceData();
312                 }
313                 else
314                 {
315                     throw new IllegalStateException("active must not be false here! Concurrency bug!");
316                 }
317             }
318         }
319 
320         private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
321         {
322             HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy,
323                     pushStream, pushRequestHeaders, this, version);
324             HttpInputOverSPDY input = new HttpInputOverSPDY();
325             return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
326         }
327 
328         private void pushResource(String pushResource)
329         {
330             Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
331             Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
332             Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
333             final Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
334             final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
335 
336             stream.push(new PushInfo(pushHeaders, false), new Promise<Stream>()
337             {
338                 @Override
339                 public void succeeded(Stream pushStream)
340                 {
341                     LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream);
342                     queue.offer(new PushResource(pushStream, pushRequestHeaders));
343                     sendNextResourceData();
344                 }
345 
346                 @Override
347                 public void failed(Throwable x)
348                 {
349                     LOG.debug("Creating push stream failed.", x);
350                     sendNextResourceData();
351                 }
352             });
353         }
354 
355         private void complete()
356         {
357             if (!active.compareAndSet(true, false))
358                 throw new IllegalStateException();
359             sendNextResourceData();
360         }
361 
362         private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
363         {
364             final Fields newRequestHeaders = new Fields(requestHeaders, false);
365             newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
366             newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
367             newRequestHeaders.put(scheme);
368             newRequestHeaders.put(host);
369             newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
370             String referrer = scheme.value() + "://" + host.value() + uri.value();
371             newRequestHeaders.put("referer", referrer);
372             newRequestHeaders.put("x-spdy-push", "true");
373             return newRequestHeaders;
374         }
375 
376         private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
377         {
378             final Fields pushHeaders = new Fields();
379             if (version == SPDY.V2)
380                 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
381             else
382             {
383                 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
384                 pushHeaders.put(scheme);
385                 pushHeaders.put(host);
386             }
387             return pushHeaders;
388         }
389     }
390 
391     private static class PushResource
392     {
393         private final Stream pushStream;
394         private final Fields pushRequestHeaders;
395 
396         public PushResource(Stream pushStream, Fields pushRequestHeaders)
397         {
398             this.pushStream = pushStream;
399             this.pushRequestHeaders = pushRequestHeaders;
400         }
401 
402         public Stream getPushStream()
403         {
404             return pushStream;
405         }
406 
407         public Fields getPushRequestHeaders()
408         {
409             return pushRequestHeaders;
410         }
411 
412         @Override
413         public String toString()
414         {
415             return "PushResource{" +
416                     "pushStream=" + pushStream +
417                     ", pushRequestHeaders=" + pushRequestHeaders +
418                     '}';
419         }
420     }
421 }