1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.server.http;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.Set;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.eclipse.jetty.http.HttpField;
29 import org.eclipse.jetty.http.HttpFields;
30 import org.eclipse.jetty.http.HttpGenerator;
31 import org.eclipse.jetty.http.HttpStatus;
32 import org.eclipse.jetty.http.HttpVersion;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.io.EofException;
35 import org.eclipse.jetty.server.Connector;
36 import org.eclipse.jetty.server.HttpConfiguration;
37 import org.eclipse.jetty.server.HttpTransport;
38 import org.eclipse.jetty.spdy.StreamException;
39 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
40 import org.eclipse.jetty.spdy.api.PushInfo;
41 import org.eclipse.jetty.spdy.api.ReplyInfo;
42 import org.eclipse.jetty.spdy.api.SPDY;
43 import org.eclipse.jetty.spdy.api.Stream;
44 import org.eclipse.jetty.spdy.api.StreamStatus;
45 import org.eclipse.jetty.util.BlockingCallback;
46 import org.eclipse.jetty.util.BufferUtil;
47 import org.eclipse.jetty.util.Callback;
48 import org.eclipse.jetty.util.Fields;
49 import org.eclipse.jetty.util.Promise;
50 import org.eclipse.jetty.util.log.Log;
51 import org.eclipse.jetty.util.log.Logger;
52
53 public class HttpTransportOverSPDY implements HttpTransport
54 {
55 private static final Logger LOG = Log.getLogger(HttpTransportOverSPDY.class);
56
57 private final Connector connector;
58 private final HttpConfiguration configuration;
59 private final EndPoint endPoint;
60 private final PushStrategy pushStrategy;
61 private final Stream stream;
62 private final Fields requestHeaders;
63 private final BlockingCallback streamBlocker = new BlockingCallback();
64 private final AtomicBoolean committed = new AtomicBoolean();
65
66 public HttpTransportOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
67 {
68 this.connector = connector;
69 this.configuration = configuration;
70 this.endPoint = endPoint;
71 this.pushStrategy = pushStrategy == null ? new PushStrategy.None() : pushStrategy;
72 this.stream = stream;
73 this.requestHeaders = requestHeaders;
74 }
75
76 @Override
77 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
78 {
79 if (LOG.isDebugEnabled())
80 LOG.debug("send {} {} {} {} last={}", this, stream, info, BufferUtil.toDetailString(content), lastContent);
81
82 if (stream.isClosed() || stream.isReset())
83 {
84 EofException exception = new EofException("stream closed");
85 callback.failed(exception);
86 return;
87 }
88
89
90
91
92
93
94
95
96
97
98
99 boolean hasContent = BufferUtil.hasContent(content);
100
101 if (info != null)
102 {
103 if (!committed.compareAndSet(false, true))
104 {
105 StreamException exception = new StreamException(stream.getId(), StreamStatus.PROTOCOL_ERROR,
106 "Stream already committed!");
107 callback.failed(exception);
108 LOG.warn("Committed response twice.", exception);
109 return;
110 }
111 short version = stream.getSession().getVersion();
112 Fields headers = new Fields();
113
114 HttpVersion httpVersion = HttpVersion.HTTP_1_1;
115 headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.asString());
116
117 int status = info.getStatus();
118 StringBuilder httpStatus = new StringBuilder().append(status);
119 String reason = info.getReason();
120 if (reason == null)
121 reason = HttpStatus.getMessage(status);
122 if (reason != null)
123 httpStatus.append(" ").append(reason);
124 headers.put(HTTPSPDYHeader.STATUS.name(version), httpStatus.toString());
125 LOG.debug("HTTP < {} {}", httpVersion, httpStatus);
126
127
128 HttpFields fields = info.getHttpFields();
129 if (fields != null)
130 {
131 for (int i = 0; i < fields.size(); ++i)
132 {
133 HttpField field = fields.getField(i);
134 String name = field.getName();
135 String value = field.getValue();
136 headers.put(name, value);
137 LOG.debug("HTTP < {}: {}", name, value);
138 }
139 }
140
141 boolean close = !hasContent && lastContent;
142 ReplyInfo reply = new ReplyInfo(headers, close);
143 reply(stream, reply);
144 }
145
146
147 if (hasContent)
148 {
149
150 if (stream.isClosed() || stream.isReset())
151
152 callback.failed(new EofException("stream closed"));
153 else
154
155 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS, content, lastContent
156 ), callback);
157 }
158
159 else if (lastContent)
160 {
161
162 if (stream.isClosed() || stream.isReset())
163
164 callback.succeeded();
165 else
166
167 stream.data(new ByteBufferDataInfo(endPoint.getIdleTimeout(), TimeUnit.MILLISECONDS,
168 BufferUtil.EMPTY_BUFFER, lastContent), callback);
169 }
170 else
171
172 callback.succeeded();
173
174 }
175
176 @Override
177 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws EofException
178 {
179 send(info, content, lastContent, streamBlocker);
180 try
181 {
182 streamBlocker.block();
183 }
184 catch (InterruptedException | TimeoutException | IOException e)
185 {
186 LOG.debug(e);
187 }
188 }
189
190
191 @Override
192 public void completed()
193 {
194 LOG.debug("completed");
195 }
196
197 private void reply(Stream stream, ReplyInfo replyInfo)
198 {
199 if (!stream.isUnidirectional())
200 stream.reply(replyInfo, new Callback.Adapter());
201
202 Fields responseHeaders = replyInfo.getHeaders();
203 short version = stream.getSession().getVersion();
204 if (responseHeaders.get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") && !stream.isClosed())
205 {
206
207 Fields.Field scheme = requestHeaders.get(HTTPSPDYHeader.SCHEME.name(version));
208 Fields.Field host = requestHeaders.get(HTTPSPDYHeader.HOST.name(version));
209 Fields.Field uri = requestHeaders.get(HTTPSPDYHeader.URI.name(version));
210 Set<String> pushResources = pushStrategy.apply(stream, requestHeaders, responseHeaders);
211
212 for (String pushResource : pushResources)
213 {
214 Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
215 final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
216
217
218 stream.push(new PushInfo(0, TimeUnit.MILLISECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
219 {
220 @Override
221 public void succeeded(Stream pushStream)
222 {
223 HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushStream, pushRequestHeaders);
224 pushChannel.requestStart(pushRequestHeaders, true);
225 }
226 });
227 }
228 }
229 }
230
231 private Fields createRequestHeaders(Fields.Field scheme, Fields.Field host, Fields.Field uri, String pushResourcePath)
232 {
233 final Fields requestHeaders = new Fields();
234 short version = stream.getSession().getVersion();
235 requestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
236 requestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
237 requestHeaders.put(scheme);
238 requestHeaders.put(host);
239 requestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
240 String referrer = scheme.value() + "://" + host.value() + uri.value();
241 requestHeaders.put("referer", referrer);
242
243 requestHeaders.put(requestHeaders.get("accept-encoding"));
244 requestHeaders.put("x-spdy-push", "true");
245 return requestHeaders;
246 }
247
248 private Fields createPushHeaders(Fields.Field scheme, Fields.Field host, String pushResourcePath)
249 {
250 final Fields pushHeaders = new Fields();
251 short version = stream.getSession().getVersion();
252 if (version == SPDY.V2)
253 pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
254 else
255 {
256 pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
257 pushHeaders.put(scheme);
258 pushHeaders.put(host);
259 }
260 pushHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200");
261 pushHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
262 return pushHeaders;
263 }
264
265 private HttpChannelOverSPDY newHttpChannelOverSPDY(Stream pushStream, Fields pushRequestHeaders)
266 {
267 HttpTransport transport = new HttpTransportOverSPDY(connector, configuration, endPoint, pushStrategy, pushStream, pushRequestHeaders);
268 HttpInputOverSPDY input = new HttpInputOverSPDY();
269 return new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, pushStream);
270 }
271 }