1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.eclipse.jetty.spdy.proxy;
16
17 import java.net.InetSocketAddress;
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.TimeUnit;
23
24 import org.eclipse.jetty.spdy.SPDYClient;
25 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
26 import org.eclipse.jetty.spdy.api.DataInfo;
27 import org.eclipse.jetty.spdy.api.GoAwayInfo;
28 import org.eclipse.jetty.spdy.api.Handler;
29 import org.eclipse.jetty.spdy.api.Headers;
30 import org.eclipse.jetty.spdy.api.HeadersInfo;
31 import org.eclipse.jetty.spdy.api.ReplyInfo;
32 import org.eclipse.jetty.spdy.api.RstInfo;
33 import org.eclipse.jetty.spdy.api.SPDY;
34 import org.eclipse.jetty.spdy.api.Session;
35 import org.eclipse.jetty.spdy.api.SessionFrameListener;
36 import org.eclipse.jetty.spdy.api.Stream;
37 import org.eclipse.jetty.spdy.api.StreamFrameListener;
38 import org.eclipse.jetty.spdy.api.StreamStatus;
39 import org.eclipse.jetty.spdy.api.SynInfo;
40 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
41
42
43
44
45
46 public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
47 {
48 private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
49 private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
50
51 private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
52 private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
53 private final SPDYClient.Factory factory;
54 private volatile long connectTimeout = 15000;
55 private volatile long timeout = 60000;
56
57 public SPDYProxyEngine(SPDYClient.Factory factory)
58 {
59 this.factory = factory;
60 }
61
62 public long getConnectTimeout()
63 {
64 return connectTimeout;
65 }
66
67 public void setConnectTimeout(long connectTimeout)
68 {
69 this.connectTimeout = connectTimeout;
70 }
71
72 public long getTimeout()
73 {
74 return timeout;
75 }
76
77 public void setTimeout(long timeout)
78 {
79 this.timeout = timeout;
80 }
81
82 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
83 {
84 Headers headers = new Headers(clientSynInfo.getHeaders(), false);
85
86 short serverVersion = getVersion(proxyServerInfo.getProtocol());
87 InetSocketAddress address = proxyServerInfo.getAddress();
88 Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
89 if (serverSession == null)
90 {
91 rst(clientStream);
92 return null;
93 }
94
95 final Session clientSession = clientStream.getSession();
96
97 addRequestProxyHeaders(clientStream, headers);
98 customizeRequestHeaders(clientStream, headers);
99 convert(clientSession.getVersion(), serverVersion, headers);
100
101 SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
102 StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
103 StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
104 clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
105 serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
106 return this;
107 }
108
109 private static short getVersion(String protocol)
110 {
111 switch (protocol)
112 {
113 case "spdy/2":
114 return SPDY.V2;
115 case "spdy/3":
116 return SPDY.V3;
117 default:
118 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
119 }
120 }
121
122 @Override
123 public void onReply(Stream stream, ReplyInfo replyInfo)
124 {
125
126 }
127
128 @Override
129 public void onHeaders(Stream stream, HeadersInfo headersInfo)
130 {
131
132 throw new UnsupportedOperationException("Not Yet Implemented");
133 }
134
135 @Override
136 public void onData(Stream clientStream, final DataInfo clientDataInfo)
137 {
138 logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
139
140 ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
141 {
142 @Override
143 public void consume(int delta)
144 {
145 super.consume(delta);
146 clientDataInfo.consume(delta);
147 }
148 };
149
150 StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
151 streamHandler.data(serverDataInfo);
152 }
153
154 private Session produceSession(String host, short version, InetSocketAddress address)
155 {
156 try
157 {
158 Session session = serverSessions.get(host);
159 if (session == null)
160 {
161 SPDYClient client = factory.newSPDYClient(version);
162 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
163 logger.debug("Proxy session connected to {}", address);
164 Session existing = serverSessions.putIfAbsent(host, session);
165 if (existing != null)
166 {
167 session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
168 session = existing;
169 }
170 }
171 return session;
172 }
173 catch (Exception x)
174 {
175 logger.debug(x);
176 return null;
177 }
178 }
179
180 private void convert(short fromVersion, short toVersion, Headers headers)
181 {
182 if (fromVersion != toVersion)
183 {
184 for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
185 {
186 Headers.Header header = headers.remove(httpHeader.name(fromVersion));
187 if (header != null)
188 {
189 String toName = httpHeader.name(toVersion);
190 for (String value : header.values())
191 headers.add(toName, value);
192 }
193 }
194 }
195 }
196
197 private void rst(Stream stream)
198 {
199 RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
200 stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
201 }
202
203 private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
204 {
205 private final Stream clientStream;
206 private volatile ReplyInfo replyInfo;
207
208 public ProxyStreamFrameListener(Stream clientStream)
209 {
210 this.clientStream = clientStream;
211 }
212
213 @Override
214 public void onReply(final Stream stream, ReplyInfo replyInfo)
215 {
216 logger.debug("S -> P {} on {}", replyInfo, stream);
217
218 short serverVersion = stream.getSession().getVersion();
219 Headers headers = new Headers(replyInfo.getHeaders(), false);
220
221 addResponseProxyHeaders(stream, headers);
222 customizeResponseHeaders(stream, headers);
223 short clientVersion = this.clientStream.getSession().getVersion();
224 convert(serverVersion, clientVersion, headers);
225
226 this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
227 if (replyInfo.isClose())
228 reply(stream);
229 }
230
231 @Override
232 public void onHeaders(Stream stream, HeadersInfo headersInfo)
233 {
234
235 throw new UnsupportedOperationException("Not Yet Implemented");
236 }
237
238 @Override
239 public void onData(final Stream stream, final DataInfo dataInfo)
240 {
241 logger.debug("S -> P {} on {}", dataInfo, stream);
242
243 if (replyInfo != null)
244 {
245 if (dataInfo.isClose())
246 replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
247 reply(stream);
248 }
249 data(stream, dataInfo);
250 }
251
252 private void reply(final Stream stream)
253 {
254 final ReplyInfo replyInfo = this.replyInfo;
255 this.replyInfo = null;
256 clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
257 {
258 @Override
259 public void completed(Void context)
260 {
261 logger.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
262 }
263
264 @Override
265 public void failed(Void context, Throwable x)
266 {
267 logger.debug(x);
268 rst(clientStream);
269 }
270 });
271 }
272
273 private void data(final Stream stream, final DataInfo dataInfo)
274 {
275 clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
276 {
277 @Override
278 public void completed(Void context)
279 {
280 dataInfo.consume(dataInfo.length());
281 logger.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
282 }
283
284 @Override
285 public void failed(Void context, Throwable x)
286 {
287 logger.debug(x);
288 rst(clientStream);
289 }
290 });
291 }
292 }
293
294
295
296
297
298
299
300
301
302 private class StreamHandler implements Handler<Stream>
303 {
304 private final Queue<DataInfoHandler> queue = new LinkedList<>();
305 private final Stream clientStream;
306 private final SynInfo serverSynInfo;
307 private Stream serverStream;
308
309 private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
310 {
311 this.clientStream = clientStream;
312 this.serverSynInfo = serverSynInfo;
313 }
314
315 @Override
316 public void completed(Stream serverStream)
317 {
318 logger.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
319
320 serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
321
322 DataInfoHandler dataInfoHandler;
323 synchronized (queue)
324 {
325 this.serverStream = serverStream;
326 dataInfoHandler = queue.peek();
327 if (dataInfoHandler != null)
328 {
329 if (dataInfoHandler.flushing)
330 {
331 logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
332 dataInfoHandler = null;
333 }
334 else
335 {
336 dataInfoHandler.flushing = true;
337 logger.debug("SYN completed, queue size {}", queue.size());
338 }
339 }
340 else
341 {
342 logger.debug("SYN completed, queue empty");
343 }
344 }
345 if (dataInfoHandler != null)
346 flush(serverStream, dataInfoHandler);
347 }
348
349 @Override
350 public void failed(Stream serverStream, Throwable x)
351 {
352 logger.debug(x);
353 rst(clientStream);
354 }
355
356 public void data(DataInfo dataInfo)
357 {
358 Stream serverStream;
359 DataInfoHandler dataInfoHandler = null;
360 DataInfoHandler item = new DataInfoHandler(dataInfo);
361 synchronized (queue)
362 {
363 queue.offer(item);
364 serverStream = this.serverStream;
365 if (serverStream != null)
366 {
367 dataInfoHandler = queue.peek();
368 if (dataInfoHandler.flushing)
369 {
370 logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
371 serverStream = null;
372 }
373 else
374 {
375 dataInfoHandler.flushing = true;
376 logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
377 }
378 }
379 else
380 {
381 logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
382 }
383 }
384 if (serverStream != null)
385 flush(serverStream, dataInfoHandler);
386 }
387
388 private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
389 {
390 logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
391 serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
392 }
393
394 private class DataInfoHandler implements Handler<Void>
395 {
396 private final DataInfo dataInfo;
397 private boolean flushing;
398
399 private DataInfoHandler(DataInfo dataInfo)
400 {
401 this.dataInfo = dataInfo;
402 }
403
404 @Override
405 public void completed(Void context)
406 {
407 Stream serverStream;
408 DataInfoHandler dataInfoHandler;
409 synchronized (queue)
410 {
411 serverStream = StreamHandler.this.serverStream;
412 assert serverStream != null;
413 dataInfoHandler = queue.poll();
414 assert dataInfoHandler == this;
415 dataInfoHandler = queue.peek();
416 if (dataInfoHandler != null)
417 {
418 assert !dataInfoHandler.flushing;
419 dataInfoHandler.flushing = true;
420 logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
421 }
422 else
423 {
424 logger.debug("Completed {}, queue empty", dataInfo);
425 }
426 }
427 if (dataInfoHandler != null)
428 flush(serverStream, dataInfoHandler);
429 }
430
431 @Override
432 public void failed(Void context, Throwable x)
433 {
434 logger.debug(x);
435 rst(clientStream);
436 }
437 }
438 }
439
440 private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
441 {
442 @Override
443 public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
444 {
445 logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
446
447 Headers headers = new Headers(serverSynInfo.getHeaders(), false);
448
449 addResponseProxyHeaders(serverStream, headers);
450 customizeResponseHeaders(serverStream, headers);
451 Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
452 convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
453
454 StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
455 serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
456 clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
457
458 return this;
459 }
460
461 @Override
462 public void onRst(Session serverSession, RstInfo serverRstInfo)
463 {
464 Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
465 if (serverStream != null)
466 {
467 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
468 if (clientStream != null)
469 {
470 Session clientSession = clientStream.getSession();
471 RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
472 clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
473 }
474 }
475 }
476
477 @Override
478 public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
479 {
480 serverSessions.values().remove(serverSession);
481 }
482
483 @Override
484 public void onReply(Stream stream, ReplyInfo replyInfo)
485 {
486
487 }
488
489 @Override
490 public void onHeaders(Stream stream, HeadersInfo headersInfo)
491 {
492 throw new UnsupportedOperationException();
493 }
494
495 @Override
496 public void onData(Stream serverStream, final DataInfo serverDataInfo)
497 {
498 logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
499
500 ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
501 {
502 @Override
503 public void consume(int delta)
504 {
505 super.consume(delta);
506 serverDataInfo.consume(delta);
507 }
508 };
509
510 StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
511 handler.data(clientDataInfo);
512 }
513 }
514 }