1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.server.http;
20
21 import java.nio.ByteBuffer;
22 import java.util.Queue;
23 import java.util.Set;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.eclipse.jetty.http.HttpField;
28 import org.eclipse.jetty.http.HttpFields;
29 import org.eclipse.jetty.http.HttpGenerator;
30 import org.eclipse.jetty.http.HttpHeader;
31 import org.eclipse.jetty.http.HttpMethod;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.io.EofException;
36 import org.eclipse.jetty.server.Connector;
37 import org.eclipse.jetty.server.HttpConfiguration;
38 import org.eclipse.jetty.server.HttpTransport;
39 import org.eclipse.jetty.spdy.StreamException;
40 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
41 import org.eclipse.jetty.spdy.api.HeadersInfo;
42 import org.eclipse.jetty.spdy.api.PushInfo;
43 import org.eclipse.jetty.spdy.api.ReplyInfo;
44 import org.eclipse.jetty.spdy.api.SPDY;
45 import org.eclipse.jetty.spdy.api.Session;
46 import org.eclipse.jetty.spdy.api.Stream;
47 import org.eclipse.jetty.spdy.api.StreamStatus;
48 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
49 import org.eclipse.jetty.util.BufferUtil;
50 import org.eclipse.jetty.util.Callback;
51 import org.eclipse.jetty.util.ConcurrentArrayQueue;
52 import org.eclipse.jetty.util.Fields;
53 import org.eclipse.jetty.util.Promise;
54 import org.eclipse.jetty.util.log.Log;
55 import org.eclipse.jetty.util.log.Logger;
56
57 public class HttpTransportOverSPDY implements HttpTransport
58 {
59 private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
60
61 private final Connector connector;
62 private final HttpConfiguration configuration;
63 private final EndPoint endPoint;
64 private final PushStrategy pushStrategy;
65 private final Stream stream;
66 private final short version;
67 private final Fields requestHeaders;
68 private final AtomicBoolean committed = new AtomicBoolean();
69
70 public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
71 {
72 this.connector = connector;
73 this.configuration = configuration;
74 this.endPoint = endPoint;
75 this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
76 this.stream = stream;
77 this.requestHeaders = requestHeaders;
78 Session session = stream.getSession();
79 this.version = session.getVersion();
80 }
81
82 protected Stream getStream()
83 {
84 return stream;
85 }
86
87 protected Fields getRequestHeaders()
88 {
89 return requestHeaders;
90 }
91
92
93 @Override
94 public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
95 {
96
97 send(null, responseBodyContent, lastContent, callback);
98 }
99
100 @Override
101 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, final Callback callback)
102 {
103 if (LOG.isDebugEnabled())
104 LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
105
106 if (stream.isClosed() || stream.isReset())
107 {
108 EofException exception = new EofException("stream closed");
109 callback.failed(exception);
110 return;
111 }
112
113
114
115
116
117
118
119
120
121
122 boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).getValue());
123 boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
124 boolean close = !hasContent && lastContent;
125
126 if (info != null)
127 {
128 if (!committed.compareAndSet(false, true))
129 {
130 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
131 "Stream already committed!");
132 callback.failed(exception);
133 if (LOG.isDebugEnabled())
134 LOG.debug("Committed response twice.", exception);
135 return;
136 }
137 sendReply(info, !hasContent ? callback : new Callback.Adapter()
138 {
139 @Override
140 public void failed(Throwable x)
141 {
142 callback.failed(x);
143 }
144 }, close);
145 }
146
147
148 if (hasContent)
149 {
150
151 if (LOG.isDebugEnabled())
152 LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream,
153 lastContent);
154 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
155 ), callback);
156 }
157
158 else if (lastContent && info == null)
159 {
160
161 if (LOG.isDebugEnabled())
162 LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream);
163 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
164 BufferUtil.EMPTY_BUFFER, lastContent), callback);
165 }
166 else if (!lastContent && !hasContent && info == null)
167 throw new IllegalStateException("not lastContent, no content and no responseInfo!");
168
169 }
170
171 private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close)
172 {
173 Fields headers = new Fields();
174
175 HttpVersion httpVersion = HttpVersion.HTTP_1_1;
176 headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
177
178 int status = info.getStatus();
179 StringBuilder httpStatus = new StringBuilder().append(status);
180 String reason = info.getReason();
181 if (reason == null)
182 reason = HttpStatus.getMessage(status);
183 if (reason != null)
184 httpStatus.append(" ").append(reason);
185 headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
186 if (LOG.isDebugEnabled())
187 LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
188
189
190 HttpFields fields = info.getHttpFields();
191 if (fields != null)
192 {
193 for (int i = 0; i < fields.size(); ++i)
194 {
195 HttpField field = fields.getField(i);
196 String name = field.getName();
197 String value = field.getValue();
198 headers.add(name, value);
199 if (LOG.isDebugEnabled())
200 LOG.debug("HTTP < {}: {}", name, value);
201 }
202 }
203
204 if (configuration.getSendServerVersion())
205 headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION);
206 if (configuration.getSendXPoweredBy())
207 headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION);
208
209 ReplyInfo reply = new ReplyInfo(headers, close);
210 if (LOG.isDebugEnabled())
211 LOG.debug("Sending reply: {} on stream: {}", reply, stream);
212 reply(stream, reply, callback);
213 }
214
215 @Override
216 public void completed()
217 {
218 if (LOG.isDebugEnabled())
219 LOG.debug("Completed {}", this);
220 }
221
222 private void reply(Stream stream, ReplyInfo replyInfo, Callback callback)
223 {
224 if (!stream.isUnidirectional())
225 stream.reply(replyInfo, callback);
226 else
227 stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback);
228
229 Fields responseHeaders = replyInfo.getHeaders();
230 if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).getValue().startsWith("200") && !stream.isClosed())
231 {
232 Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
233 if (pushResources.size() > 0)
234 {
235 PushResourceCoordinator pushResourceCoordinator = new PushResourceCoordinator(pushResources);
236 pushResourceCoordinator.coordinate();
237 }
238 }
239 }
240
241 private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
242 {
243 private final PushResourceCoordinator coordinator;
244 private final short version;
245
246 private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
247 PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
248 PushResourceCoordinator coordinator, short version)
249 {
250 super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
251 this.coordinator = coordinator;
252 this.version = version;
253 }
254
255 @Override
256 public void completed()
257 {
258 Stream stream = getStream();
259 if (LOG.isDebugEnabled())
260 LOG.debug("Resource pushed for {} on {}",
261 getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream);
262 coordinator.complete();
263 }
264 }
265
266 private class PushResourceCoordinator
267 {
268 private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
269 private final Set<String> resources;
270 private AtomicBoolean active = new AtomicBoolean(false);
271
272 private PushResourceCoordinator(Set<String> resources)
273 {
274 this.resources = resources;
275 }
276
277 private void coordinate()
278 {
279 if (LOG.isDebugEnabled())
280 LOG.debug("Pushing resources: {}", resources);
281
282
283 for (String pushResource : resources)
284 pushResource(pushResource);
285 }
286
287 private void sendNextResourceData()
288 {
289 if (LOG.isDebugEnabled())
290 LOG.debug("{} sendNextResourceData active: {}", hashCode(), active.get());
291 if (active.compareAndSet(false, true))
292 {
293 PushResource resource = queue.poll();
294 if (resource != null)
295 {
296 if (LOG.isDebugEnabled())
297 LOG.debug("Opening new push channel for: {}", resource);
298 HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
299 pushChannel.requestStart(resource.getPushRequestHeaders(), true);
300 return;
301 }
302
303 if (active.compareAndSet(true, false))
304 {
305 if (queue.peek() != null)
306 sendNextResourceData();
307 }
308 else
309 {
310 throw new IllegalStateException("active must not be false here! Concurrency bug!");
311 }
312 }
313 }
314
315 private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
316 {
317 HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy,
318 pushStream, pushRequestHeaders, this, version);
319 HttpInputOverSPDY input = new HttpInputOverSPDY();
320 return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
321 }
322
323 private void pushResource(String pushResource)
324 {
325 Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
326 Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
327 Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
328 final Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
329 final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
330
331 stream.push(new PushInfo(pushHeaders, false), new Promise<Stream>()
332 {
333 @Override
334 public void succeeded(Stream pushStream)
335 {
336 if (LOG.isDebugEnabled())
337 LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream);
338 queue.offer(new PushResource(pushStream, pushRequestHeaders));
339 sendNextResourceData();
340 }
341
342 @Override
343 public void failed(Throwable x)
344 {
345 LOG.debug("Creating push stream failed.", x);
346 sendNextResourceData();
347 }
348 });
349 }
350
351 private void complete()
352 {
353 if (!active.compareAndSet(true, false))
354 throw new IllegalStateException();
355 sendNextResourceData();
356 }
357
358 private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
359 {
360 final Fields newRequestHeaders = new Fields(requestHeaders, false);
361 newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
362 newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
363 newRequestHeaders.put(scheme);
364 newRequestHeaders.put(host);
365 newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
366 String referrer = scheme.getValue() + "://" + host.getValue() + uri.getValue();
367 newRequestHeaders.put("referer", referrer);
368 newRequestHeaders.put("x-spdy-push", "true");
369 return newRequestHeaders;
370 }
371
372 private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
373 {
374 final Fields pushHeaders = new Fields();
375 if (version == SPDY.V2)
376 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.getValue() + "://" + host.getValue() + pushResourcePath);
377 else
378 {
379 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
380 pushHeaders.put(scheme);
381 pushHeaders.put(host);
382 }
383 return pushHeaders;
384 }
385 }
386
387 private static class PushResource
388 {
389 private final Stream pushStream;
390 private final Fields pushRequestHeaders;
391
392 public PushResource(Stream pushStream, Fields pushRequestHeaders)
393 {
394 this.pushStream = pushStream;
395 this.pushRequestHeaders = pushRequestHeaders;
396 }
397
398 public Stream getPushStream()
399 {
400 return pushStream;
401 }
402
403 public Fields getPushRequestHeaders()
404 {
405 return pushRequestHeaders;
406 }
407
408 @Override
409 public String toString()
410 {
411 return "PushResource{" +
412 "pushStream=" + pushStream +
413 ", pushRequestHeaders=" + pushRequestHeaders +
414 '}';
415 }
416 }
417
418 @Override
419 public void abort()
420 {
421
422 }
423 }