1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.server.proxy;
21
22 import java.net.InetSocketAddress;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30 import org.eclipse.jetty.spdy.api.DataInfo;
31 import org.eclipse.jetty.spdy.api.GoAwayInfo;
32 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33 import org.eclipse.jetty.spdy.api.HeadersInfo;
34 import org.eclipse.jetty.spdy.api.Info;
35 import org.eclipse.jetty.spdy.api.PushInfo;
36 import org.eclipse.jetty.spdy.api.ReplyInfo;
37 import org.eclipse.jetty.spdy.api.RstInfo;
38 import org.eclipse.jetty.spdy.api.SPDY;
39 import org.eclipse.jetty.spdy.api.Session;
40 import org.eclipse.jetty.spdy.api.SessionFrameListener;
41 import org.eclipse.jetty.spdy.api.Stream;
42 import org.eclipse.jetty.spdy.api.StreamFrameListener;
43 import org.eclipse.jetty.spdy.api.StreamStatus;
44 import org.eclipse.jetty.spdy.api.SynInfo;
45 import org.eclipse.jetty.spdy.client.SPDYClient;
46 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
47 import org.eclipse.jetty.util.Callback;
48 import org.eclipse.jetty.util.Fields;
49 import org.eclipse.jetty.util.Promise;
50 import org.eclipse.jetty.util.log.Log;
51 import org.eclipse.jetty.util.log.Logger;
52
53
54
55
56
57 public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58 {
// Logger shared by the engine and its nested listener/promise classes.
private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);

// Attribute key under which proxy() stores the StreamPromise on the client stream; onData() reads it back.
private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
// Attribute key under which StreamPromise.succeeded() stores the client stream on the server stream.
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";

// Sessions towards upstream servers, keyed by host; filled by produceSession(), pruned in onGoAway().
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
// Listener registered on every upstream session opened by produceSession().
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
// Factory used to create the SPDY client for a new upstream connection.
private final SPDYClient.Factory factory;
// Exposed through getConnectTimeout()/setConnectTimeout(); defaults to 15000.
// NOTE(review): not read anywhere else in this file; presumably milliseconds - confirm.
private volatile long connectTimeout = 15000;
// Passed as the PushInfo timeout, in milliseconds, by convertPushInfo(); defaults to 60000.
private volatile long timeout = 60000;
69
/**
 * Creates a proxy engine that opens its upstream connections through the given factory.
 *
 * @param factory the factory used by this engine to create SPDY clients
 */
public SPDYProxyEngine(SPDYClient.Factory factory)
{
    super();
    this.factory = factory;
}
74
75 public long getConnectTimeout()
76 {
77 return connectTimeout;
78 }
79
80 public void setConnectTimeout(long connectTimeout)
81 {
82 this.connectTimeout = connectTimeout;
83 }
84
85 public long getTimeout()
86 {
87 return timeout;
88 }
89
90 public void setTimeout(long timeout)
91 {
92 this.timeout = timeout;
93 }
94
95 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96 {
97 Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98
99 short serverVersion = getVersion(proxyServerInfo.getProtocol());
100 InetSocketAddress address = proxyServerInfo.getAddress();
101 Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102 if (serverSession == null)
103 {
104 rst(clientStream);
105 return null;
106 }
107
108 final Session clientSession = clientStream.getSession();
109
110 addRequestProxyHeaders(clientStream, headers);
111 customizeRequestHeaders(clientStream, headers);
112 convert(clientSession.getVersion(), serverVersion, headers);
113
114 SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115 StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116 StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
117 clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
118 serverSession.syn(serverSynInfo, listener, promise);
119 return this;
120 }
121
122 private static short getVersion(String protocol)
123 {
124 switch (protocol)
125 {
126 case "spdy/2":
127 return SPDY.V2;
128 case "spdy/3":
129 return SPDY.V3;
130 default:
131 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132 }
133 }
134
135 @Override
136 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137 {
138 throw new IllegalStateException("We shouldn't receive pushes from clients");
139 }
140
141 @Override
142 public void onReply(Stream stream, ReplyInfo replyInfo)
143 {
144 throw new IllegalStateException("Servers do not receive replies");
145 }
146
147 @Override
148 public void onHeaders(Stream stream, HeadersInfo headersInfo)
149 {
150
151 throw new UnsupportedOperationException("Not Yet Implemented");
152 }
153
154 @Override
155 public void onData(Stream clientStream, final DataInfo clientDataInfo)
156 {
157 if (LOG.isDebugEnabled())
158 LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
159
160 ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
161 {
162 @Override
163 public void consume(int delta)
164 {
165 super.consume(delta);
166 clientDataInfo.consume(delta);
167 }
168 };
169
170 StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
171 streamPromise.data(serverDataInfo);
172 }
173
174 @Override
175 public void onFailure(Stream stream, Throwable x)
176 {
177 LOG.debug(x);
178 }
179
180 private Session produceSession(String host, short version, InetSocketAddress address)
181 {
182 try
183 {
184 Session session = serverSessions.get(host);
185 if (session == null)
186 {
187 SPDYClient client = factory.newSPDYClient(version);
188 session = client.connect(address, sessionListener);
189 if (LOG.isDebugEnabled())
190 LOG.debug("Proxy session connected to {}", address);
191 Session existing = serverSessions.putIfAbsent(host, session);
192 if (existing != null)
193 {
194 session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
195 session = existing;
196 }
197 }
198 return session;
199 }
200 catch (Exception x)
201 {
202 LOG.debug(x);
203 return null;
204 }
205 }
206
207 private void convert(short fromVersion, short toVersion, Fields headers)
208 {
209 if (fromVersion != toVersion)
210 {
211 for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
212 {
213 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
214 if (header != null)
215 {
216 String toName = httpHeader.name(toVersion);
217 for (String value : header.getValues())
218 headers.add(toName, value);
219 }
220 }
221 }
222 }
223
224 private void rst(Stream stream)
225 {
226 RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
227 stream.getSession().rst(rstInfo, Callback.Adapter.INSTANCE);
228 }
229
230 private class ProxyPushStreamFrameListener implements StreamFrameListener
231 {
232 private PushStreamPromise pushStreamPromise;
233
234 private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
235 {
236 this.pushStreamPromise = pushStreamPromise;
237 }
238
239 @Override
240 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
241 {
242 if (LOG.isDebugEnabled())
243 LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
244 PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
245 this.pushStreamPromise.push(newPushStreamPromise);
246 return new ProxyPushStreamFrameListener(newPushStreamPromise);
247 }
248
249 @Override
250 public void onReply(Stream stream, ReplyInfo replyInfo)
251 {
252
253 throw new UnsupportedOperationException();
254 }
255
256 @Override
257 public void onHeaders(Stream stream, HeadersInfo headersInfo)
258 {
259 throw new UnsupportedOperationException();
260 }
261
262 @Override
263 public void onData(Stream serverStream, final DataInfo serverDataInfo)
264 {
265 if (LOG.isDebugEnabled())
266 LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
267
268 ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
269 {
270 @Override
271 public void consume(int delta)
272 {
273 super.consume(delta);
274 serverDataInfo.consume(delta);
275 }
276 };
277
278 pushStreamPromise.data(clientDataInfo);
279 }
280
281 @Override
282 public void onFailure(Stream stream, Throwable x)
283 {
284 LOG.debug(x);
285 }
286 }
287
288 private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
289 {
290 private final Stream receiverStream;
291
292 public ProxyStreamFrameListener(Stream receiverStream)
293 {
294 this.receiverStream = receiverStream;
295 }
296
297 @Override
298 public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
299 {
300 if (LOG.isDebugEnabled())
301 LOG.debug("S -> P {} on {}");
302 PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
303 PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
304 receiverStream.push(newPushInfo, pushStreamPromise);
305 return new ProxyPushStreamFrameListener(pushStreamPromise);
306 }
307
308 @Override
309 public void onReply(final Stream stream, ReplyInfo replyInfo)
310 {
311 if (LOG.isDebugEnabled())
312 LOG.debug("S -> P {} on {}", replyInfo, stream);
313 final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
314 replyInfo.isClose());
315 reply(stream, clientReplyInfo);
316 }
317
318 private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
319 {
320 receiverStream.reply(clientReplyInfo, new Callback()
321 {
322 @Override
323 public void succeeded()
324 {
325 if (LOG.isDebugEnabled())
326 LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
327 }
328
329 @Override
330 public void failed(Throwable x)
331 {
332 LOG.debug(x);
333 rst(receiverStream);
334 }
335 });
336 }
337
338 @Override
339 public void onHeaders(Stream stream, HeadersInfo headersInfo)
340 {
341
342 throw new UnsupportedOperationException("Not Yet Implemented");
343 }
344
345 @Override
346 public void onData(final Stream stream, final DataInfo dataInfo)
347 {
348 if (LOG.isDebugEnabled())
349 LOG.debug("S -> P {} on {}", dataInfo, stream);
350 data(stream, dataInfo);
351 }
352
353 private void data(final Stream stream, final DataInfo serverDataInfo)
354 {
355 final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
356 {
357 @Override
358 public void consume(int delta)
359 {
360 super.consume(delta);
361 serverDataInfo.consume(delta);
362 }
363 };
364
365 receiverStream.data(clientDataInfo, new Callback()
366 {
367 @Override
368 public void succeeded()
369 {
370 if (LOG.isDebugEnabled())
371 LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
372 }
373
374 @Override
375 public void failed(Throwable x)
376 {
377 LOG.debug(x);
378 rst(receiverStream);
379 }
380 });
381 }
382 }
383
384
385
386
387
388
389
390
391
392 private class StreamPromise implements Promise<Stream>
393 {
394 private final Queue<DataInfoCallback> queue = new LinkedList<>();
395 private final Stream senderStream;
396 private final Info info;
397 private Stream receiverStream;
398
399 private StreamPromise(Stream senderStream, Info info)
400 {
401 this.senderStream = senderStream;
402 this.info = info;
403 }
404
405 @Override
406 public void succeeded(Stream stream)
407 {
408 if (LOG.isDebugEnabled())
409 LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
410
411 stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
412
413 DataInfoCallback dataInfoCallback;
414 synchronized (queue)
415 {
416 this.receiverStream = stream;
417 dataInfoCallback = queue.peek();
418 if (dataInfoCallback != null)
419 {
420 if (dataInfoCallback.flushing)
421 {
422 if (LOG.isDebugEnabled())
423 LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
424 dataInfoCallback = null;
425 }
426 else
427 {
428 dataInfoCallback.flushing = true;
429 if (LOG.isDebugEnabled())
430 LOG.debug("SYN completed, queue size {}", queue.size());
431 }
432 }
433 else
434 {
435 if (LOG.isDebugEnabled())
436 LOG.debug("SYN completed, queue empty");
437 }
438 }
439 if (dataInfoCallback != null)
440 flush(stream, dataInfoCallback);
441 }
442
443 @Override
444 public void failed(Throwable x)
445 {
446 LOG.debug(x);
447 rst(senderStream);
448 }
449
450 public void data(DataInfo dataInfo)
451 {
452 Stream receiverStream;
453 DataInfoCallback dataInfoCallbackToFlush = null;
454 DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
455 synchronized (queue)
456 {
457 queue.offer(dataInfoCallBackToQueue);
458 receiverStream = this.receiverStream;
459 if (receiverStream != null)
460 {
461 dataInfoCallbackToFlush = queue.peek();
462 if (dataInfoCallbackToFlush.flushing)
463 {
464 if (LOG.isDebugEnabled())
465 LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
466 receiverStream = null;
467 }
468 else
469 {
470 dataInfoCallbackToFlush.flushing = true;
471 if (LOG.isDebugEnabled())
472 LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
473 }
474 }
475 else
476 {
477 if (LOG.isDebugEnabled())
478 LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
479 }
480 }
481 if (receiverStream != null)
482 flush(receiverStream, dataInfoCallbackToFlush);
483 }
484
485 private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
486 {
487 if (LOG.isDebugEnabled())
488 LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
489 receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback);
490 }
491
492 private class DataInfoCallback implements Callback
493 {
494 private final DataInfo dataInfo;
495 private boolean flushing;
496
497 private DataInfoCallback(DataInfo dataInfo)
498 {
499 this.dataInfo = dataInfo;
500 }
501
502 @Override
503 public void succeeded()
504 {
505 Stream serverStream;
506 DataInfoCallback dataInfoCallback;
507 synchronized (queue)
508 {
509 serverStream = StreamPromise.this.receiverStream;
510 assert serverStream != null;
511 dataInfoCallback = queue.poll();
512 assert dataInfoCallback == this;
513 dataInfoCallback = queue.peek();
514 if (dataInfoCallback != null)
515 {
516 assert !dataInfoCallback.flushing;
517 dataInfoCallback.flushing = true;
518 if (LOG.isDebugEnabled())
519 LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
520 }
521 else
522 {
523 if (LOG.isDebugEnabled())
524 LOG.debug("Completed {}, queue empty", dataInfo);
525 }
526 }
527 if (dataInfoCallback != null)
528 flush(serverStream, dataInfoCallback);
529 }
530
531 @Override
532 public void failed(Throwable x)
533 {
534 LOG.debug(x);
535 rst(senderStream);
536 }
537 }
538
539 public Stream getSenderStream()
540 {
541 return senderStream;
542 }
543
544 public Info getInfo()
545 {
546 return info;
547 }
548
549 public Stream getReceiverStream()
550 {
551 synchronized (queue)
552 {
553 return receiverStream;
554 }
555 }
556 }
557
558 private class PushStreamPromise extends StreamPromise
559 {
560 private volatile PushStreamPromise pushStreamPromise;
561
562 private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
563 {
564 super(senderStream, pushInfo);
565 }
566
567 @Override
568 public void succeeded(Stream receiverStream)
569 {
570 super.succeeded(receiverStream);
571
572 if (LOG.isDebugEnabled())
573 LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
574
575 PushStreamPromise promise = pushStreamPromise;
576 if (promise != null)
577 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
578 }
579
580 public void push(PushStreamPromise pushStreamPromise)
581 {
582 Stream receiverStream = getReceiverStream();
583
584 if (receiverStream != null)
585 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
586 else
587 this.pushStreamPromise = pushStreamPromise;
588 }
589 }
590
591 private class ProxySessionFrameListener extends SessionFrameListener.Adapter
592 {
593 @Override
594 public void onRst(Session serverSession, RstInfo serverRstInfo)
595 {
596 Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
597 if (serverStream != null)
598 {
599 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
600 if (clientStream != null)
601 {
602 Session clientSession = clientStream.getSession();
603 RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
604 clientSession.rst(clientRstInfo, Callback.Adapter.INSTANCE);
605 }
606 }
607 }
608
609 @Override
610 public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
611 {
612 serverSessions.values().remove(serverSession);
613 }
614 }
615
616 private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
617 {
618 Fields headersToConvert = pushInfo.getHeaders();
619 Fields headers = convertHeaders(from, to, headersToConvert);
620 return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
621 }
622
623 private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
624 {
625 Fields headers = new Fields(headersToConvert, false);
626 addResponseProxyHeaders(from, headers);
627 customizeResponseHeaders(from, headers);
628 convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
629 return headers;
630 }
631 }