View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.server.proxy;
21  
22  import java.net.InetSocketAddress;
23  import java.util.LinkedList;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ConcurrentMap;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30  import org.eclipse.jetty.spdy.api.DataInfo;
31  import org.eclipse.jetty.spdy.api.GoAwayInfo;
32  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33  import org.eclipse.jetty.spdy.api.HeadersInfo;
34  import org.eclipse.jetty.spdy.api.Info;
35  import org.eclipse.jetty.spdy.api.PushInfo;
36  import org.eclipse.jetty.spdy.api.ReplyInfo;
37  import org.eclipse.jetty.spdy.api.RstInfo;
38  import org.eclipse.jetty.spdy.api.SPDY;
39  import org.eclipse.jetty.spdy.api.Session;
40  import org.eclipse.jetty.spdy.api.SessionFrameListener;
41  import org.eclipse.jetty.spdy.api.Stream;
42  import org.eclipse.jetty.spdy.api.StreamFrameListener;
43  import org.eclipse.jetty.spdy.api.StreamStatus;
44  import org.eclipse.jetty.spdy.api.SynInfo;
45  import org.eclipse.jetty.spdy.client.SPDYClient;
46  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
47  import org.eclipse.jetty.util.Callback;
48  import org.eclipse.jetty.util.Fields;
49  import org.eclipse.jetty.util.Promise;
50  import org.eclipse.jetty.util.log.Log;
51  import org.eclipse.jetty.util.log.Logger;
52  
53  /**
54   * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, it converts SPDY events received from clients into
55   * SPDY events for the servers.</p>
56   */
57  public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58  {
    // Logger shared by the engine and by its nested listeners and promises.
    private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);

    // Attribute key under which proxy() stores the StreamPromise on the client stream; onData() reads it back.
    private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
    // Attribute key under which StreamPromise.succeeded() stores the sender stream on the newly opened stream;
    // ProxySessionFrameListener.onRst() reads it back to find the client stream to reset.
    private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";

    // Proxy-to-server sessions, keyed by the host string passed to produceSession().
    private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
    // Session listener handed to every server connection opened by produceSession().
    private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
    // Source of the SPDYClient instances used to connect to servers.
    private final SPDYClient.Factory factory;
    // Exposed via getConnectTimeout()/setConnectTimeout().
    // NOTE(review): no other use of this value is visible in this class - confirm whether it should be applied on connect.
    private volatile long connectTimeout = 15000;
    // Passed, in milliseconds, as the timeout of the PushInfo built by convertPushInfo().
    private volatile long timeout = 60000;
69  
70      public SPDYProxyEngine(SPDYClient.Factory factory)
71      {
72          this.factory = factory;
73      }
74  
75      public long getConnectTimeout()
76      {
77          return connectTimeout;
78      }
79  
80      public void setConnectTimeout(long connectTimeout)
81      {
82          this.connectTimeout = connectTimeout;
83      }
84  
85      public long getTimeout()
86      {
87          return timeout;
88      }
89  
90      public void setTimeout(long timeout)
91      {
92          this.timeout = timeout;
93      }
94  
95      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96      {
97          Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98  
99          short serverVersion = getVersion(proxyServerInfo.getProtocol());
100         InetSocketAddress address = proxyServerInfo.getAddress();
101         Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102         if (serverSession == null)
103         {
104             rst(clientStream);
105             return null;
106         }
107 
108         final Session clientSession = clientStream.getSession();
109 
110         addRequestProxyHeaders(clientStream, headers);
111         customizeRequestHeaders(clientStream, headers);
112         convert(clientSession.getVersion(), serverVersion, headers);
113 
114         SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115         StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116         StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
117         clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
118         serverSession.syn(serverSynInfo, listener, promise);
119         return this;
120     }
121 
122     private static short getVersion(String protocol)
123     {
124         switch (protocol)
125         {
126             case "spdy/2":
127                 return SPDY.V2;
128             case "spdy/3":
129                 return SPDY.V3;
130             default:
131                 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132         }
133     }
134 
135     @Override
136     public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137     {
138         throw new IllegalStateException("We shouldn't receive pushes from clients");
139     }
140 
141     @Override
142     public void onReply(Stream stream, ReplyInfo replyInfo)
143     {
144         throw new IllegalStateException("Servers do not receive replies");
145     }
146 
147     @Override
148     public void onHeaders(Stream stream, HeadersInfo headersInfo)
149     {
150         // TODO
151         throw new UnsupportedOperationException("Not Yet Implemented");
152     }
153 
154     @Override
155     public void onData(Stream clientStream, final DataInfo clientDataInfo)
156     {
157         LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
158 
159         ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
160         {
161             @Override
162             public void consume(int delta)
163             {
164                 super.consume(delta);
165                 clientDataInfo.consume(delta);
166             }
167         };
168 
169         StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
170         streamPromise.data(serverDataInfo);
171     }
172 
173     @Override
174     public void onFailure(Stream stream, Throwable x)
175     {
176         LOG.debug(x);
177     }
178 
179     private Session produceSession(String host, short version, InetSocketAddress address)
180     {
181         try
182         {
183             Session session = serverSessions.get(host);
184             if (session == null)
185             {
186                 SPDYClient client = factory.newSPDYClient(version);
187                 session = client.connect(address, sessionListener);
188                 LOG.debug("Proxy session connected to {}", address);
189                 Session existing = serverSessions.putIfAbsent(host, session);
190                 if (existing != null)
191                 {
192                     session.goAway(new GoAwayInfo(), new Callback.Adapter());
193                     session = existing;
194                 }
195             }
196             return session;
197         }
198         catch (Exception x)
199         {
200             LOG.debug(x);
201             return null;
202         }
203     }
204 
205     private void convert(short fromVersion, short toVersion, Fields headers)
206     {
207         if (fromVersion != toVersion)
208         {
209             for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
210             {
211                 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
212                 if (header != null)
213                 {
214                     String toName = httpHeader.name(toVersion);
215                     for (String value : header.getValues())
216                         headers.add(toName, value);
217                 }
218             }
219         }
220     }
221 
222     private void rst(Stream stream)
223     {
224         RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
225         stream.getSession().rst(rstInfo, new Callback.Adapter());
226     }
227 
228     private class ProxyPushStreamFrameListener implements StreamFrameListener
229     {
230         private PushStreamPromise pushStreamPromise;
231 
232         private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
233         {
234             this.pushStreamPromise = pushStreamPromise;
235         }
236 
237         @Override
238         public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
239         {
240             LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
241             PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
242             this.pushStreamPromise.push(newPushStreamPromise);
243             return new ProxyPushStreamFrameListener(newPushStreamPromise);
244         }
245 
246         @Override
247         public void onReply(Stream stream, ReplyInfo replyInfo)
248         {
249             // Push streams never send a reply
250             throw new UnsupportedOperationException();
251         }
252 
253         @Override
254         public void onHeaders(Stream stream, HeadersInfo headersInfo)
255         {
256             throw new UnsupportedOperationException();
257         }
258 
259         @Override
260         public void onData(Stream serverStream, final DataInfo serverDataInfo)
261         {
262             LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
263 
264             ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
265             {
266                 @Override
267                 public void consume(int delta)
268                 {
269                     super.consume(delta);
270                     serverDataInfo.consume(delta);
271                 }
272             };
273 
274             pushStreamPromise.data(clientDataInfo);
275         }
276 
277         @Override
278         public void onFailure(Stream stream, Throwable x)
279         {
280             LOG.debug(x);
281         }
282     }
283 
284     private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
285     {
286         private final Stream receiverStream;
287 
288         public ProxyStreamFrameListener(Stream receiverStream)
289         {
290             this.receiverStream = receiverStream;
291         }
292 
293         @Override
294         public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
295         {
296             LOG.debug("S -> P {} on {}");
297             PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
298             PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
299             receiverStream.push(newPushInfo, pushStreamPromise);
300             return new ProxyPushStreamFrameListener(pushStreamPromise);
301         }
302 
303         @Override
304         public void onReply(final Stream stream, ReplyInfo replyInfo)
305         {
306             LOG.debug("S -> P {} on {}", replyInfo, stream);
307             final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
308                     replyInfo.isClose());
309             reply(stream, clientReplyInfo);
310         }
311 
312         private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
313         {
314             receiverStream.reply(clientReplyInfo, new Callback()
315             {
316                 @Override
317                 public void succeeded()
318                 {
319                     LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
320                 }
321 
322                 @Override
323                 public void failed(Throwable x)
324                 {
325                     LOG.debug(x);
326                     rst(receiverStream);
327                 }
328             });
329         }
330 
331         @Override
332         public void onHeaders(Stream stream, HeadersInfo headersInfo)
333         {
334             // TODO
335             throw new UnsupportedOperationException("Not Yet Implemented");
336         }
337 
338         @Override
339         public void onData(final Stream stream, final DataInfo dataInfo)
340         {
341             LOG.debug("S -> P {} on {}", dataInfo, stream);
342             data(stream, dataInfo);
343         }
344 
345         private void data(final Stream stream, final DataInfo serverDataInfo)
346         {
347             final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
348             {
349                 @Override
350                 public void consume(int delta)
351                 {
352                     super.consume(delta);
353                     serverDataInfo.consume(delta);
354                 }
355             };
356 
357             receiverStream.data(clientDataInfo, new Callback() //TODO: timeout???
358             {
359                 @Override
360                 public void succeeded()
361                 {
362                     LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
363                 }
364 
365                 @Override
366                 public void failed(Throwable x)
367                 {
368                     LOG.debug(x);
369                     rst(receiverStream);
370                 }
371             });
372         }
373     }
374 
    /**
     * <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice
     * versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The
     * buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client
     * before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the
     * server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is
     * a SPDY v3 server (and hence with flow control).</p>
     * <p>At most one DATA frame is being flushed at any time: the head of the queue is marked as flushing when it
     * is handed to {@code flush()}, and the next one is only flushed from the completion callback of the previous
     * one. The queue and {@code receiverStream} are only accessed while holding the queue's monitor; the actual
     * send always happens outside of it.</p>
     */
    private class StreamPromise implements Promise<Stream>
    {
        // Pending DATA frames in arrival order; also serves as the lock guarding receiverStream.
        private final Queue<DataInfoCallback> queue = new LinkedList<>();
        // The stream the frames come from; it is reset if the stream creation or a send fails.
        private final Stream senderStream;
        // The SYN/PUSH information this promise was created for (used for logging and by subclasses).
        private final Info info;
        // The stream the frames go to; null until succeeded() is invoked.
        private Stream receiverStream;

        private StreamPromise(Stream senderStream, Info info)
        {
            this.senderStream = senderStream;
            this.info = info;
        }

        /**
         * Invoked when the receiver stream has been created: records it and, if frames were queued in the
         * meantime and none is already being flushed, starts flushing the head of the queue.
         *
         * @param stream the newly created receiver stream
         */
        @Override
        public void succeeded(Stream stream)
        {
            LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);

            // Lets ProxySessionFrameListener.onRst() map the new stream back to the sender stream.
            stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);

            DataInfoCallback dataInfoCallback;
            synchronized (queue)
            {
                this.receiverStream = stream;
                dataInfoCallback = queue.peek();
                if (dataInfoCallback != null)
                {
                    if (dataInfoCallback.flushing)
                    {
                        // The head is already being flushed: nothing to start here.
                        LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
                        dataInfoCallback = null;
                    }
                    else
                    {
                        dataInfoCallback.flushing = true;
                        LOG.debug("SYN completed, queue size {}", queue.size());
                    }
                }
                else
                {
                    LOG.debug("SYN completed, queue empty");
                }
            }
            // Send outside of the lock.
            if (dataInfoCallback != null)
                flush(stream, dataInfoCallback);
        }

        /**
         * Invoked when the receiver stream could not be created: logs the failure and resets the sender stream.
         */
        @Override
        public void failed(Throwable x)
        {
            LOG.debug(x);
            rst(senderStream);
        }

        /**
         * Queues a DATA frame for the receiver stream and starts flushing the head of the queue if the receiver
         * stream exists and no frame is currently being flushed.
         *
         * @param dataInfo the data to forward
         */
        public void data(DataInfo dataInfo)
        {
            Stream receiverStream;
            DataInfoCallback dataInfoCallbackToFlush = null;
            DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
            synchronized (queue)
            {
                queue.offer(dataInfoCallBackToQueue);
                receiverStream = this.receiverStream;
                if (receiverStream != null)
                {
                    // Never null: an element was offered just above.
                    dataInfoCallbackToFlush = queue.peek();
                    if (dataInfoCallbackToFlush.flushing)
                    {
                        LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
                        // A flush is in progress: clear the local so that no flush is started below.
                        receiverStream = null;
                    }
                    else
                    {
                        dataInfoCallbackToFlush.flushing = true;
                        LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
                    }
                }
                else
                {
                    // The frame stays queued until succeeded() is invoked.
                    LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
                }
            }
            // Send outside of the lock.
            if (receiverStream != null)
                flush(receiverStream, dataInfoCallbackToFlush);
        }

        /**
         * Sends the given frame on the receiver stream, using the frame's own callback as completion callback.
         */
        private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
        {
            LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
            receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout???
        }

        /**
         * Queue entry holding one DATA frame; it is also the completion callback of that frame's send.
         */
        private class DataInfoCallback implements Callback
        {
            private final DataInfo dataInfo;
            // Set, while holding the queue's monitor, right before this entry is handed to flush().
            private boolean flushing;

            private DataInfoCallback(DataInfo dataInfo)
            {
                this.dataInfo = dataInfo;
            }

            /**
             * Invoked when this frame has been sent: removes it from the head of the queue and, if another
             * frame is pending, marks that one as flushing and sends it.
             */
            @Override
            public void succeeded()
            {
                Stream serverStream;
                DataInfoCallback dataInfoCallback;
                synchronized (queue)
                {
                    serverStream = StreamPromise.this.receiverStream;
                    assert serverStream != null;
                    // The completed frame is expected to be the head of the queue.
                    dataInfoCallback = queue.poll();
                    assert dataInfoCallback == this;
                    dataInfoCallback = queue.peek();
                    if (dataInfoCallback != null)
                    {
                        assert !dataInfoCallback.flushing;
                        dataInfoCallback.flushing = true;
                        LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
                    }
                    else
                    {
                        LOG.debug("Completed {}, queue empty", dataInfo);
                    }
                }
                // Send the next frame outside of the lock.
                if (dataInfoCallback != null)
                    flush(serverStream, dataInfoCallback);
            }

            /**
             * Invoked when this frame could not be sent: logs the failure and resets the sender stream.
             */
            @Override
            public void failed(Throwable x)
            {
                LOG.debug(x);
                rst(senderStream);
            }
        }

        /** @return the stream the forwarded frames come from */
        public Stream getSenderStream()
        {
            return senderStream;
        }

        /** @return the information object this promise was created with */
        public Info getInfo()
        {
            return info;
        }

        /** @return the stream the frames are forwarded to, or {@code null} if it has not been created yet */
        public Stream getReceiverStream()
        {
            synchronized (queue)
            {
                return receiverStream;
            }
        }
    }
538 
539     private class PushStreamPromise extends StreamPromise
540     {
541         private volatile PushStreamPromise pushStreamPromise;
542 
543         private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
544         {
545             super(senderStream, pushInfo);
546         }
547 
548         @Override
549         public void succeeded(Stream receiverStream)
550         {
551             super.succeeded(receiverStream);
552 
553             LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
554 
555             PushStreamPromise promise = pushStreamPromise;
556             if (promise != null)
557                 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
558         }
559 
560         public void push(PushStreamPromise pushStreamPromise)
561         {
562             Stream receiverStream = getReceiverStream();
563 
564             if (receiverStream != null)
565                 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
566             else
567                 this.pushStreamPromise = pushStreamPromise;
568         }
569     }
570 
571     private class ProxySessionFrameListener extends SessionFrameListener.Adapter
572     {
573         @Override
574         public void onRst(Session serverSession, RstInfo serverRstInfo)
575         {
576             Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
577             if (serverStream != null)
578             {
579                 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
580                 if (clientStream != null)
581                 {
582                     Session clientSession = clientStream.getSession();
583                     RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
584                     clientSession.rst(clientRstInfo, new Callback.Adapter());
585                 }
586             }
587         }
588 
589         @Override
590         public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
591         {
592             serverSessions.values().remove(serverSession);
593         }
594     }
595 
596     private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
597     {
598         Fields headersToConvert = pushInfo.getHeaders();
599         Fields headers = convertHeaders(from, to, headersToConvert);
600         return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
601     }
602 
603     private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
604     {
605         Fields headers = new Fields(headersToConvert, false);
606         addResponseProxyHeaders(from, headers);
607         customizeResponseHeaders(from, headers);
608         convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
609         return headers;
610     }
611 }