1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy.proxy;
21
22 import java.net.InetSocketAddress;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.TimeUnit;
28
29 import org.eclipse.jetty.spdy.SPDYClient;
30 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
31 import org.eclipse.jetty.spdy.api.DataInfo;
32 import org.eclipse.jetty.spdy.api.GoAwayInfo;
33 import org.eclipse.jetty.spdy.api.Handler;
34 import org.eclipse.jetty.spdy.api.Headers;
35 import org.eclipse.jetty.spdy.api.HeadersInfo;
36 import org.eclipse.jetty.spdy.api.ReplyInfo;
37 import org.eclipse.jetty.spdy.api.RstInfo;
38 import org.eclipse.jetty.spdy.api.SPDY;
39 import org.eclipse.jetty.spdy.api.Session;
40 import org.eclipse.jetty.spdy.api.SessionFrameListener;
41 import org.eclipse.jetty.spdy.api.Stream;
42 import org.eclipse.jetty.spdy.api.StreamFrameListener;
43 import org.eclipse.jetty.spdy.api.StreamStatus;
44 import org.eclipse.jetty.spdy.api.SynInfo;
45 import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
46
47
48
49
50
51 public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
52 {
53 private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
54 private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
55
56 private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
57 private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
58 private final SPDYClient.Factory factory;
59 private volatile long connectTimeout = 15000;
60 private volatile long timeout = 60000;
61
/**
 * Creates a proxy engine that obtains its SPDY clients from the given factory.
 *
 * @param factory the factory asked for a new {@link SPDYClient} whenever a server session has to be opened
 */
public SPDYProxyEngine(SPDYClient.Factory factory)
{
    this.factory = factory;
}
66
67 public long getConnectTimeout()
68 {
69 return connectTimeout;
70 }
71
72 public void setConnectTimeout(long connectTimeout)
73 {
74 this.connectTimeout = connectTimeout;
75 }
76
77 public long getTimeout()
78 {
79 return timeout;
80 }
81
82 public void setTimeout(long timeout)
83 {
84 this.timeout = timeout;
85 }
86
87 public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
88 {
89 Headers headers = new Headers(clientSynInfo.getHeaders(), false);
90
91 short serverVersion = getVersion(proxyServerInfo.getProtocol());
92 InetSocketAddress address = proxyServerInfo.getAddress();
93 Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
94 if (serverSession == null)
95 {
96 rst(clientStream);
97 return null;
98 }
99
100 final Session clientSession = clientStream.getSession();
101
102 addRequestProxyHeaders(clientStream, headers);
103 customizeRequestHeaders(clientStream, headers);
104 convert(clientSession.getVersion(), serverVersion, headers);
105
106 SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
107 StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
108 StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
109 clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
110 serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
111 return this;
112 }
113
114 private static short getVersion(String protocol)
115 {
116 switch (protocol)
117 {
118 case "spdy/2":
119 return SPDY.V2;
120 case "spdy/3":
121 return SPDY.V3;
122 default:
123 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
124 }
125 }
126
127 @Override
128 public void onReply(Stream stream, ReplyInfo replyInfo)
129 {
130
131 }
132
133 @Override
134 public void onHeaders(Stream stream, HeadersInfo headersInfo)
135 {
136
137 throw new UnsupportedOperationException("Not Yet Implemented");
138 }
139
140 @Override
141 public void onData(Stream clientStream, final DataInfo clientDataInfo)
142 {
143 logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
144
145 ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
146 {
147 @Override
148 public void consume(int delta)
149 {
150 super.consume(delta);
151 clientDataInfo.consume(delta);
152 }
153 };
154
155 StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
156 streamHandler.data(serverDataInfo);
157 }
158
159 private Session produceSession(String host, short version, InetSocketAddress address)
160 {
161 try
162 {
163 Session session = serverSessions.get(host);
164 if (session == null)
165 {
166 SPDYClient client = factory.newSPDYClient(version);
167 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
168 logger.debug("Proxy session connected to {}", address);
169 Session existing = serverSessions.putIfAbsent(host, session);
170 if (existing != null)
171 {
172 session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
173 session = existing;
174 }
175 }
176 return session;
177 }
178 catch (Exception x)
179 {
180 logger.debug(x);
181 return null;
182 }
183 }
184
185 private void convert(short fromVersion, short toVersion, Headers headers)
186 {
187 if (fromVersion != toVersion)
188 {
189 for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
190 {
191 Headers.Header header = headers.remove(httpHeader.name(fromVersion));
192 if (header != null)
193 {
194 String toName = httpHeader.name(toVersion);
195 for (String value : header.values())
196 headers.add(toName, value);
197 }
198 }
199 }
200 }
201
202 private void rst(Stream stream)
203 {
204 RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
205 stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
206 }
207
208 private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
209 {
210 private final Stream clientStream;
211 private volatile ReplyInfo replyInfo;
212
213 public ProxyStreamFrameListener(Stream clientStream)
214 {
215 this.clientStream = clientStream;
216 }
217
218 @Override
219 public void onReply(final Stream stream, ReplyInfo replyInfo)
220 {
221 logger.debug("S -> P {} on {}", replyInfo, stream);
222
223 short serverVersion = stream.getSession().getVersion();
224 Headers headers = new Headers(replyInfo.getHeaders(), false);
225
226 addResponseProxyHeaders(stream, headers);
227 customizeResponseHeaders(stream, headers);
228 short clientVersion = this.clientStream.getSession().getVersion();
229 convert(serverVersion, clientVersion, headers);
230
231 this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
232 if (replyInfo.isClose())
233 reply(stream);
234 }
235
236 @Override
237 public void onHeaders(Stream stream, HeadersInfo headersInfo)
238 {
239
240 throw new UnsupportedOperationException("Not Yet Implemented");
241 }
242
243 @Override
244 public void onData(final Stream stream, final DataInfo dataInfo)
245 {
246 logger.debug("S -> P {} on {}", dataInfo, stream);
247
248 if (replyInfo != null)
249 {
250 if (dataInfo.isClose())
251 replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
252 reply(stream);
253 }
254 data(stream, dataInfo);
255 }
256
257 private void reply(final Stream stream)
258 {
259 final ReplyInfo replyInfo = this.replyInfo;
260 this.replyInfo = null;
261 clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
262 {
263 @Override
264 public void completed(Void context)
265 {
266 logger.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
267 }
268
269 @Override
270 public void failed(Void context, Throwable x)
271 {
272 logger.debug(x);
273 rst(clientStream);
274 }
275 });
276 }
277
278 private void data(final Stream stream, final DataInfo dataInfo)
279 {
280 clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
281 {
282 @Override
283 public void completed(Void context)
284 {
285 dataInfo.consume(dataInfo.length());
286 logger.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
287 }
288
289 @Override
290 public void failed(Void context, Throwable x)
291 {
292 logger.debug(x);
293 rst(clientStream);
294 }
295 });
296 }
297 }
298
299
300
301
302
303
304
305
306
307 private class StreamHandler implements Handler<Stream>
308 {
309 private final Queue<DataInfoHandler> queue = new LinkedList<>();
310 private final Stream clientStream;
311 private final SynInfo serverSynInfo;
312 private Stream serverStream;
313
314 private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
315 {
316 this.clientStream = clientStream;
317 this.serverSynInfo = serverSynInfo;
318 }
319
320 @Override
321 public void completed(Stream serverStream)
322 {
323 logger.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
324
325 serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
326
327 DataInfoHandler dataInfoHandler;
328 synchronized (queue)
329 {
330 this.serverStream = serverStream;
331 dataInfoHandler = queue.peek();
332 if (dataInfoHandler != null)
333 {
334 if (dataInfoHandler.flushing)
335 {
336 logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
337 dataInfoHandler = null;
338 }
339 else
340 {
341 dataInfoHandler.flushing = true;
342 logger.debug("SYN completed, queue size {}", queue.size());
343 }
344 }
345 else
346 {
347 logger.debug("SYN completed, queue empty");
348 }
349 }
350 if (dataInfoHandler != null)
351 flush(serverStream, dataInfoHandler);
352 }
353
354 @Override
355 public void failed(Stream serverStream, Throwable x)
356 {
357 logger.debug(x);
358 rst(clientStream);
359 }
360
361 public void data(DataInfo dataInfo)
362 {
363 Stream serverStream;
364 DataInfoHandler dataInfoHandler = null;
365 DataInfoHandler item = new DataInfoHandler(dataInfo);
366 synchronized (queue)
367 {
368 queue.offer(item);
369 serverStream = this.serverStream;
370 if (serverStream != null)
371 {
372 dataInfoHandler = queue.peek();
373 if (dataInfoHandler.flushing)
374 {
375 logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
376 serverStream = null;
377 }
378 else
379 {
380 dataInfoHandler.flushing = true;
381 logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
382 }
383 }
384 else
385 {
386 logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
387 }
388 }
389 if (serverStream != null)
390 flush(serverStream, dataInfoHandler);
391 }
392
393 private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
394 {
395 logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
396 serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
397 }
398
399 private class DataInfoHandler implements Handler<Void>
400 {
401 private final DataInfo dataInfo;
402 private boolean flushing;
403
404 private DataInfoHandler(DataInfo dataInfo)
405 {
406 this.dataInfo = dataInfo;
407 }
408
409 @Override
410 public void completed(Void context)
411 {
412 Stream serverStream;
413 DataInfoHandler dataInfoHandler;
414 synchronized (queue)
415 {
416 serverStream = StreamHandler.this.serverStream;
417 assert serverStream != null;
418 dataInfoHandler = queue.poll();
419 assert dataInfoHandler == this;
420 dataInfoHandler = queue.peek();
421 if (dataInfoHandler != null)
422 {
423 assert !dataInfoHandler.flushing;
424 dataInfoHandler.flushing = true;
425 logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
426 }
427 else
428 {
429 logger.debug("Completed {}, queue empty", dataInfo);
430 }
431 }
432 if (dataInfoHandler != null)
433 flush(serverStream, dataInfoHandler);
434 }
435
436 @Override
437 public void failed(Void context, Throwable x)
438 {
439 logger.debug(x);
440 rst(clientStream);
441 }
442 }
443 }
444
445 private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
446 {
447 @Override
448 public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
449 {
450 logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
451
452 Headers headers = new Headers(serverSynInfo.getHeaders(), false);
453
454 addResponseProxyHeaders(serverStream, headers);
455 customizeResponseHeaders(serverStream, headers);
456 Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
457 convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
458
459 StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
460 serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
461 clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
462
463 return this;
464 }
465
466 @Override
467 public void onRst(Session serverSession, RstInfo serverRstInfo)
468 {
469 Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
470 if (serverStream != null)
471 {
472 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
473 if (clientStream != null)
474 {
475 Session clientSession = clientStream.getSession();
476 RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
477 clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
478 }
479 }
480 }
481
482 @Override
483 public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
484 {
485 serverSessions.values().remove(serverSession);
486 }
487
488 @Override
489 public void onReply(Stream stream, ReplyInfo replyInfo)
490 {
491
492 }
493
494 @Override
495 public void onHeaders(Stream stream, HeadersInfo headersInfo)
496 {
497 throw new UnsupportedOperationException();
498 }
499
500 @Override
501 public void onData(Stream serverStream, final DataInfo serverDataInfo)
502 {
503 logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
504
505 ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
506 {
507 @Override
508 public void consume(int delta)
509 {
510 super.consume(delta);
511 serverDataInfo.consume(delta);
512 }
513 };
514
515 StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
516 handler.data(clientDataInfo);
517 }
518 }
519 }