1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.server.http;
20
21 import java.nio.ByteBuffer;
22 import java.util.Queue;
23 import java.util.Set;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.eclipse.jetty.http.HttpField;
28 import org.eclipse.jetty.http.HttpFields;
29 import org.eclipse.jetty.http.HttpGenerator;
30 import org.eclipse.jetty.http.HttpHeader;
31 import org.eclipse.jetty.http.HttpMethod;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.io.EofException;
36 import org.eclipse.jetty.server.Connector;
37 import org.eclipse.jetty.server.HttpConfiguration;
38 import org.eclipse.jetty.server.HttpTransport;
39 import org.eclipse.jetty.spdy.StreamException;
40 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
41 import org.eclipse.jetty.spdy.api.HeadersInfo;
42 import org.eclipse.jetty.spdy.api.PushInfo;
43 import org.eclipse.jetty.spdy.api.ReplyInfo;
44 import org.eclipse.jetty.spdy.api.SPDY;
45 import org.eclipse.jetty.spdy.api.Session;
46 import org.eclipse.jetty.spdy.api.Stream;
47 import org.eclipse.jetty.spdy.api.StreamStatus;
48 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
49 import org.eclipse.jetty.util.BlockingCallback;
50 import org.eclipse.jetty.util.BufferUtil;
51 import org.eclipse.jetty.util.Callback;
52 import org.eclipse.jetty.util.ConcurrentArrayQueue;
53 import org.eclipse.jetty.util.Fields;
54 import org.eclipse.jetty.util.Promise;
55 import org.eclipse.jetty.util.log.Log;
56 import org.eclipse.jetty.util.log.Logger;
57
58 public class HttpTransportOverSPDY implements HttpTransport
59 {
// Logger shared by this transport and its nested push helpers.
60 private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
61
// Handed on unchanged to the channels and transports created for pushed resources.
62 private final Connector connector;
// Consulted for the "send server version" / "send X-Powered-By" switches when the reply headers are built.
63 private final HttpConfiguration configuration;
// Its idle timeout is used as the timeout of every data frame written by this transport.
64 private final EndPoint endPoint;
// Never null: the constructor substitutes PushStrategy.None when the caller passes null.
65 private final PushStrategy pushStrategy;
// The SPDY stream the response (reply/headers and data frames) is written to.
66 private final Stream stream;
// SPDY version obtained from the stream's session; selects the version specific header names.
67 private final short version;
// Headers of the request being answered; read for the method and, when pushing, scheme/host/uri.
68 private final Fields requestHeaders;
// NOTE(review): not referenced by any code visible in this class - confirm whether it is still needed.
69 private final BlockingCallback streamBlocker = new BlockingCallback();
// Flipped to true the first time response headers are sent; a second attempt is rejected.
70 private final AtomicBoolean committed = new AtomicBoolean();
71
72 public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
73 {
74 this.connector = connector;
75 this.configuration = configuration;
76 this.endPoint = endPoint;
77 this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
78 this.stream = stream;
79 this.requestHeaders = requestHeaders;
80 Session session = stream.getSession();
81 this.version = session.getVersion();
82 }
83
84 protected Stream getStream()
85 {
86 return stream;
87 }
88
89 protected Fields getRequestHeaders()
90 {
91 return requestHeaders;
92 }
93
94
95 @Override
96 public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
97 {
98
99 send(null, responseBodyContent, lastContent, callback);
100 }
101
102 @Override
103 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, final Callback callback)
104 {
105 if (LOG.isDebugEnabled())
106 LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
107
108 if (stream.isClosed() || stream.isReset())
109 {
110 EofException exception = new EofException("stream closed");
111 callback.failed(exception);
112 return;
113 }
114
115
116
117
118
119
120
121
122
123
124 boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).getValue());
125 boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
126 boolean close = !hasContent && lastContent;
127
128 if (info != null)
129 {
130 if (!committed.compareAndSet(false, true))
131 {
132 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
133 "Stream already committed!");
134 callback.failed(exception);
135 LOG.debug("Committed response twice.", exception);
136 return;
137 }
138 sendReply(info, !hasContent ? callback : new Callback.Adapter()
139 {
140 @Override
141 public void failed(Throwable x)
142 {
143 callback.failed(x);
144 }
145 }, close);
146 }
147
148
149 if (hasContent)
150 {
151
152 LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream,
153 lastContent);
154 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
155 ), callback);
156 }
157
158 else if (lastContent && info == null)
159 {
160
161 LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream);
162 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
163 BufferUtil.EMPTY_BUFFER, lastContent), callback);
164 }
165 else if (!lastContent && !hasContent && info == null)
166 throw new IllegalStateException("not lastContent, no content and no responseInfo!");
167
168 }
169
170 private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close)
171 {
172 Fields headers = new Fields();
173
174 HttpVersion httpVersion = HttpVersion.HTTP_1_1;
175 headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
176
177 int status = info.getStatus();
178 StringBuilder httpStatus = new StringBuilder().append(status);
179 String reason = info.getReason();
180 if (reason == null)
181 reason = HttpStatus.getMessage(status);
182 if (reason != null)
183 httpStatus.append(" ").append(reason);
184 headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
185 LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
186
187
188 HttpFields fields = info.getHttpFields();
189 if (fields != null)
190 {
191 for (int i = 0; i < fields.size(); ++i)
192 {
193 HttpField field = fields.getField(i);
194 String name = field.getName();
195 String value = field.getValue();
196 headers.add(name, value);
197 LOG.debug("HTTP < {}: {}", name, value);
198 }
199 }
200
201 if (configuration.getSendServerVersion())
202 headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION);
203 if (configuration.getSendXPoweredBy())
204 headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION);
205
206 ReplyInfo reply = new ReplyInfo(headers, close);
207 LOG.debug("Sending reply: {} on stream: {}", reply, stream);
208 reply(stream, reply, callback);
209 }
210
211 @Override
212 public void completed()
213 {
214 LOG.debug("Completed {}", this);
215 }
216
217 private void reply(Stream stream, ReplyInfo replyInfo, Callback callback)
218 {
219 if (!stream.isUnidirectional())
220 stream.reply(replyInfo, callback);
221 else
222 stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback);
223
224 Fields responseHeaders = replyInfo.getHeaders();
225 if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).getValue().startsWith("200") && !stream.isClosed())
226 {
227 Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
228 if (pushResources.size() > 0)
229 {
230 PushResourceCoordinator pushResourceCoordinator = new PushResourceCoordinator(pushResources);
231 pushResourceCoordinator.coordinate();
232 }
233 }
234 }
235
236 private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
237 {
238 private final PushResourceCoordinator coordinator;
239 private final short version;
240
241 private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
242 PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
243 PushResourceCoordinator coordinator, short version)
244 {
245 super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
246 this.coordinator = coordinator;
247 this.version = version;
248 }
249
250 @Override
251 public void completed()
252 {
253 Stream stream = getStream();
254 LOG.debug("Resource pushed for {} on {}",
255 getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream);
256 coordinator.complete();
257 }
258 }
259
260 private class PushResourceCoordinator
261 {
262 private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
263 private final Set<String> resources;
264 private AtomicBoolean active = new AtomicBoolean(false);
265
266 private PushResourceCoordinator(Set<String> resources)
267 {
268 this.resources = resources;
269 }
270
271 private void coordinate()
272 {
273 LOG.debug("Pushing resources: {}", resources);
274
275
276 for (String pushResource : resources)
277 pushResource(pushResource);
278 }
279
280 private void sendNextResourceData()
281 {
282 LOG.debug("{} sendNextResourceData active: {}", hashCode(), active.get());
283 if (active.compareAndSet(false, true))
284 {
285 PushResource resource = queue.poll();
286 if (resource != null)
287 {
288 LOG.debug("Opening new push channel for: {}", resource);
289 HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
290 pushChannel.requestStart(resource.getPushRequestHeaders(), true);
291 return;
292 }
293
294 if (active.compareAndSet(true, false))
295 {
296 if (queue.peek() != null)
297 sendNextResourceData();
298 }
299 else
300 {
301 throw new IllegalStateException("active must not be false here! Concurrency bug!");
302 }
303 }
304 }
305
306 private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
307 {
308 HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy,
309 pushStream, pushRequestHeaders, this, version);
310 HttpInputOverSPDY input = new HttpInputOverSPDY();
311 return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
312 }
313
314 private void pushResource(String pushResource)
315 {
316 Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
317 Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
318 Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
319 final Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
320 final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
321
322 stream.push(new PushInfo(pushHeaders, false), new Promise<Stream>()
323 {
324 @Override
325 public void succeeded(Stream pushStream)
326 {
327 LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream);
328 queue.offer(new PushResource(pushStream, pushRequestHeaders));
329 sendNextResourceData();
330 }
331
332 @Override
333 public void failed(Throwable x)
334 {
335 LOG.debug("Creating push stream failed.", x);
336 sendNextResourceData();
337 }
338 });
339 }
340
341 private void complete()
342 {
343 if (!active.compareAndSet(true, false))
344 throw new IllegalStateException();
345 sendNextResourceData();
346 }
347
348 private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
349 {
350 final Fields newRequestHeaders = new Fields(requestHeaders, false);
351 newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
352 newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
353 newRequestHeaders.put(scheme);
354 newRequestHeaders.put(host);
355 newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
356 String referrer = scheme.getValue() + "://" + host.getValue() + uri.getValue();
357 newRequestHeaders.put("referer", referrer);
358 newRequestHeaders.put("x-spdy-push", "true");
359 return newRequestHeaders;
360 }
361
362 private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
363 {
364 final Fields pushHeaders = new Fields();
365 if (version == SPDY.V2)
366 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.getValue() + "://" + host.getValue() + pushResourcePath);
367 else
368 {
369 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
370 pushHeaders.put(scheme);
371 pushHeaders.put(host);
372 }
373 return pushHeaders;
374 }
375 }
376
377 private static class PushResource
378 {
379 private final Stream pushStream;
380 private final Fields pushRequestHeaders;
381
382 public PushResource(Stream pushStream, Fields pushRequestHeaders)
383 {
384 this.pushStream = pushStream;
385 this.pushRequestHeaders = pushRequestHeaders;
386 }
387
388 public Stream getPushStream()
389 {
390 return pushStream;
391 }
392
393 public Fields getPushRequestHeaders()
394 {
395 return pushRequestHeaders;
396 }
397
398 @Override
399 public String toString()
400 {
401 return "PushResource{" +
402 "pushStream=" + pushStream +
403 ", pushRequestHeaders=" + pushRequestHeaders +
404 '}';
405 }
406 }
407 }