1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.server.proxy;
21
22 import java.net.InetSocketAddress;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30 import org.eclipse.jetty.spdy.api.DataInfo;
31 import org.eclipse.jetty.spdy.api.GoAwayInfo;
32 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33 import org.eclipse.jetty.spdy.api.HeadersInfo;
34 import org.eclipse.jetty.spdy.api.Info;
35 import org.eclipse.jetty.spdy.api.PushInfo;
36 import org.eclipse.jetty.spdy.api.ReplyInfo;
37 import org.eclipse.jetty.spdy.api.RstInfo;
38 import org.eclipse.jetty.spdy.api.SPDY;
39 import org.eclipse.jetty.spdy.api.Session;
40 import org.eclipse.jetty.spdy.api.SessionFrameListener;
41 import org.eclipse.jetty.spdy.api.Stream;
42 import org.eclipse.jetty.spdy.api.StreamFrameListener;
43 import org.eclipse.jetty.spdy.api.StreamStatus;
44 import org.eclipse.jetty.spdy.api.SynInfo;
45 import org.eclipse.jetty.spdy.client.SPDYClient;
46 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
47 import org.eclipse.jetty.util.Callback;
48 import org.eclipse.jetty.util.Fields;
49 import org.eclipse.jetty.util.Promise;
50 import org.eclipse.jetty.util.log.Log;
51 import org.eclipse.jetty.util.log.Logger;
52
53
54
55
56
57 public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58 {
/** Logger shared by the engine and by all of its nested listeners and promises. */
private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);

/** Attribute key under which {@code proxy} stores the {@code StreamPromise} on the client stream. */
private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
/** Attribute key under which {@code StreamPromise.succeeded} stores the client stream on the server stream. */
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";

/** Upstream sessions keyed by host; filled by {@code produceSession}, pruned when a session goes away. */
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
/** Listener handed to every upstream session opened by {@code produceSession}. */
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
/** Source of the {@link SPDYClient} instances used to connect upstream. */
private final SPDYClient.Factory factory;
/** Only exposed through its accessors in this class; presumably milliseconds -- TODO confirm. */
private volatile long connectTimeout = 15_000L;
/** Timeout in milliseconds applied to the push infos built by {@code convertPushInfo}. */
private volatile long timeout = 60_000L;

/**
 * Creates a proxy engine that opens its upstream connections through the given factory.
 *
 * @param factory the factory used to create SPDY clients towards upstream servers
 */
public SPDYProxyEngine(SPDYClient.Factory factory)
{
    this.factory = factory;
}
74
75 public long getConnectTimeout()
76 {
77 return connectTimeout;
78 }
79
80 public void setConnectTimeout(long connectTimeout)
81 {
82 this.connectTimeout = connectTimeout;
83 }
84
85 public long getTimeout()
86 {
87 return timeout;
88 }
89
90 public void setTimeout(long timeout)
91 {
92 this.timeout = timeout;
93 }
94
95 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96 {
97 Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98
99 short serverVersion = getVersion(proxyServerInfo.getProtocol());
100 InetSocketAddress address = proxyServerInfo.getAddress();
101 Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102 if (serverSession == null)
103 {
104 rst(clientStream);
105 return null;
106 }
107
108 final Session clientSession = clientStream.getSession();
109
110 addRequestProxyHeaders(clientStream, headers);
111 customizeRequestHeaders(clientStream, headers);
112 convert(clientSession.getVersion(), serverVersion, headers);
113
114 SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115 StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116 StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
117 clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
118 serverSession.syn(serverSynInfo, listener, promise);
119 return this;
120 }
121
122 private static short getVersion(String protocol)
123 {
124 switch (protocol)
125 {
126 case "spdy/2":
127 return SPDY.V2;
128 case "spdy/3":
129 return SPDY.V3;
130 default:
131 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132 }
133 }
134
135 @Override
136 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137 {
138 throw new IllegalStateException("We shouldn't receive pushes from clients");
139 }
140
141 @Override
142 public void onReply(Stream stream, ReplyInfo replyInfo)
143 {
144 throw new IllegalStateException("Servers do not receive replies");
145 }
146
147 @Override
148 public void onHeaders(Stream stream, HeadersInfo headersInfo)
149 {
150
151 throw new UnsupportedOperationException("Not Yet Implemented");
152 }
153
154 @Override
155 public void onData(Stream clientStream, final DataInfo clientDataInfo)
156 {
157 LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
158
159 ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
160 {
161 @Override
162 public void consume(int delta)
163 {
164 super.consume(delta);
165 clientDataInfo.consume(delta);
166 }
167 };
168
169 StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
170 streamPromise.data(serverDataInfo);
171 }
172
173 @Override
174 public void onFailure(Stream stream, Throwable x)
175 {
176 LOG.debug(x);
177 }
178
179 private Session produceSession(String host, short version, InetSocketAddress address)
180 {
181 try
182 {
183 Session session = serverSessions.get(host);
184 if (session == null)
185 {
186 SPDYClient client = factory.newSPDYClient(version);
187 session = client.connect(address, sessionListener);
188 LOG.debug("Proxy session connected to {}", address);
189 Session existing = serverSessions.putIfAbsent(host, session);
190 if (existing != null)
191 {
192 session.goAway(new GoAwayInfo(), new Callback.Adapter());
193 session = existing;
194 }
195 }
196 return session;
197 }
198 catch (Exception x)
199 {
200 LOG.debug(x);
201 return null;
202 }
203 }
204
205 private void convert(short fromVersion, short toVersion, Fields headers)
206 {
207 if (fromVersion != toVersion)
208 {
209 for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
210 {
211 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
212 if (header != null)
213 {
214 String toName = httpHeader.name(toVersion);
215 for (String value : header.getValues())
216 headers.add(toName, value);
217 }
218 }
219 }
220 }
221
222 private void rst(Stream stream)
223 {
224 RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
225 stream.getSession().rst(rstInfo, new Callback.Adapter());
226 }
227
228 private class ProxyPushStreamFrameListener implements StreamFrameListener
229 {
230 private PushStreamPromise pushStreamPromise;
231
232 private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
233 {
234 this.pushStreamPromise = pushStreamPromise;
235 }
236
237 @Override
238 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
239 {
240 LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
241 PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
242 this.pushStreamPromise.push(newPushStreamPromise);
243 return new ProxyPushStreamFrameListener(newPushStreamPromise);
244 }
245
246 @Override
247 public void onReply(Stream stream, ReplyInfo replyInfo)
248 {
249
250 throw new UnsupportedOperationException();
251 }
252
253 @Override
254 public void onHeaders(Stream stream, HeadersInfo headersInfo)
255 {
256 throw new UnsupportedOperationException();
257 }
258
259 @Override
260 public void onData(Stream serverStream, final DataInfo serverDataInfo)
261 {
262 LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
263
264 ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
265 {
266 @Override
267 public void consume(int delta)
268 {
269 super.consume(delta);
270 serverDataInfo.consume(delta);
271 }
272 };
273
274 pushStreamPromise.data(clientDataInfo);
275 }
276
277 @Override
278 public void onFailure(Stream stream, Throwable x)
279 {
280 LOG.debug(x);
281 }
282 }
283
284 private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
285 {
286 private final Stream receiverStream;
287
288 public ProxyStreamFrameListener(Stream receiverStream)
289 {
290 this.receiverStream = receiverStream;
291 }
292
293 @Override
294 public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
295 {
296 LOG.debug("S -> P {} on {}");
297 PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
298 PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
299 receiverStream.push(newPushInfo, pushStreamPromise);
300 return new ProxyPushStreamFrameListener(pushStreamPromise);
301 }
302
303 @Override
304 public void onReply(final Stream stream, ReplyInfo replyInfo)
305 {
306 LOG.debug("S -> P {} on {}", replyInfo, stream);
307 final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
308 replyInfo.isClose());
309 reply(stream, clientReplyInfo);
310 }
311
312 private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
313 {
314 receiverStream.reply(clientReplyInfo, new Callback()
315 {
316 @Override
317 public void succeeded()
318 {
319 LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
320 }
321
322 @Override
323 public void failed(Throwable x)
324 {
325 LOG.debug(x);
326 rst(receiverStream);
327 }
328 });
329 }
330
331 @Override
332 public void onHeaders(Stream stream, HeadersInfo headersInfo)
333 {
334
335 throw new UnsupportedOperationException("Not Yet Implemented");
336 }
337
338 @Override
339 public void onData(final Stream stream, final DataInfo dataInfo)
340 {
341 LOG.debug("S -> P {} on {}", dataInfo, stream);
342 data(stream, dataInfo);
343 }
344
345 private void data(final Stream stream, final DataInfo serverDataInfo)
346 {
347 final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
348 {
349 @Override
350 public void consume(int delta)
351 {
352 super.consume(delta);
353 serverDataInfo.consume(delta);
354 }
355 };
356
357 receiverStream.data(clientDataInfo, new Callback()
358 {
359 @Override
360 public void succeeded()
361 {
362 LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
363 }
364
365 @Override
366 public void failed(Throwable x)
367 {
368 LOG.debug(x);
369 rst(receiverStream);
370 }
371 });
372 }
373 }
374
375
376
377
378
379
380
381
382
383 private class StreamPromise implements Promise<Stream>
384 {
385 private final Queue<DataInfoCallback> queue = new LinkedList<>();
386 private final Stream senderStream;
387 private final Info info;
388 private Stream receiverStream;
389
390 private StreamPromise(Stream senderStream, Info info)
391 {
392 this.senderStream = senderStream;
393 this.info = info;
394 }
395
396 @Override
397 public void succeeded(Stream stream)
398 {
399 LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
400
401 stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
402
403 DataInfoCallback dataInfoCallback;
404 synchronized (queue)
405 {
406 this.receiverStream = stream;
407 dataInfoCallback = queue.peek();
408 if (dataInfoCallback != null)
409 {
410 if (dataInfoCallback.flushing)
411 {
412 LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
413 dataInfoCallback = null;
414 }
415 else
416 {
417 dataInfoCallback.flushing = true;
418 LOG.debug("SYN completed, queue size {}", queue.size());
419 }
420 }
421 else
422 {
423 LOG.debug("SYN completed, queue empty");
424 }
425 }
426 if (dataInfoCallback != null)
427 flush(stream, dataInfoCallback);
428 }
429
430 @Override
431 public void failed(Throwable x)
432 {
433 LOG.debug(x);
434 rst(senderStream);
435 }
436
437 public void data(DataInfo dataInfo)
438 {
439 Stream receiverStream;
440 DataInfoCallback dataInfoCallbackToFlush = null;
441 DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
442 synchronized (queue)
443 {
444 queue.offer(dataInfoCallBackToQueue);
445 receiverStream = this.receiverStream;
446 if (receiverStream != null)
447 {
448 dataInfoCallbackToFlush = queue.peek();
449 if (dataInfoCallbackToFlush.flushing)
450 {
451 LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
452 receiverStream = null;
453 }
454 else
455 {
456 dataInfoCallbackToFlush.flushing = true;
457 LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
458 }
459 }
460 else
461 {
462 LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
463 }
464 }
465 if (receiverStream != null)
466 flush(receiverStream, dataInfoCallbackToFlush);
467 }
468
469 private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
470 {
471 LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
472 receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback);
473 }
474
475 private class DataInfoCallback implements Callback
476 {
477 private final DataInfo dataInfo;
478 private boolean flushing;
479
480 private DataInfoCallback(DataInfo dataInfo)
481 {
482 this.dataInfo = dataInfo;
483 }
484
485 @Override
486 public void succeeded()
487 {
488 Stream serverStream;
489 DataInfoCallback dataInfoCallback;
490 synchronized (queue)
491 {
492 serverStream = StreamPromise.this.receiverStream;
493 assert serverStream != null;
494 dataInfoCallback = queue.poll();
495 assert dataInfoCallback == this;
496 dataInfoCallback = queue.peek();
497 if (dataInfoCallback != null)
498 {
499 assert !dataInfoCallback.flushing;
500 dataInfoCallback.flushing = true;
501 LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
502 }
503 else
504 {
505 LOG.debug("Completed {}, queue empty", dataInfo);
506 }
507 }
508 if (dataInfoCallback != null)
509 flush(serverStream, dataInfoCallback);
510 }
511
512 @Override
513 public void failed(Throwable x)
514 {
515 LOG.debug(x);
516 rst(senderStream);
517 }
518 }
519
520 public Stream getSenderStream()
521 {
522 return senderStream;
523 }
524
525 public Info getInfo()
526 {
527 return info;
528 }
529
530 public Stream getReceiverStream()
531 {
532 synchronized (queue)
533 {
534 return receiverStream;
535 }
536 }
537 }
538
539 private class PushStreamPromise extends StreamPromise
540 {
541 private volatile PushStreamPromise pushStreamPromise;
542
543 private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
544 {
545 super(senderStream, pushInfo);
546 }
547
548 @Override
549 public void succeeded(Stream receiverStream)
550 {
551 super.succeeded(receiverStream);
552
553 LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
554
555 PushStreamPromise promise = pushStreamPromise;
556 if (promise != null)
557 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
558 }
559
560 public void push(PushStreamPromise pushStreamPromise)
561 {
562 Stream receiverStream = getReceiverStream();
563
564 if (receiverStream != null)
565 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
566 else
567 this.pushStreamPromise = pushStreamPromise;
568 }
569 }
570
571 private class ProxySessionFrameListener extends SessionFrameListener.Adapter
572 {
573 @Override
574 public void onRst(Session serverSession, RstInfo serverRstInfo)
575 {
576 Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
577 if (serverStream != null)
578 {
579 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
580 if (clientStream != null)
581 {
582 Session clientSession = clientStream.getSession();
583 RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
584 clientSession.rst(clientRstInfo, new Callback.Adapter());
585 }
586 }
587 }
588
589 @Override
590 public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
591 {
592 serverSessions.values().remove(serverSession);
593 }
594 }
595
596 private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
597 {
598 Fields headersToConvert = pushInfo.getHeaders();
599 Fields headers = convertHeaders(from, to, headersToConvert);
600 return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
601 }
602
603 private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
604 {
605 Fields headers = new Fields(headersToConvert, false);
606 addResponseProxyHeaders(from, headers);
607 customizeResponseHeaders(from, headers);
608 convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
609 return headers;
610 }
611 }