View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.server.proxy;
21  
22  import java.net.InetSocketAddress;
23  import java.util.LinkedList;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ConcurrentMap;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30  import org.eclipse.jetty.spdy.api.DataInfo;
31  import org.eclipse.jetty.spdy.api.GoAwayInfo;
32  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33  import org.eclipse.jetty.spdy.api.HeadersInfo;
34  import org.eclipse.jetty.spdy.api.Info;
35  import org.eclipse.jetty.spdy.api.PushInfo;
36  import org.eclipse.jetty.spdy.api.ReplyInfo;
37  import org.eclipse.jetty.spdy.api.RstInfo;
38  import org.eclipse.jetty.spdy.api.SPDY;
39  import org.eclipse.jetty.spdy.api.Session;
40  import org.eclipse.jetty.spdy.api.SessionFrameListener;
41  import org.eclipse.jetty.spdy.api.Stream;
42  import org.eclipse.jetty.spdy.api.StreamFrameListener;
43  import org.eclipse.jetty.spdy.api.StreamStatus;
44  import org.eclipse.jetty.spdy.api.SynInfo;
45  import org.eclipse.jetty.spdy.client.SPDYClient;
46  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
47  import org.eclipse.jetty.util.Callback;
48  import org.eclipse.jetty.util.Fields;
49  import org.eclipse.jetty.util.Promise;
50  import org.eclipse.jetty.util.log.Log;
51  import org.eclipse.jetty.util.log.Logger;
52  
53  /**
54   * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into
55   * SPDY events for the servers.</p>
56   */
57  public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58  {
59      private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
60  
61      private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
62      private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
63  
64      private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
65      private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
66      private final SPDYClient.Factory factory;
67      private volatile long connectTimeout = 15000;
68      private volatile long timeout = 60000;
69  
70      public SPDYProxyEngine(SPDYClient.Factory factory)
71      {
72          this.factory = factory;
73      }
74  
75      public long getConnectTimeout()
76      {
77          return connectTimeout;
78      }
79  
80      public void setConnectTimeout(long connectTimeout)
81      {
82          this.connectTimeout = connectTimeout;
83      }
84  
85      public long getTimeout()
86      {
87          return timeout;
88      }
89  
90      public void setTimeout(long timeout)
91      {
92          this.timeout = timeout;
93      }
94  
95      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96      {
97          Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98  
99          short serverVersion = getVersion(proxyServerInfo.getProtocol());
100         InetSocketAddress address = proxyServerInfo.getAddress();
101         Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102         if (serverSession == null)
103         {
104             rst(clientStream);
105             return null;
106         }
107 
108         final Session clientSession = clientStream.getSession();
109 
110         addRequestProxyHeaders(clientStream, headers);
111         customizeRequestHeaders(clientStream, headers);
112         convert(clientSession.getVersion(), serverVersion, headers);
113 
114         SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115         StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116         StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
117         clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
118         serverSession.syn(serverSynInfo, listener, promise);
119         return this;
120     }
121 
122     private static short getVersion(String protocol)
123     {
124         switch (protocol)
125         {
126             case "spdy/2":
127                 return SPDY.V2;
128             case "spdy/3":
129                 return SPDY.V3;
130             default:
131                 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132         }
133     }
134 
135     @Override
136     public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137     {
138         throw new IllegalStateException("We shouldn't receive pushes from clients");
139     }
140 
141     @Override
142     public void onReply(Stream stream, ReplyInfo replyInfo)
143     {
144         throw new IllegalStateException("Servers do not receive replies");
145     }
146 
147     @Override
148     public void onHeaders(Stream stream, HeadersInfo headersInfo)
149     {
150         // TODO
151         throw new UnsupportedOperationException("Not Yet Implemented");
152     }
153 
154     @Override
155     public void onData(Stream clientStream, final DataInfo clientDataInfo)
156     {
157         if (LOG.isDebugEnabled())
158             LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
159 
160         ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
161         {
162             @Override
163             public void consume(int delta)
164             {
165                 super.consume(delta);
166                 clientDataInfo.consume(delta);
167             }
168         };
169 
170         StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
171         streamPromise.data(serverDataInfo);
172     }
173 
174     @Override
175     public void onFailure(Stream stream, Throwable x)
176     {
177         LOG.debug(x);
178     }
179 
180     private Session produceSession(String host, short version, InetSocketAddress address)
181     {
182         try
183         {
184             Session session = serverSessions.get(host);
185             if (session == null)
186             {
187                 SPDYClient client = factory.newSPDYClient(version);
188                 session = client.connect(address, sessionListener);
189                 if (LOG.isDebugEnabled())
190                     LOG.debug("Proxy session connected to {}", address);
191                 Session existing = serverSessions.putIfAbsent(host, session);
192                 if (existing != null)
193                 {
194                     session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
195                     session = existing;
196                 }
197             }
198             return session;
199         }
200         catch (Exception x)
201         {
202             LOG.debug(x);
203             return null;
204         }
205     }
206 
207     private void convert(short fromVersion, short toVersion, Fields headers)
208     {
209         if (fromVersion != toVersion)
210         {
211             for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
212             {
213                 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
214                 if (header != null)
215                 {
216                     String toName = httpHeader.name(toVersion);
217                     for (String value : header.getValues())
218                         headers.add(toName, value);
219                 }
220             }
221         }
222     }
223 
224     private void rst(Stream stream)
225     {
226         RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
227         stream.getSession().rst(rstInfo, Callback.Adapter.INSTANCE);
228     }
229 
230     private class ProxyPushStreamFrameListener implements StreamFrameListener
231     {
232         private PushStreamPromise pushStreamPromise;
233 
234         private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
235         {
236             this.pushStreamPromise = pushStreamPromise;
237         }
238 
239         @Override
240         public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
241         {
242             if (LOG.isDebugEnabled())
243                 LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
244             PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
245             this.pushStreamPromise.push(newPushStreamPromise);
246             return new ProxyPushStreamFrameListener(newPushStreamPromise);
247         }
248 
249         @Override
250         public void onReply(Stream stream, ReplyInfo replyInfo)
251         {
252             // Push streams never send a reply
253             throw new UnsupportedOperationException();
254         }
255 
256         @Override
257         public void onHeaders(Stream stream, HeadersInfo headersInfo)
258         {
259             throw new UnsupportedOperationException();
260         }
261 
262         @Override
263         public void onData(Stream serverStream, final DataInfo serverDataInfo)
264         {
265             if (LOG.isDebugEnabled())
266                 LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
267 
268             ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
269             {
270                 @Override
271                 public void consume(int delta)
272                 {
273                     super.consume(delta);
274                     serverDataInfo.consume(delta);
275                 }
276             };
277 
278             pushStreamPromise.data(clientDataInfo);
279         }
280 
281         @Override
282         public void onFailure(Stream stream, Throwable x)
283         {
284             LOG.debug(x);
285         }
286     }
287 
288     private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
289     {
290         private final Stream receiverStream;
291 
292         public ProxyStreamFrameListener(Stream receiverStream)
293         {
294             this.receiverStream = receiverStream;
295         }
296 
297         @Override
298         public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
299         {
300             if (LOG.isDebugEnabled())
301                 LOG.debug("S -> P {} on {}");
302             PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
303             PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
304             receiverStream.push(newPushInfo, pushStreamPromise);
305             return new ProxyPushStreamFrameListener(pushStreamPromise);
306         }
307 
308         @Override
309         public void onReply(final Stream stream, ReplyInfo replyInfo)
310         {
311             if (LOG.isDebugEnabled())
312                 LOG.debug("S -> P {} on {}", replyInfo, stream);
313             final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
314                     replyInfo.isClose());
315             reply(stream, clientReplyInfo);
316         }
317 
318         private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
319         {
320             receiverStream.reply(clientReplyInfo, new Callback()
321             {
322                 @Override
323                 public void succeeded()
324                 {
325                     if (LOG.isDebugEnabled())
326                         LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
327                 }
328 
329                 @Override
330                 public void failed(Throwable x)
331                 {
332                     LOG.debug(x);
333                     rst(receiverStream);
334                 }
335             });
336         }
337 
338         @Override
339         public void onHeaders(Stream stream, HeadersInfo headersInfo)
340         {
341             // TODO
342             throw new UnsupportedOperationException("Not Yet Implemented");
343         }
344 
345         @Override
346         public void onData(final Stream stream, final DataInfo dataInfo)
347         {
348             if (LOG.isDebugEnabled())
349                 LOG.debug("S -> P {} on {}", dataInfo, stream);
350             data(stream, dataInfo);
351         }
352 
353         private void data(final Stream stream, final DataInfo serverDataInfo)
354         {
355             final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
356             {
357                 @Override
358                 public void consume(int delta)
359                 {
360                     super.consume(delta);
361                     serverDataInfo.consume(delta);
362                 }
363             };
364 
365             receiverStream.data(clientDataInfo, new Callback() //TODO: timeout???
366             {
367                 @Override
368                 public void succeeded()
369                 {
370                     if (LOG.isDebugEnabled())
371                         LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
372                 }
373 
374                 @Override
375                 public void failed(Throwable x)
376                 {
377                     LOG.debug(x);
378                     rst(receiverStream);
379                 }
380             });
381         }
382     }
383 
384     /**
385      * <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice
386      * versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The
387      * buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client
388      * before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the
389      * server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is
390      * a SPDY v3 server (and hence with flow control).</p>
391      */
392     private class StreamPromise implements Promise<Stream>
393     {
394         private final Queue<DataInfoCallback> queue = new LinkedList<>();
395         private final Stream senderStream;
396         private final Info info;
397         private Stream receiverStream;
398 
399         private StreamPromise(Stream senderStream, Info info)
400         {
401             this.senderStream = senderStream;
402             this.info = info;
403         }
404 
405         @Override
406         public void succeeded(Stream stream)
407         {
408             if (LOG.isDebugEnabled())
409                 LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
410 
411             stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
412 
413             DataInfoCallback dataInfoCallback;
414             synchronized (queue)
415             {
416                 this.receiverStream = stream;
417                 dataInfoCallback = queue.peek();
418                 if (dataInfoCallback != null)
419                 {
420                     if (dataInfoCallback.flushing)
421                     {
422                         if (LOG.isDebugEnabled())
423                             LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
424                         dataInfoCallback = null;
425                     }
426                     else
427                     {
428                         dataInfoCallback.flushing = true;
429                         if (LOG.isDebugEnabled())
430                             LOG.debug("SYN completed, queue size {}", queue.size());
431                     }
432                 }
433                 else
434                 {
435                     if (LOG.isDebugEnabled())
436                         LOG.debug("SYN completed, queue empty");
437                 }
438             }
439             if (dataInfoCallback != null)
440                 flush(stream, dataInfoCallback);
441         }
442 
443         @Override
444         public void failed(Throwable x)
445         {
446             LOG.debug(x);
447             rst(senderStream);
448         }
449 
450         public void data(DataInfo dataInfo)
451         {
452             Stream receiverStream;
453             DataInfoCallback dataInfoCallbackToFlush = null;
454             DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
455             synchronized (queue)
456             {
457                 queue.offer(dataInfoCallBackToQueue);
458                 receiverStream = this.receiverStream;
459                 if (receiverStream != null)
460                 {
461                     dataInfoCallbackToFlush = queue.peek();
462                     if (dataInfoCallbackToFlush.flushing)
463                     {
464                         if (LOG.isDebugEnabled())
465                             LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
466                         receiverStream = null;
467                     }
468                     else
469                     {
470                         dataInfoCallbackToFlush.flushing = true;
471                         if (LOG.isDebugEnabled())
472                             LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
473                     }
474                 }
475                 else
476                 {
477                     if (LOG.isDebugEnabled())
478                         LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
479                 }
480             }
481             if (receiverStream != null)
482                 flush(receiverStream, dataInfoCallbackToFlush);
483         }
484 
485         private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
486         {
487             if (LOG.isDebugEnabled())
488                 LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
489             receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout???
490         }
491 
492         private class DataInfoCallback implements Callback
493         {
494             private final DataInfo dataInfo;
495             private boolean flushing;
496 
497             private DataInfoCallback(DataInfo dataInfo)
498             {
499                 this.dataInfo = dataInfo;
500             }
501 
502             @Override
503             public void succeeded()
504             {
505                 Stream serverStream;
506                 DataInfoCallback dataInfoCallback;
507                 synchronized (queue)
508                 {
509                     serverStream = StreamPromise.this.receiverStream;
510                     assert serverStream != null;
511                     dataInfoCallback = queue.poll();
512                     assert dataInfoCallback == this;
513                     dataInfoCallback = queue.peek();
514                     if (dataInfoCallback != null)
515                     {
516                         assert !dataInfoCallback.flushing;
517                         dataInfoCallback.flushing = true;
518                         if (LOG.isDebugEnabled())
519                             LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
520                     }
521                     else
522                     {
523                         if (LOG.isDebugEnabled())
524                             LOG.debug("Completed {}, queue empty", dataInfo);
525                     }
526                 }
527                 if (dataInfoCallback != null)
528                     flush(serverStream, dataInfoCallback);
529             }
530 
531             @Override
532             public void failed(Throwable x)
533             {
534                 LOG.debug(x);
535                 rst(senderStream);
536             }
537         }
538 
539         public Stream getSenderStream()
540         {
541             return senderStream;
542         }
543 
544         public Info getInfo()
545         {
546             return info;
547         }
548 
549         public Stream getReceiverStream()
550         {
551             synchronized (queue)
552             {
553                 return receiverStream;
554             }
555         }
556     }
557 
558     private class PushStreamPromise extends StreamPromise
559     {
560         private volatile PushStreamPromise pushStreamPromise;
561 
562         private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
563         {
564             super(senderStream, pushInfo);
565         }
566 
567         @Override
568         public void succeeded(Stream receiverStream)
569         {
570             super.succeeded(receiverStream);
571 
572             if (LOG.isDebugEnabled())
573                 LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
574 
575             PushStreamPromise promise = pushStreamPromise;
576             if (promise != null)
577                 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
578         }
579 
580         public void push(PushStreamPromise pushStreamPromise)
581         {
582             Stream receiverStream = getReceiverStream();
583 
584             if (receiverStream != null)
585                 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
586             else
587                 this.pushStreamPromise = pushStreamPromise;
588         }
589     }
590 
591     private class ProxySessionFrameListener extends SessionFrameListener.Adapter
592     {
593         @Override
594         public void onRst(Session serverSession, RstInfo serverRstInfo)
595         {
596             Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
597             if (serverStream != null)
598             {
599                 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
600                 if (clientStream != null)
601                 {
602                     Session clientSession = clientStream.getSession();
603                     RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
604                     clientSession.rst(clientRstInfo, Callback.Adapter.INSTANCE);
605                 }
606             }
607         }
608 
609         @Override
610         public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
611         {
612             serverSessions.values().remove(serverSession);
613         }
614     }
615 
616     private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
617     {
618         Fields headersToConvert = pushInfo.getHeaders();
619         Fields headers = convertHeaders(from, to, headersToConvert);
620         return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
621     }
622 
623     private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
624     {
625         Fields headers = new Fields(headersToConvert, false);
626         addResponseProxyHeaders(from, headers);
627         customizeResponseHeaders(from, headers);
628         convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
629         return headers;
630     }
631 }