View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.server.http;
20  
21  import java.nio.ByteBuffer;
22  import java.util.Queue;
23  import java.util.Set;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.eclipse.jetty.http.HttpField;
28  import org.eclipse.jetty.http.HttpFields;
29  import org.eclipse.jetty.http.HttpGenerator;
30  import org.eclipse.jetty.http.HttpHeader;
31  import org.eclipse.jetty.http.HttpMethod;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.io.EofException;
36  import org.eclipse.jetty.server.Connector;
37  import org.eclipse.jetty.server.HttpConfiguration;
38  import org.eclipse.jetty.server.HttpTransport;
39  import org.eclipse.jetty.spdy.StreamException;
40  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
41  import org.eclipse.jetty.spdy.api.HeadersInfo;
42  import org.eclipse.jetty.spdy.api.PushInfo;
43  import org.eclipse.jetty.spdy.api.ReplyInfo;
44  import org.eclipse.jetty.spdy.api.SPDY;
45  import org.eclipse.jetty.spdy.api.Session;
46  import org.eclipse.jetty.spdy.api.Stream;
47  import org.eclipse.jetty.spdy.api.StreamStatus;
48  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
49  import org.eclipse.jetty.util.BlockingCallback;
50  import org.eclipse.jetty.util.BufferUtil;
51  import org.eclipse.jetty.util.Callback;
52  import org.eclipse.jetty.util.ConcurrentArrayQueue;
53  import org.eclipse.jetty.util.Fields;
54  import org.eclipse.jetty.util.Promise;
55  import org.eclipse.jetty.util.log.Log;
56  import org.eclipse.jetty.util.log.Logger;
57  
58  public class HttpTransportOverSPDY implements HttpTransport
59  {
60      private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
61  
62      private final Connector connector;
63      private final HttpConfiguration configuration;
64      private final EndPoint endPoint;
65      private final PushStrategy pushStrategy;
66      private final Stream stream;
67      private final short version;
68      private final Fields requestHeaders;
69      private final BlockingCallback streamBlocker = new BlockingCallback();
70      private final AtomicBoolean committed = new AtomicBoolean();
71  
72      public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
73      {
74          this.connector = connector;
75          this.configuration = configuration;
76          this.endPoint = endPoint;
77          this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
78          this.stream = stream;
79          this.requestHeaders = requestHeaders;
80          Session session = stream.getSession();
81          this.version = session.getVersion();
82      }
83  
84      protected Stream getStream()
85      {
86          return stream;
87      }
88  
89      protected Fields getRequestHeaders()
90      {
91          return requestHeaders;
92      }
93  
94  
95      @Override
96      public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
97      {
98          // TODO can this be more efficient?
99          send(null, responseBodyContent, lastContent, callback);
100     }
101 
102     @Override
103     public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, final Callback callback)
104     {
105         if (LOG.isDebugEnabled())
106             LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
107 
108         if (stream.isClosed() || stream.isReset())
109         {
110             EofException exception = new EofException("stream closed");
111             callback.failed(exception);
112             return;
113         }
114 
115         // info==null content==null lastContent==false          should not happen
116         // info==null content==null lastContent==true           signals no more content - complete
117         // info==null content!=null lastContent==false          send data on committed response
118         // info==null content!=null lastContent==true           send last data on committed response - complete
119         // info!=null content==null lastContent==false          reply, commit
120         // info!=null content==null lastContent==true           reply, commit and complete
121         // info!=null content!=null lastContent==false          reply, commit with content
122         // info!=null content!=null lastContent==true           reply, commit with content and complete
123 
124         boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).getValue());
125         boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
126         boolean close = !hasContent && lastContent;
127 
128         if (info != null)
129         {
130             if (!committed.compareAndSet(false, true))
131             {
132                 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
133                         "Stream already committed!");
134                 callback.failed(exception);
135                 LOG.debug("Committed response twice.", exception);
136                 return;
137             }
138             sendReply(info, !hasContent ? callback : new Callback.Adapter()
139             {
140                 @Override
141                 public void failed(Throwable x)
142                 {
143                     callback.failed(x);
144                 }
145             }, close);
146         }
147 
148         // Do we have some content to send as well
149         if (hasContent)
150         {
151             // send the data and let it call the callback
152             LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream,
153                     lastContent);
154             stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
155             ), callback);
156         }
157         // else do we need to close
158         else if (lastContent && info == null)
159         {
160             // send empty data to close and let the send call the callback
161             LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream);
162             stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
163                     BufferUtil.EMPTY_BUFFER, lastContent), callback);
164         }
165         else if (!lastContent && !hasContent && info == null)
166             throw new IllegalStateException("not lastContent, no content and no responseInfo!");
167 
168     }
169 
170     private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close)
171     {
172         Fields headers = new Fields();
173 
174         HttpVersion httpVersion = HttpVersion.HTTP_1_1;
175         headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
176 
177         int status = info.getStatus();
178         StringBuilder httpStatus = new StringBuilder().append(status);
179         String reason = info.getReason();
180         if (reason == null)
181             reason = HttpStatus.getMessage(status);
182         if (reason != null)
183             httpStatus.append(" ").append(reason);
184         headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
185         LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
186 
187         // TODO merge the two Field classes into one
188         HttpFields fields = info.getHttpFields();
189         if (fields != null)
190         {
191             for (int i = 0; i < fields.size(); ++i)
192             {
193                 HttpField field = fields.getField(i);
194                 String name = field.getName();
195                 String value = field.getValue();
196                 headers.add(name, value);
197                 LOG.debug("HTTP < {}: {}", name, value);
198             }
199         }
200 
201         if (configuration.getSendServerVersion())
202             headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION);
203         if (configuration.getSendXPoweredBy())
204             headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION);
205 
206         ReplyInfo reply = new ReplyInfo(headers, close);
207         LOG.debug("Sending reply: {} on stream: {}", reply, stream);
208         reply(stream, reply, callback);
209     }
210 
211     @Override
212     public void completed()
213     {
214         LOG.debug("Completed {}", this);
215     }
216 
217     private void reply(Stream stream, ReplyInfo replyInfo, Callback callback)
218     {
219         if (!stream.isUnidirectional())
220             stream.reply(replyInfo, callback);
221         else
222             stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback);
223 
224         Fields responseHeaders = replyInfo.getHeaders();
225         if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).getValue().startsWith("200") && !stream.isClosed())
226         {
227             Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
228             if (pushResources.size() > 0)
229             {
230                 PushResourceCoordinator pushResourceCoordinator = new PushResourceCoordinator(pushResources);
231                 pushResourceCoordinator.coordinate();
232             }
233         }
234     }
235 
236     private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
237     {
238         private final PushResourceCoordinator coordinator;
239         private final short version;
240 
241         private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
242                                           PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
243                                           PushResourceCoordinator coordinator, short version)
244         {
245             super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
246             this.coordinator = coordinator;
247             this.version = version;
248         }
249 
250         @Override
251         public void completed()
252         {
253             Stream stream = getStream();
254             LOG.debug("Resource pushed for {} on {}",
255                     getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream);
256             coordinator.complete();
257         }
258     }
259 
260     private class PushResourceCoordinator
261     {
262         private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
263         private final Set<String> resources;
264         private AtomicBoolean active = new AtomicBoolean(false);
265 
266         private PushResourceCoordinator(Set<String> resources)
267         {
268             this.resources = resources;
269         }
270 
271         private void coordinate()
272         {
273             LOG.debug("Pushing resources: {}", resources);
274             // Must send all push frames to the client at once before we
275             // return from this method and send the main resource data
276             for (String pushResource : resources)
277                 pushResource(pushResource);
278         }
279 
280         private void sendNextResourceData()
281         {
282             LOG.debug("{} sendNextResourceData active: {}", hashCode(), active.get());
283             if (active.compareAndSet(false, true))
284             {
285                 PushResource resource = queue.poll();
286                 if (resource != null)
287                 {
288                     LOG.debug("Opening new push channel for: {}", resource);
289                     HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
290                     pushChannel.requestStart(resource.getPushRequestHeaders(), true);
291                     return;
292                 }
293 
294                 if (active.compareAndSet(true, false))
295                 {
296                     if (queue.peek() != null)
297                         sendNextResourceData();
298                 }
299                 else
300                 {
301                     throw new IllegalStateException("active must not be false here! Concurrency bug!");
302                 }
303             }
304         }
305 
306         private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
307         {
308             HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy,
309                     pushStream, pushRequestHeaders, this, version);
310             HttpInputOverSPDY input = new HttpInputOverSPDY();
311             return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
312         }
313 
314         private void pushResource(String pushResource)
315         {
316             Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
317             Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
318             Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
319             final Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
320             final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
321 
322             stream.push(new PushInfo(pushHeaders, false), new Promise<Stream>()
323             {
324                 @Override
325                 public void succeeded(Stream pushStream)
326                 {
327                     LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream);
328                     queue.offer(new PushResource(pushStream, pushRequestHeaders));
329                     sendNextResourceData();
330                 }
331 
332                 @Override
333                 public void failed(Throwable x)
334                 {
335                     LOG.debug("Creating push stream failed.", x);
336                     sendNextResourceData();
337                 }
338             });
339         }
340 
341         private void complete()
342         {
343             if (!active.compareAndSet(true, false))
344                 throw new IllegalStateException();
345             sendNextResourceData();
346         }
347 
348         private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
349         {
350             final Fields newRequestHeaders = new Fields(requestHeaders, false);
351             newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
352             newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
353             newRequestHeaders.put(scheme);
354             newRequestHeaders.put(host);
355             newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
356             String referrer = scheme.getValue() + "://" + host.getValue() + uri.getValue();
357             newRequestHeaders.put("referer", referrer);
358             newRequestHeaders.put("x-spdy-push", "true");
359             return newRequestHeaders;
360         }
361 
362         private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
363         {
364             final Fields pushHeaders = new Fields();
365             if (version == SPDY.V2)
366                 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.getValue() + "://" + host.getValue() + pushResourcePath);
367             else
368             {
369                 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
370                 pushHeaders.put(scheme);
371                 pushHeaders.put(host);
372             }
373             return pushHeaders;
374         }
375     }
376 
377     private static class PushResource
378     {
379         private final Stream pushStream;
380         private final Fields pushRequestHeaders;
381 
382         public PushResource(Stream pushStream, Fields pushRequestHeaders)
383         {
384             this.pushStream = pushStream;
385             this.pushRequestHeaders = pushRequestHeaders;
386         }
387 
388         public Stream getPushStream()
389         {
390             return pushStream;
391         }
392 
393         public Fields getPushRequestHeaders()
394         {
395             return pushRequestHeaders;
396         }
397 
398         @Override
399         public String toString()
400         {
401             return "PushResource{" +
402                     "pushStream=" + pushStream +
403                     ", pushRequestHeaders=" + pushRequestHeaders +
404                     '}';
405         }
406     }
407 }