1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.proxy;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.Locale;
25 import java.util.concurrent.TimeUnit;
26 import java.util.regex.Matcher;
27 import java.util.regex.Pattern;
28
29 import org.eclipse.jetty.http.HttpFields;
30 import org.eclipse.jetty.http.HttpGenerator;
31 import org.eclipse.jetty.io.AsyncEndPoint;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.ByteArrayBuffer;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.io.nio.DirectNIOBuffer;
36 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
37 import org.eclipse.jetty.io.nio.NIOBuffer;
38 import org.eclipse.jetty.server.AsyncHttpConnection;
39 import org.eclipse.jetty.spdy.ISession;
40 import org.eclipse.jetty.spdy.IStream;
41 import org.eclipse.jetty.spdy.SPDYServerConnector;
42 import org.eclipse.jetty.spdy.StandardSession;
43 import org.eclipse.jetty.spdy.StandardStream;
44 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
45 import org.eclipse.jetty.spdy.api.BytesDataInfo;
46 import org.eclipse.jetty.spdy.api.DataInfo;
47 import org.eclipse.jetty.spdy.api.GoAwayInfo;
48 import org.eclipse.jetty.spdy.api.Handler;
49 import org.eclipse.jetty.spdy.api.Headers;
50 import org.eclipse.jetty.spdy.api.HeadersInfo;
51 import org.eclipse.jetty.spdy.api.ReplyInfo;
52 import org.eclipse.jetty.spdy.api.RstInfo;
53 import org.eclipse.jetty.spdy.api.SessionStatus;
54 import org.eclipse.jetty.spdy.api.Stream;
55 import org.eclipse.jetty.spdy.api.StreamFrameListener;
56 import org.eclipse.jetty.spdy.api.SynInfo;
57 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
58
59 public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
60 {
61 private final Headers headers = new Headers();
62 private final short version;
63 private final ProxyEngineSelector proxyEngineSelector;
64 private final HttpGenerator generator;
65 private final ISession session;
66 private HTTPStream stream;
67 private Buffer content;
68
69 public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
70 {
71 super(connector, endPoint, connector.getServer());
72 this.version = version;
73 this.proxyEngineSelector = proxyEngineSelector;
74 this.generator = (HttpGenerator)_generator;
75 this.session = new HTTPSession(version, connector);
76 this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
77 }
78
79 @Override
80 public AsyncEndPoint getEndPoint()
81 {
82 return (AsyncEndPoint)super.getEndPoint();
83 }
84
85 @Override
86 protected void startRequest(Buffer method, Buffer uri, Buffer httpVersion) throws IOException
87 {
88 SPDYServerConnector connector = (SPDYServerConnector)getConnector();
89 String scheme = connector.getSslContextFactory() != null ? "https" : "http";
90 headers.put(HTTPSPDYHeader.SCHEME.name(version), scheme);
91 headers.put(HTTPSPDYHeader.METHOD.name(version), method.toString("UTF-8"));
92 headers.put(HTTPSPDYHeader.URI.name(version), uri.toString("UTF-8"));
93 headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.toString("UTF-8"));
94 }
95
96 @Override
97 protected void parsedHeader(Buffer name, Buffer value) throws IOException
98 {
99 String headerName = name.toString("UTF-8").toLowerCase(Locale.ENGLISH);
100 String headerValue = value.toString("UTF-8");
101 switch (headerName)
102 {
103 case "host":
104 headers.put(HTTPSPDYHeader.HOST.name(version), headerValue);
105 break;
106 default:
107 headers.put(headerName, headerValue);
108 break;
109 }
110 }
111
112 @Override
113 protected void headerComplete() throws IOException
114 {
115 }
116
117 @Override
118 protected void content(Buffer buffer) throws IOException
119 {
120 if (content == null)
121 {
122 stream = syn(false);
123 content = buffer;
124 }
125 else
126 {
127 stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
128 }
129 }
130
131 @Override
132 public void messageComplete(long contentLength) throws IOException
133 {
134 if (stream == null)
135 {
136 assert content == null;
137 if (headers.isEmpty())
138 proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
139 else
140 syn(true);
141 }
142 else
143 {
144 stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
145 }
146 headers.clear();
147 stream = null;
148 content = null;
149 }
150
151 private HTTPStream syn(boolean close)
152 {
153 HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
154 StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
155 stream.setStreamFrameListener(streamFrameListener);
156 return stream;
157 }
158
159 private DataInfo toDataInfo(Buffer buffer, boolean close)
160 {
161 if (buffer instanceof ByteArrayBuffer)
162 return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
163
164 if (buffer instanceof NIOBuffer)
165 {
166 ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
167 byteBuffer.limit(buffer.putIndex());
168 byteBuffer.position(buffer.getIndex());
169 return new ByteBufferDataInfo(byteBuffer, close);
170 }
171
172 return new BytesDataInfo(buffer.asArray(), close);
173 }
174
175 private class HTTPSession extends StandardSession
176 {
177 private HTTPSession(short version, SPDYServerConnector connector)
178 {
179 super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
180 }
181
182 @Override
183 public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
184 {
185
186 goAway(timeout, unit, handler);
187 }
188
189 @Override
190 public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
191 {
192 try
193 {
194 getEndPoint().close();
195 handler.completed(null);
196 }
197 catch (IOException x)
198 {
199 handler.failed(null, x);
200 }
201 }
202 }
203
204
205
206
207 private class HTTPStream extends StandardStream
208 {
209 private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)");
210
211 private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
212 {
213 super(id, priority, session, associatedStream);
214 }
215
216 @Override
217 public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
218 {
219
220 handler.completed(new HTTPPushStream(2, getPriority(), getSession(), this));
221 }
222
223 @Override
224 public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
225 {
226
227 throw new UnsupportedOperationException("Not Yet Implemented");
228 }
229
230 @Override
231 public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
232 {
233 try
234 {
235 Headers headers = new Headers(replyInfo.getHeaders(), false);
236
237 headers.remove(HTTPSPDYHeader.SCHEME.name(version));
238
239 String status = headers.remove(HTTPSPDYHeader.STATUS.name(version)).value();
240 Matcher matcher = statusRegexp.matcher(status);
241 matcher.matches();
242 int code = Integer.parseInt(matcher.group(1));
243 String reason = matcher.group(2);
244 generator.setResponse(code, reason);
245
246 String httpVersion = headers.remove(HTTPSPDYHeader.VERSION.name(version)).value();
247 generator.setVersion(Integer.parseInt(httpVersion.replaceAll("\\D", "")));
248
249 Headers.Header host = headers.remove(HTTPSPDYHeader.HOST.name(version));
250 if (host != null)
251 headers.put("host", host.value());
252
253 HttpFields fields = new HttpFields();
254 for (Headers.Header header : headers)
255 {
256 String name = camelize(header.name());
257 fields.put(name, header.value());
258 }
259 generator.completeHeader(fields, replyInfo.isClose());
260
261 if (replyInfo.isClose())
262 complete();
263
264 handler.completed(null);
265 }
266 catch (IOException x)
267 {
268 handler.failed(null, x);
269 }
270 }
271
272 private String camelize(String name)
273 {
274 char[] chars = name.toCharArray();
275 chars[0] = Character.toUpperCase(chars[0]);
276
277 for (int i = 0; i < chars.length; ++i)
278 {
279 char c = chars[i];
280 int j = i + 1;
281 if (c == '-' && j < chars.length)
282 chars[j] = Character.toUpperCase(chars[j]);
283 }
284 return new String(chars);
285 }
286
287 @Override
288 public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
289 {
290 try
291 {
292
293 ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
294
295 Buffer buffer = byteBuffer.isDirect() ?
296 new DirectNIOBuffer(byteBuffer, false) :
297 new IndirectNIOBuffer(byteBuffer, false);
298
299 generator.addContent(buffer, dataInfo.isClose());
300 generator.flush(unit.toMillis(timeout));
301
302 if (dataInfo.isClose())
303 complete();
304
305 handler.completed(null);
306 }
307 catch (IOException x)
308 {
309 handler.failed(null, x);
310 }
311 }
312
313 private void complete() throws IOException
314 {
315 generator.complete();
316
317
318 getEndPoint().asyncDispatch();
319 }
320 }
321
322 private class HTTPPushStream extends StandardStream
323 {
324 private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
325 {
326 super(id, priority, session, associatedStream);
327 }
328
329 @Override
330 public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
331 {
332
333 handler.completed(null);
334 }
335
336 @Override
337 public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
338 {
339
340 handler.completed(null);
341 }
342 }
343 }