1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.server.proxy;
21
22 import java.net.InetSocketAddress;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30 import org.eclipse.jetty.spdy.api.DataInfo;
31 import org.eclipse.jetty.spdy.api.GoAwayInfo;
32 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33 import org.eclipse.jetty.spdy.api.HeadersInfo;
34 import org.eclipse.jetty.spdy.api.Info;
35 import org.eclipse.jetty.spdy.api.PushInfo;
36 import org.eclipse.jetty.spdy.api.ReplyInfo;
37 import org.eclipse.jetty.spdy.api.RstInfo;
38 import org.eclipse.jetty.spdy.api.SPDY;
39 import org.eclipse.jetty.spdy.api.Session;
40 import org.eclipse.jetty.spdy.api.SessionFrameListener;
41 import org.eclipse.jetty.spdy.api.Stream;
42 import org.eclipse.jetty.spdy.api.StreamFrameListener;
43 import org.eclipse.jetty.spdy.api.StreamStatus;
44 import org.eclipse.jetty.spdy.api.SynInfo;
45 import org.eclipse.jetty.spdy.client.SPDYClient;
46 import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
47 import org.eclipse.jetty.util.Callback;
48 import org.eclipse.jetty.util.Fields;
49 import org.eclipse.jetty.util.Promise;
50 import org.eclipse.jetty.util.log.Log;
51 import org.eclipse.jetty.util.log.Logger;
52
53
54
55
56
57 public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58 {
59 private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
60
61 private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
62 private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
63
64 private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
65 private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
66 private final SPDYClient.Factory factory;
67 private volatile long connectTimeout = 15000;
68 private volatile long timeout = 60000;
69
/**
 * Creates a proxy engine that obtains its server-side clients from the given factory.
 *
 * @param factory the factory asked for a new {@link SPDYClient} whenever a server session must be opened
 */
public SPDYProxyEngine(SPDYClient.Factory factory)
{
    this.factory = factory;
}
74
75 public long getConnectTimeout()
76 {
77 return connectTimeout;
78 }
79
80 public void setConnectTimeout(long connectTimeout)
81 {
82 this.connectTimeout = connectTimeout;
83 }
84
85 public long getTimeout()
86 {
87 return timeout;
88 }
89
90 public void setTimeout(long timeout)
91 {
92 this.timeout = timeout;
93 }
94
95 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96 {
97 Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98
99 short serverVersion = getVersion(proxyServerInfo.getProtocol());
100 InetSocketAddress address = proxyServerInfo.getAddress();
101 Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102 if (serverSession == null)
103 {
104 rst(clientStream);
105 return null;
106 }
107
108 final Session clientSession = clientStream.getSession();
109
110 addRequestProxyHeaders(clientStream, headers);
111 customizeRequestHeaders(clientStream, headers);
112 convert(clientSession.getVersion(), serverVersion, headers);
113
114 SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115 StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116 StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
117 clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
118 serverSession.syn(serverSynInfo, listener, promise);
119 return this;
120 }
121
122 private static short getVersion(String protocol)
123 {
124 switch (protocol)
125 {
126 case "spdy/2":
127 return SPDY.V2;
128 case "spdy/3":
129 return SPDY.V3;
130 default:
131 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132 }
133 }
134
135 @Override
136 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137 {
138 throw new IllegalStateException("We shouldn't receive pushes from clients");
139 }
140
141 @Override
142 public void onReply(Stream stream, ReplyInfo replyInfo)
143 {
144 throw new IllegalStateException("Servers do not receive replies");
145 }
146
147 @Override
148 public void onHeaders(Stream stream, HeadersInfo headersInfo)
149 {
150
151 throw new UnsupportedOperationException("Not Yet Implemented");
152 }
153
154 @Override
155 public void onData(Stream clientStream, final DataInfo clientDataInfo)
156 {
157 LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
158
159 ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
160 {
161 @Override
162 public void consume(int delta)
163 {
164 super.consume(delta);
165 clientDataInfo.consume(delta);
166 }
167 };
168
169 StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
170 streamPromise.data(serverDataInfo);
171 }
172
173 private Session produceSession(String host, short version, InetSocketAddress address)
174 {
175 try
176 {
177 Session session = serverSessions.get(host);
178 if (session == null)
179 {
180 SPDYClient client = factory.newSPDYClient(version);
181 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
182 LOG.debug("Proxy session connected to {}", address);
183 Session existing = serverSessions.putIfAbsent(host, session);
184 if (existing != null)
185 {
186 session.goAway(new GoAwayInfo(), new Callback.Adapter());
187 session = existing;
188 }
189 }
190 return session;
191 }
192 catch (Exception x)
193 {
194 LOG.debug(x);
195 return null;
196 }
197 }
198
199 private void convert(short fromVersion, short toVersion, Fields headers)
200 {
201 if (fromVersion != toVersion)
202 {
203 for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
204 {
205 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
206 if (header != null)
207 {
208 String toName = httpHeader.name(toVersion);
209 for (String value : header.values())
210 headers.add(toName, value);
211 }
212 }
213 }
214 }
215
216 private void rst(Stream stream)
217 {
218 RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
219 stream.getSession().rst(rstInfo, new Callback.Adapter());
220 }
221
222 private class ProxyPushStreamFrameListener implements StreamFrameListener
223 {
224 private PushStreamPromise pushStreamPromise;
225
226 private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
227 {
228 this.pushStreamPromise = pushStreamPromise;
229 }
230
231 @Override
232 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
233 {
234 LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
235 PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
236 this.pushStreamPromise.push(newPushStreamPromise);
237 return new ProxyPushStreamFrameListener(newPushStreamPromise);
238 }
239
240 @Override
241 public void onReply(Stream stream, ReplyInfo replyInfo)
242 {
243
244 throw new UnsupportedOperationException();
245 }
246
247 @Override
248 public void onHeaders(Stream stream, HeadersInfo headersInfo)
249 {
250 throw new UnsupportedOperationException();
251 }
252
253 @Override
254 public void onData(Stream serverStream, final DataInfo serverDataInfo)
255 {
256 LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
257
258 ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
259 {
260 @Override
261 public void consume(int delta)
262 {
263 super.consume(delta);
264 serverDataInfo.consume(delta);
265 }
266 };
267
268 pushStreamPromise.data(clientDataInfo);
269 }
270 }
271
272 private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
273 {
274 private final Stream receiverStream;
275
276 public ProxyStreamFrameListener(Stream receiverStream)
277 {
278 this.receiverStream = receiverStream;
279 }
280
281 @Override
282 public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
283 {
284 LOG.debug("S -> P {} on {}");
285 PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
286 PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
287 receiverStream.push(newPushInfo, pushStreamPromise);
288 return new ProxyPushStreamFrameListener(pushStreamPromise);
289 }
290
291 @Override
292 public void onReply(final Stream stream, ReplyInfo replyInfo)
293 {
294 LOG.debug("S -> P {} on {}", replyInfo, stream);
295 final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
296 replyInfo.isClose());
297 reply(stream, clientReplyInfo);
298 }
299
300 private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
301 {
302 receiverStream.reply(clientReplyInfo, new Callback()
303 {
304 @Override
305 public void succeeded()
306 {
307 LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
308 }
309
310 @Override
311 public void failed(Throwable x)
312 {
313 LOG.debug(x);
314 rst(receiverStream);
315 }
316 });
317 }
318
319 @Override
320 public void onHeaders(Stream stream, HeadersInfo headersInfo)
321 {
322
323 throw new UnsupportedOperationException("Not Yet Implemented");
324 }
325
326 @Override
327 public void onData(final Stream stream, final DataInfo dataInfo)
328 {
329 LOG.debug("S -> P {} on {}", dataInfo, stream);
330 data(stream, dataInfo);
331 }
332
333 private void data(final Stream stream, final DataInfo serverDataInfo)
334 {
335 final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
336 {
337 @Override
338 public void consume(int delta)
339 {
340 super.consume(delta);
341 serverDataInfo.consume(delta);
342 }
343 };
344
345 receiverStream.data(clientDataInfo, new Callback()
346 {
347 @Override
348 public void succeeded()
349 {
350 LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
351 }
352
353 @Override
354 public void failed(Throwable x)
355 {
356 LOG.debug(x);
357 rst(receiverStream);
358 }
359 });
360 }
361 }
362
363
364
365
366
367
368
369
370
371 private class StreamPromise implements Promise<Stream>
372 {
373 private final Queue<DataInfoCallback> queue = new LinkedList<>();
374 private final Stream senderStream;
375 private final Info info;
376 private Stream receiverStream;
377
378 private StreamPromise(Stream senderStream, Info info)
379 {
380 this.senderStream = senderStream;
381 this.info = info;
382 }
383
384 @Override
385 public void succeeded(Stream stream)
386 {
387 LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
388
389 stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
390
391 DataInfoCallback dataInfoCallback;
392 synchronized (queue)
393 {
394 this.receiverStream = stream;
395 dataInfoCallback = queue.peek();
396 if (dataInfoCallback != null)
397 {
398 if (dataInfoCallback.flushing)
399 {
400 LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
401 dataInfoCallback = null;
402 }
403 else
404 {
405 dataInfoCallback.flushing = true;
406 LOG.debug("SYN completed, queue size {}", queue.size());
407 }
408 }
409 else
410 {
411 LOG.debug("SYN completed, queue empty");
412 }
413 }
414 if (dataInfoCallback != null)
415 flush(stream, dataInfoCallback);
416 }
417
418 @Override
419 public void failed(Throwable x)
420 {
421 LOG.debug(x);
422 rst(senderStream);
423 }
424
425 public void data(DataInfo dataInfo)
426 {
427 Stream receiverStream;
428 DataInfoCallback dataInfoCallbackToFlush = null;
429 DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
430 synchronized (queue)
431 {
432 queue.offer(dataInfoCallBackToQueue);
433 receiverStream = this.receiverStream;
434 if (receiverStream != null)
435 {
436 dataInfoCallbackToFlush = queue.peek();
437 if (dataInfoCallbackToFlush.flushing)
438 {
439 LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
440 receiverStream = null;
441 }
442 else
443 {
444 dataInfoCallbackToFlush.flushing = true;
445 LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
446 }
447 }
448 else
449 {
450 LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
451 }
452 }
453 if (receiverStream != null)
454 flush(receiverStream, dataInfoCallbackToFlush);
455 }
456
457 private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
458 {
459 LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
460 receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback);
461 }
462
463 private class DataInfoCallback implements Callback
464 {
465 private final DataInfo dataInfo;
466 private boolean flushing;
467
468 private DataInfoCallback(DataInfo dataInfo)
469 {
470 this.dataInfo = dataInfo;
471 }
472
473 @Override
474 public void succeeded()
475 {
476 Stream serverStream;
477 DataInfoCallback dataInfoCallback;
478 synchronized (queue)
479 {
480 serverStream = StreamPromise.this.receiverStream;
481 assert serverStream != null;
482 dataInfoCallback = queue.poll();
483 assert dataInfoCallback == this;
484 dataInfoCallback = queue.peek();
485 if (dataInfoCallback != null)
486 {
487 assert !dataInfoCallback.flushing;
488 dataInfoCallback.flushing = true;
489 LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
490 }
491 else
492 {
493 LOG.debug("Completed {}, queue empty", dataInfo);
494 }
495 }
496 if (dataInfoCallback != null)
497 flush(serverStream, dataInfoCallback);
498 }
499
500 @Override
501 public void failed(Throwable x)
502 {
503 LOG.debug(x);
504 rst(senderStream);
505 }
506 }
507
508 public Stream getSenderStream()
509 {
510 return senderStream;
511 }
512
513 public Info getInfo()
514 {
515 return info;
516 }
517
518 public Stream getReceiverStream()
519 {
520 synchronized (queue)
521 {
522 return receiverStream;
523 }
524 }
525 }
526
527 private class PushStreamPromise extends StreamPromise
528 {
529 private volatile PushStreamPromise pushStreamPromise;
530
531 private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
532 {
533 super(senderStream, pushInfo);
534 }
535
536 @Override
537 public void succeeded(Stream receiverStream)
538 {
539 super.succeeded(receiverStream);
540
541 LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
542
543 PushStreamPromise promise = pushStreamPromise;
544 if (promise != null)
545 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
546 }
547
548 public void push(PushStreamPromise pushStreamPromise)
549 {
550 Stream receiverStream = getReceiverStream();
551
552 if (receiverStream != null)
553 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
554 else
555 this.pushStreamPromise = pushStreamPromise;
556 }
557 }
558
559 private class ProxySessionFrameListener extends SessionFrameListener.Adapter
560 {
561 @Override
562 public void onRst(Session serverSession, RstInfo serverRstInfo)
563 {
564 Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
565 if (serverStream != null)
566 {
567 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
568 if (clientStream != null)
569 {
570 Session clientSession = clientStream.getSession();
571 RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
572 clientSession.rst(clientRstInfo, new Callback.Adapter());
573 }
574 }
575 }
576
577 @Override
578 public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
579 {
580 serverSessions.values().remove(serverSession);
581 }
582 }
583
584 private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
585 {
586 Fields headersToConvert = pushInfo.getHeaders();
587 Fields headers = convertHeaders(from, to, headersToConvert);
588 return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
589 }
590
591 private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
592 {
593 Fields headers = new Fields(headersToConvert, false);
594 addResponseProxyHeaders(from, headers);
595 customizeResponseHeaders(from, headers);
596 convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
597 return headers;
598 }
599 }