1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.server.proxy;
21
22 import java.net.InetSocketAddress;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30 import org.eclipse.jetty.spdy.api.DataInfo;
31 import org.eclipse.jetty.spdy.api.GoAwayInfo;
32 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33 import org.eclipse.jetty.spdy.api.HeadersInfo;
34 import org.eclipse.jetty.spdy.api.Info;
35 import org.eclipse.jetty.spdy.api.PushInfo;
36 import org.eclipse.jetty.spdy.api.ReplyInfo;
37 import org.eclipse.jetty.spdy.api.RstInfo;
38 import org.eclipse.jetty.spdy.api.SPDY;
39 import org.eclipse.jetty.spdy.api.Session;
40 import org.eclipse.jetty.spdy.api.SessionFrameListener;
41 import org.eclipse.jetty.spdy.api.Stream;
42 import org.eclipse.jetty.spdy.api.StreamFrameListener;
43 import org.eclipse.jetty.spdy.api.StreamStatus;
44 import org.eclipse.jetty.spdy.api.SynInfo;
45 import org.eclipse.jetty.spdy.client.SPDYClient;
46 import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
47 import org.eclipse.jetty.util.Callback;
48 import org.eclipse.jetty.util.Fields;
49 import org.eclipse.jetty.util.Promise;
50 import org.eclipse.jetty.util.log.Log;
51 import org.eclipse.jetty.util.log.Logger;
52
53
54
55
56
57 public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58 {
/** Logger used by the engine and by its nested listeners and handlers. */
private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);

/** Stream attribute key under which a {@link StreamHandler} is stored. */
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamHandler";
/** Stream attribute key under which a server stream remembers its client stream. */
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";

/** Sessions already opened towards upstream servers, keyed by host. */
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
/** Session-level listener handed to every upstream connection. */
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
/** Factory producing the clients used to connect to upstream servers. */
private final SPDYClient.Factory factory;
/** Maximum time, in milliseconds, to wait for an upstream connection. */
private volatile long connectTimeout = 15_000;
/** Timeout, in milliseconds, applied to pushes relayed to the client. */
private volatile long timeout = 60_000;

/**
 * Creates a proxy engine that opens its upstream connections through the given factory.
 *
 * @param factory the factory used to create {@link SPDYClient} instances
 */
public SPDYProxyEngine(SPDYClient.Factory factory)
{
    this.factory = factory;
}
74
75 public long getConnectTimeout()
76 {
77 return connectTimeout;
78 }
79
80 public void setConnectTimeout(long connectTimeout)
81 {
82 this.connectTimeout = connectTimeout;
83 }
84
85 public long getTimeout()
86 {
87 return timeout;
88 }
89
90 public void setTimeout(long timeout)
91 {
92 this.timeout = timeout;
93 }
94
95 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96 {
97 Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98
99 short serverVersion = getVersion(proxyServerInfo.getProtocol());
100 InetSocketAddress address = proxyServerInfo.getAddress();
101 Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102 if (serverSession == null)
103 {
104 rst(clientStream);
105 return null;
106 }
107
108 final Session clientSession = clientStream.getSession();
109
110 addRequestProxyHeaders(clientStream, headers);
111 customizeRequestHeaders(clientStream, headers);
112 convert(clientSession.getVersion(), serverVersion, headers);
113
114 SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115 StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116 StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
117 clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
118 serverSession.syn(serverSynInfo, listener, handler);
119 return this;
120 }
121
122 private static short getVersion(String protocol)
123 {
124 switch (protocol)
125 {
126 case "spdy/2":
127 return SPDY.V2;
128 case "spdy/3":
129 return SPDY.V3;
130 default:
131 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132 }
133 }
134
135 @Override
136 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137 {
138 throw new IllegalStateException("We shouldn't receive pushes from clients");
139 }
140
141 @Override
142 public void onReply(Stream stream, ReplyInfo replyInfo)
143 {
144 throw new IllegalStateException("Servers do not receive replies");
145 }
146
147 @Override
148 public void onHeaders(Stream stream, HeadersInfo headersInfo)
149 {
150
151 throw new UnsupportedOperationException("Not Yet Implemented");
152 }
153
154 @Override
155 public void onData(Stream clientStream, final DataInfo clientDataInfo)
156 {
157 LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
158
159 ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
160 {
161 @Override
162 public void consume(int delta)
163 {
164 super.consume(delta);
165 clientDataInfo.consume(delta);
166 }
167 };
168
169 StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
170 streamHandler.data(serverDataInfo);
171 }
172
173 private Session produceSession(String host, short version, InetSocketAddress address)
174 {
175 try
176 {
177 Session session = serverSessions.get(host);
178 if (session == null)
179 {
180 SPDYClient client = factory.newSPDYClient(version);
181 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
182 LOG.debug("Proxy session connected to {}", address);
183 Session existing = serverSessions.putIfAbsent(host, session);
184 if (existing != null)
185 {
186 session.goAway(new GoAwayInfo(), new Callback.Adapter());
187 session = existing;
188 }
189 }
190 return session;
191 }
192 catch (Exception x)
193 {
194 LOG.debug(x);
195 return null;
196 }
197 }
198
199 private void convert(short fromVersion, short toVersion, Fields headers)
200 {
201 if (fromVersion != toVersion)
202 {
203 for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
204 {
205 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
206 if (header != null)
207 {
208 String toName = httpHeader.name(toVersion);
209 for (String value : header.values())
210 headers.add(toName, value);
211 }
212 }
213 }
214 }
215
216 private void rst(Stream stream)
217 {
218 RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
219 stream.getSession().rst(rstInfo, new Callback.Adapter());
220 }
221
222 private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
223 {
224 private final Stream clientStream;
225 private volatile ReplyInfo replyInfo;
226
227 public ProxyStreamFrameListener(Stream clientStream)
228 {
229 this.clientStream = clientStream;
230 }
231
232 @Override
233 public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
234 {
235 LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
236
237 Fields headers = new Fields(pushInfo.getHeaders(), false);
238
239 addResponseProxyHeaders(stream, headers);
240 customizeResponseHeaders(stream, headers);
241 Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
242 (CLIENT_STREAM_ATTRIBUTE);
243 convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
244 headers);
245
246 StreamHandler handler = new StreamHandler(clientStream, pushInfo);
247 stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
248 clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
249 pushInfo.isClose()),
250 handler);
251 return new Adapter()
252 {
253 @Override
254 public void onReply(Stream stream, ReplyInfo replyInfo)
255 {
256
257 throw new UnsupportedOperationException();
258 }
259
260 @Override
261 public void onHeaders(Stream stream, HeadersInfo headersInfo)
262 {
263 throw new UnsupportedOperationException();
264 }
265
266 @Override
267 public void onData(Stream serverStream, final DataInfo serverDataInfo)
268 {
269 LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
270
271 ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
272 {
273 @Override
274 public void consume(int delta)
275 {
276 super.consume(delta);
277 serverDataInfo.consume(delta);
278 }
279 };
280
281 StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
282 handler.data(clientDataInfo);
283 }
284 };
285 }
286
287 @Override
288 public void onReply(final Stream stream, ReplyInfo replyInfo)
289 {
290 LOG.debug("S -> P {} on {}", replyInfo, stream);
291
292 short serverVersion = stream.getSession().getVersion();
293 Fields headers = new Fields(replyInfo.getHeaders(), false);
294
295 addResponseProxyHeaders(stream, headers);
296 customizeResponseHeaders(stream, headers);
297 short clientVersion = this.clientStream.getSession().getVersion();
298 convert(serverVersion, clientVersion, headers);
299
300 this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
301 if (replyInfo.isClose())
302 reply(stream);
303 }
304
305 @Override
306 public void onHeaders(Stream stream, HeadersInfo headersInfo)
307 {
308
309 throw new UnsupportedOperationException("Not Yet Implemented");
310 }
311
312 @Override
313 public void onData(final Stream stream, final DataInfo dataInfo)
314 {
315 LOG.debug("S -> P {} on {}", dataInfo, stream);
316
317 if (replyInfo != null)
318 {
319 if (dataInfo.isClose())
320 replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
321 reply(stream);
322 }
323 data(stream, dataInfo);
324 }
325
326 private void reply(final Stream stream)
327 {
328 final ReplyInfo replyInfo = this.replyInfo;
329 this.replyInfo = null;
330 clientStream.reply(replyInfo, new Callback()
331 {
332 @Override
333 public void succeeded()
334 {
335 LOG.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
336 }
337
338 @Override
339 public void failed(Throwable x)
340 {
341 LOG.debug(x);
342 rst(clientStream);
343 }
344 });
345 }
346
347 private void data(final Stream stream, final DataInfo dataInfo)
348 {
349 clientStream.data(dataInfo, new Callback()
350 {
351 @Override
352 public void succeeded()
353 {
354 dataInfo.consume(dataInfo.length());
355 LOG.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
356 }
357
358 @Override
359 public void failed(Throwable x)
360 {
361 LOG.debug(x);
362 rst(clientStream);
363 }
364 });
365 }
366 }
367
368
369
370
371
372
373
374
375
376 private class StreamHandler implements Promise<Stream>
377 {
378 private final Queue<DataInfoHandler> queue = new LinkedList<>();
379 private final Stream clientStream;
380 private final Info info;
381 private Stream serverStream;
382
383 private StreamHandler(Stream clientStream, Info info)
384 {
385 this.clientStream = clientStream;
386 this.info = info;
387 }
388
389 @Override
390 public void succeeded(Stream serverStream)
391 {
392 LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
393
394 serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
395
396 DataInfoHandler dataInfoHandler;
397 synchronized (queue)
398 {
399 this.serverStream = serverStream;
400 dataInfoHandler = queue.peek();
401 if (dataInfoHandler != null)
402 {
403 if (dataInfoHandler.flushing)
404 {
405 LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
406 dataInfoHandler = null;
407 }
408 else
409 {
410 dataInfoHandler.flushing = true;
411 LOG.debug("SYN completed, queue size {}", queue.size());
412 }
413 }
414 else
415 {
416 LOG.debug("SYN completed, queue empty");
417 }
418 }
419 if (dataInfoHandler != null)
420 flush(serverStream, dataInfoHandler);
421 }
422
423 @Override
424 public void failed(Throwable x)
425 {
426 LOG.debug(x);
427 rst(clientStream);
428 }
429
430 public void data(DataInfo dataInfo)
431 {
432 Stream serverStream;
433 DataInfoHandler dataInfoHandler = null;
434 DataInfoHandler item = new DataInfoHandler(dataInfo);
435 synchronized (queue)
436 {
437 queue.offer(item);
438 serverStream = this.serverStream;
439 if (serverStream != null)
440 {
441 dataInfoHandler = queue.peek();
442 if (dataInfoHandler.flushing)
443 {
444 LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
445 serverStream = null;
446 }
447 else
448 {
449 dataInfoHandler.flushing = true;
450 LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
451 }
452 }
453 else
454 {
455 LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
456 }
457 }
458 if (serverStream != null)
459 flush(serverStream, dataInfoHandler);
460 }
461
462 private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
463 {
464 LOG.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
465 serverStream.data(dataInfoHandler.dataInfo, dataInfoHandler);
466 }
467
468 private class DataInfoHandler implements Callback
469 {
470 private final DataInfo dataInfo;
471 private boolean flushing;
472
473 private DataInfoHandler(DataInfo dataInfo)
474 {
475 this.dataInfo = dataInfo;
476 }
477
478 @Override
479 public void succeeded()
480 {
481 Stream serverStream;
482 DataInfoHandler dataInfoHandler;
483 synchronized (queue)
484 {
485 serverStream = StreamHandler.this.serverStream;
486 assert serverStream != null;
487 dataInfoHandler = queue.poll();
488 assert dataInfoHandler == this;
489 dataInfoHandler = queue.peek();
490 if (dataInfoHandler != null)
491 {
492 assert !dataInfoHandler.flushing;
493 dataInfoHandler.flushing = true;
494 LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
495 }
496 else
497 {
498 LOG.debug("Completed {}, queue empty", dataInfo);
499 }
500 }
501 if (dataInfoHandler != null)
502 flush(serverStream, dataInfoHandler);
503 }
504
505 @Override
506 public void failed(Throwable x)
507 {
508 LOG.debug(x);
509 rst(clientStream);
510 }
511 }
512 }
513
514 private class ProxySessionFrameListener extends SessionFrameListener.Adapter
515 {
516
517 @Override
518 public void onRst(Session serverSession, RstInfo serverRstInfo)
519 {
520 Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
521 if (serverStream != null)
522 {
523 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
524 if (clientStream != null)
525 {
526 Session clientSession = clientStream.getSession();
527 RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
528 clientSession.rst(clientRstInfo, new Callback.Adapter());
529 }
530 }
531 }
532
533 @Override
534 public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
535 {
536 serverSessions.values().remove(serverSession);
537 }
538 }
539 }