1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.server.http;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.Queue;
24 import java.util.Set;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.eclipse.jetty.http.HttpField;
29 import org.eclipse.jetty.http.HttpFields;
30 import org.eclipse.jetty.http.HttpGenerator;
31 import org.eclipse.jetty.http.HttpHeader;
32 import org.eclipse.jetty.http.HttpMethod;
33 import org.eclipse.jetty.http.HttpStatus;
34 import org.eclipse.jetty.http.HttpVersion;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.io.EofException;
37 import org.eclipse.jetty.server.Connector;
38 import org.eclipse.jetty.server.HttpConfiguration;
39 import org.eclipse.jetty.server.HttpTransport;
40 import org.eclipse.jetty.spdy.StreamException;
41 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
42 import org.eclipse.jetty.spdy.api.HeadersInfo;
43 import org.eclipse.jetty.spdy.api.PushInfo;
44 import org.eclipse.jetty.spdy.api.ReplyInfo;
45 import org.eclipse.jetty.spdy.api.SPDY;
46 import org.eclipse.jetty.spdy.api.Session;
47 import org.eclipse.jetty.spdy.api.Stream;
48 import org.eclipse.jetty.spdy.api.StreamStatus;
49 import org.eclipse.jetty.util.BlockingCallback;
50 import org.eclipse.jetty.util.BufferUtil;
51 import org.eclipse.jetty.util.Callback;
52 import org.eclipse.jetty.util.ConcurrentArrayQueue;
53 import org.eclipse.jetty.util.Fields;
54 import org.eclipse.jetty.util.Promise;
55 import org.eclipse.jetty.util.log.Log;
56 import org.eclipse.jetty.util.log.Logger;
57
58 public class HttpTransportOverSPDY implements HttpTransport
59 {
60 private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
61
62 private final Connector connector;
63 private final HttpConfiguration configuration;
64 private final EndPoint endPoint;
65 private final PushStrategy pushStrategy;
66 private final Stream stream;
67 private final short version;
68 private final Fields requestHeaders;
69 private final BlockingCallback streamBlocker = new BlockingCallback();
70 private final AtomicBoolean committed = new AtomicBoolean();
71
72 public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
73 {
74 this.connector = connector;
75 this.configuration = configuration;
76 this.endPoint = endPoint;
77 this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
78 this.stream = stream;
79 this.requestHeaders = requestHeaders;
80 Session session = stream.getSession();
81 this.version = session.getVersion();
82 }
83
84 protected Stream getStream()
85 {
86 return stream;
87 }
88
89 protected Fields getRequestHeaders()
90 {
91 return requestHeaders;
92 }
93
94
95 @Override
96 public void send(ByteBuffer responseBodyContent, boolean lastContent, Callback callback)
97 {
98
99 send(null, responseBodyContent, lastContent, callback);
100 }
101
102 @Override
103 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, final Callback callback)
104 {
105 if (LOG.isDebugEnabled())
106 LOG.debug("Sending {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
107
108 if (stream.isClosed() || stream.isReset())
109 {
110 EofException exception = new EofException("stream closed");
111 callback.failed(exception);
112 return;
113 }
114
115
116
117
118
119
120
121
122
123
124 boolean isHeadRequest = HttpMethod.HEAD.name().equalsIgnoreCase(requestHeaders.get(HTTPSPDYHeader.METHOD.name(version)).value());
125 boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
126 boolean close = !hasContent && lastContent;
127
128 if (info != null)
129 {
130 if (!committed.compareAndSet(false, true))
131 {
132 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
133 "Stream already committed!");
134 callback.failed(exception);
135 LOG.warn("Committed response twice.", exception);
136 return;
137 }
138 sendReply(info, !hasContent ? callback : new Callback.Adapter()
139 {
140 @Override
141 public void failed(Throwable x)
142 {
143 callback.failed(x);
144 }
145 }, close);
146 }
147
148
149 if (hasContent)
150 {
151
152 LOG.debug("Send content: {} on stream: {} lastContent={}", BufferUtil.toDetailString(content), stream,
153 lastContent);
154 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
155 ), callback);
156 }
157
158 else if (lastContent && info == null)
159 {
160
161 LOG.debug("No content and lastContent=true. Sending empty ByteBuffer to close stream: {}", stream);
162 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
163 BufferUtil.EMPTY_BUFFER, lastContent), callback);
164 }
165 else if (!lastContent && !hasContent && info == null)
166 throw new IllegalStateException("not lastContent, no content and no responseInfo!");
167
168 }
169
170 private void sendReply(HttpGenerator.ResponseInfo info, Callback callback, boolean close)
171 {
172 Fields headers = new Fields();
173
174 HttpVersion httpVersion = HttpVersion.HTTP_1_1;
175 headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
176
177 int status = info.getStatus();
178 StringBuilder httpStatus = new StringBuilder().append(status);
179 String reason = info.getReason();
180 if (reason == null)
181 reason = HttpStatus.getMessage(status);
182 if (reason != null)
183 httpStatus.append(" ").append(reason);
184 headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
185 LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
186
187
188 HttpFields fields = info.getHttpFields();
189 if (fields != null)
190 {
191 for (int i = 0; i < fields.size(); ++i)
192 {
193 HttpField field = fields.getField(i);
194 String name = field.getName();
195 String value = field.getValue();
196 headers.add(name, value);
197 LOG.debug("HTTP < {}: {}", name, value);
198 }
199 }
200
201 if (configuration.getSendServerVersion())
202 headers.add(HttpHeader.SERVER.asString(), HttpConfiguration.SERVER_VERSION);
203 if (configuration.getSendXPoweredBy())
204 headers.add(HttpHeader.X_POWERED_BY.asString(), HttpConfiguration.SERVER_VERSION);
205
206 ReplyInfo reply = new ReplyInfo(headers, close);
207 LOG.debug("Sending reply: {} on stream: {}", reply, stream);
208 reply(stream, reply, callback);
209 }
210
211 @Override
212 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
213 {
214 send(info, content, lastContent, streamBlocker);
215 try
216 {
217 streamBlocker.block();
218 }
219 catch (Exception e)
220 {
221 LOG.debug(e);
222 }
223 }
224
225 @Override
226 public void completed()
227 {
228 LOG.debug("Completed {}", this);
229 }
230
231 private void reply(Stream stream, ReplyInfo replyInfo, Callback callback)
232 {
233 if (!stream.isUnidirectional())
234 stream.reply(replyInfo, callback);
235 else
236 stream.headers(new HeadersInfo(replyInfo.getHeaders(), replyInfo.isClose()), callback);
237
238 Fields responseHeaders = replyInfo.getHeaders();
239 if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") && !stream.isClosed())
240 {
241 Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
242 if (pushResources.size() > 0)
243 {
244 PushResourceCoordinator pushResourceCoordinator = new PushResourceCoordinator(pushResources);
245 pushResourceCoordinator.coordinate();
246 }
247 }
248 }
249
250 private static class PushHttpTransportOverSPDY extends HttpTransportOverSPDY
251 {
252 private final PushResourceCoordinator coordinator;
253 private final short version;
254
255 private PushHttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint,
256 PushStrategy pushStrategy, Stream stream, Fields requestHeaders,
257 PushResourceCoordinator coordinator, short version)
258 {
259 super(connector, configuration, endPoint, pushStrategy, stream, requestHeaders);
260 this.coordinator = coordinator;
261 this.version = version;
262 }
263
264 @Override
265 public void completed()
266 {
267 Stream stream = getStream();
268 LOG.debug("Resource pushed for {} on {}",
269 getRequestHeaders().get(HTTPSPDYHeader.URI.name(version)), stream);
270 coordinator.complete();
271 }
272 }
273
274 private class PushResourceCoordinator
275 {
276 private final Queue<PushResource> queue = new ConcurrentArrayQueue<>();
277 private final Set<String> resources;
278 private AtomicBoolean active = new AtomicBoolean(false);
279
280 private PushResourceCoordinator(Set<String> resources)
281 {
282 this.resources = resources;
283 }
284
285 private void coordinate()
286 {
287 LOG.debug("Pushing resources: {}", resources);
288
289
290 for (String pushResource : resources)
291 pushResource(pushResource);
292 }
293
294 private void sendNextResourceData()
295 {
296 LOG.debug("{} sendNextResourceData active: {}", hashCode(), active.get());
297 if (active.compareAndSet(false, true))
298 {
299 PushResource resource = queue.poll();
300 if (resource != null)
301 {
302 LOG.debug("Opening new push channel for: {}", resource);
303 HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(resource.getPushStream(), resource.getPushRequestHeaders());
304 pushChannel.requestStart(resource.getPushRequestHeaders(), true);
305 return;
306 }
307
308 if (active.compareAndSet(true, false))
309 {
310 if (queue.peek() != null)
311 sendNextResourceData();
312 }
313 else
314 {
315 throw new IllegalStateException("active must not be false here! Concurrency bug!");
316 }
317 }
318 }
319
320 private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
321 {
322 HttpTransport transport = new PushHttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy,
323 pushStream, pushRequestHeaders, this, version);
324 HttpInputOverSPDY input = new HttpInputOverSPDY();
325 return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
326 }
327
328 private void pushResource(String pushResource)
329 {
330 Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
331 Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
332 Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
333 final Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
334 final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
335
336 stream.push(new PushInfo(pushHeaders, false), new Promise<Stream>()
337 {
338 @Override
339 public void succeeded(Stream pushStream)
340 {
341 LOG.debug("Headers pushed for {} on {}", pushHeaders.get(HTTPSPDYHeader.URI.name(version)), pushStream);
342 queue.offer(new PushResource(pushStream, pushRequestHeaders));
343 sendNextResourceData();
344 }
345
346 @Override
347 public void failed(Throwable x)
348 {
349 LOG.debug("Creating push stream failed.", x);
350 sendNextResourceData();
351 }
352 });
353 }
354
355 private void complete()
356 {
357 if (!active.compareAndSet(true, false))
358 throw new IllegalStateException();
359 sendNextResourceData();
360 }
361
362 private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
363 {
364 final Fields newRequestHeaders = new Fields(requestHeaders, false);
365 newRequestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
366 newRequestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
367 newRequestHeaders.put(scheme);
368 newRequestHeaders.put(host);
369 newRequestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
370 String referrer = scheme.value() + "://" + host.value() + uri.value();
371 newRequestHeaders.put("referer", referrer);
372 newRequestHeaders.put("x-spdy-push", "true");
373 return newRequestHeaders;
374 }
375
376 private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
377 {
378 final Fields pushHeaders = new Fields();
379 if (version == SPDY.V2)
380 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
381 else
382 {
383 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
384 pushHeaders.put(scheme);
385 pushHeaders.put(host);
386 }
387 return pushHeaders;
388 }
389 }
390
391 private static class PushResource
392 {
393 private final Stream pushStream;
394 private final Fields pushRequestHeaders;
395
396 public PushResource(Stream pushStream, Fields pushRequestHeaders)
397 {
398 this.pushStream = pushStream;
399 this.pushRequestHeaders = pushRequestHeaders;
400 }
401
402 public Stream getPushStream()
403 {
404 return pushStream;
405 }
406
407 public Fields getPushRequestHeaders()
408 {
409 return pushRequestHeaders;
410 }
411
412 @Override
413 public String toString()
414 {
415 return "PushResource{" +
416 "pushStream=" + pushStream +
417 ", pushRequestHeaders=" + pushRequestHeaders +
418 '}';
419 }
420 }
421 }