View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.server.proxy;
21  
22  import java.net.InetSocketAddress;
23  import java.util.LinkedList;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ConcurrentMap;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30  import org.eclipse.jetty.spdy.api.DataInfo;
31  import org.eclipse.jetty.spdy.api.GoAwayInfo;
32  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33  import org.eclipse.jetty.spdy.api.HeadersInfo;
34  import org.eclipse.jetty.spdy.api.Info;
35  import org.eclipse.jetty.spdy.api.PushInfo;
36  import org.eclipse.jetty.spdy.api.ReplyInfo;
37  import org.eclipse.jetty.spdy.api.RstInfo;
38  import org.eclipse.jetty.spdy.api.SPDY;
39  import org.eclipse.jetty.spdy.api.Session;
40  import org.eclipse.jetty.spdy.api.SessionFrameListener;
41  import org.eclipse.jetty.spdy.api.Stream;
42  import org.eclipse.jetty.spdy.api.StreamFrameListener;
43  import org.eclipse.jetty.spdy.api.StreamStatus;
44  import org.eclipse.jetty.spdy.api.SynInfo;
45  import org.eclipse.jetty.spdy.client.SPDYClient;
46  import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
47  import org.eclipse.jetty.util.Callback;
48  import org.eclipse.jetty.util.Fields;
49  import org.eclipse.jetty.util.Promise;
50  import org.eclipse.jetty.util.log.Log;
51  import org.eclipse.jetty.util.log.Logger;
52  
53  /**
54   * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into
55   * SPDY events for the servers.</p>
56   */
57  public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58  {
59      private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
60  
61      private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
62      private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
63  
64      private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
65      private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
66      private final SPDYClient.Factory factory;
67      private volatile long connectTimeout = 15000;
68      private volatile long timeout = 60000;
69  
70      public SPDYProxyEngine(SPDYClient.Factory factory)
71      {
72          this.factory = factory;
73      }
74  
75      public long getConnectTimeout()
76      {
77          return connectTimeout;
78      }
79  
80      public void setConnectTimeout(long connectTimeout)
81      {
82          this.connectTimeout = connectTimeout;
83      }
84  
85      public long getTimeout()
86      {
87          return timeout;
88      }
89  
90      public void setTimeout(long timeout)
91      {
92          this.timeout = timeout;
93      }
94  
95      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96      {
97          Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98  
99          short serverVersion = getVersion(proxyServerInfo.getProtocol());
100         InetSocketAddress address = proxyServerInfo.getAddress();
101         Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102         if (serverSession == null)
103         {
104             rst(clientStream);
105             return null;
106         }
107 
108         final Session clientSession = clientStream.getSession();
109 
110         addRequestProxyHeaders(clientStream, headers);
111         customizeRequestHeaders(clientStream, headers);
112         convert(clientSession.getVersion(), serverVersion, headers);
113 
114         SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115         StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116         StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
117         clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
118         serverSession.syn(serverSynInfo, listener, promise);
119         return this;
120     }
121 
122     private static short getVersion(String protocol)
123     {
124         switch (protocol)
125         {
126             case "spdy/2":
127                 return SPDY.V2;
128             case "spdy/3":
129                 return SPDY.V3;
130             default:
131                 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132         }
133     }
134 
135     @Override
136     public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137     {
138         throw new IllegalStateException("We shouldn't receive pushes from clients");
139     }
140 
141     @Override
142     public void onReply(Stream stream, ReplyInfo replyInfo)
143     {
144         throw new IllegalStateException("Servers do not receive replies");
145     }
146 
147     @Override
148     public void onHeaders(Stream stream, HeadersInfo headersInfo)
149     {
150         // TODO
151         throw new UnsupportedOperationException("Not Yet Implemented");
152     }
153 
154     @Override
155     public void onData(Stream clientStream, final DataInfo clientDataInfo)
156     {
157         LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
158 
159         ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
160         {
161             @Override
162             public void consume(int delta)
163             {
164                 super.consume(delta);
165                 clientDataInfo.consume(delta);
166             }
167         };
168 
169         StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
170         streamPromise.data(serverDataInfo);
171     }
172 
173     private Session produceSession(String host, short version, InetSocketAddress address)
174     {
175         try
176         {
177             Session session = serverSessions.get(host);
178             if (session == null)
179             {
180                 SPDYClient client = factory.newSPDYClient(version);
181                 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
182                 LOG.debug("Proxy session connected to {}", address);
183                 Session existing = serverSessions.putIfAbsent(host, session);
184                 if (existing != null)
185                 {
186                     session.goAway(new GoAwayInfo(), new Callback.Adapter());
187                     session = existing;
188                 }
189             }
190             return session;
191         }
192         catch (Exception x)
193         {
194             LOG.debug(x);
195             return null;
196         }
197     }
198 
199     private void convert(short fromVersion, short toVersion, Fields headers)
200     {
201         if (fromVersion != toVersion)
202         {
203             for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
204             {
205                 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
206                 if (header != null)
207                 {
208                     String toName = httpHeader.name(toVersion);
209                     for (String value : header.values())
210                         headers.add(toName, value);
211                 }
212             }
213         }
214     }
215 
216     private void rst(Stream stream)
217     {
218         RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
219         stream.getSession().rst(rstInfo, new Callback.Adapter());
220     }
221 
222     private class ProxyPushStreamFrameListener implements StreamFrameListener
223     {
224         private PushStreamPromise pushStreamPromise;
225 
226         private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
227         {
228             this.pushStreamPromise = pushStreamPromise;
229         }
230 
231         @Override
232         public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
233         {
234             LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
235             PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
236             this.pushStreamPromise.push(newPushStreamPromise);
237             return new ProxyPushStreamFrameListener(newPushStreamPromise);
238         }
239 
240         @Override
241         public void onReply(Stream stream, ReplyInfo replyInfo)
242         {
243             // Push streams never send a reply
244             throw new UnsupportedOperationException();
245         }
246 
247         @Override
248         public void onHeaders(Stream stream, HeadersInfo headersInfo)
249         {
250             throw new UnsupportedOperationException();
251         }
252 
253         @Override
254         public void onData(Stream serverStream, final DataInfo serverDataInfo)
255         {
256             LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
257 
258             ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
259             {
260                 @Override
261                 public void consume(int delta)
262                 {
263                     super.consume(delta);
264                     serverDataInfo.consume(delta);
265                 }
266             };
267 
268             pushStreamPromise.data(clientDataInfo);
269         }
270     }
271 
272     private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
273     {
274         private final Stream receiverStream;
275 
276         public ProxyStreamFrameListener(Stream receiverStream)
277         {
278             this.receiverStream = receiverStream;
279         }
280 
281         @Override
282         public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
283         {
284             LOG.debug("S -> P {} on {}");
285             PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
286             PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
287             receiverStream.push(newPushInfo, pushStreamPromise);
288             return new ProxyPushStreamFrameListener(pushStreamPromise);
289         }
290 
291         @Override
292         public void onReply(final Stream stream, ReplyInfo replyInfo)
293         {
294             LOG.debug("S -> P {} on {}", replyInfo, stream);
295             final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
296                     replyInfo.isClose());
297             reply(stream, clientReplyInfo);
298         }
299 
300         private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
301         {
302             receiverStream.reply(clientReplyInfo, new Callback()
303             {
304                 @Override
305                 public void succeeded()
306                 {
307                     LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
308                 }
309 
310                 @Override
311                 public void failed(Throwable x)
312                 {
313                     LOG.debug(x);
314                     rst(receiverStream);
315                 }
316             });
317         }
318 
319         @Override
320         public void onHeaders(Stream stream, HeadersInfo headersInfo)
321         {
322             // TODO
323             throw new UnsupportedOperationException("Not Yet Implemented");
324         }
325 
326         @Override
327         public void onData(final Stream stream, final DataInfo dataInfo)
328         {
329             LOG.debug("S -> P {} on {}", dataInfo, stream);
330             data(stream, dataInfo);
331         }
332 
333         private void data(final Stream stream, final DataInfo serverDataInfo)
334         {
335             final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
336             {
337                 @Override
338                 public void consume(int delta)
339                 {
340                     super.consume(delta);
341                     serverDataInfo.consume(delta);
342                 }
343             };
344 
345             receiverStream.data(clientDataInfo, new Callback() //TODO: timeout???
346             {
347                 @Override
348                 public void succeeded()
349                 {
350                     LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
351                 }
352 
353                 @Override
354                 public void failed(Throwable x)
355                 {
356                     LOG.debug(x);
357                     rst(receiverStream);
358                 }
359             });
360         }
361     }
362 
363     /**
364      * <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice
365      * versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The
366      * buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client
367      * before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the
368      * server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is
369      * a SPDY v3 server (and hence with flow control).</p>
370      */
371     private class StreamPromise implements Promise<Stream>
372     {
373         private final Queue<DataInfoCallback> queue = new LinkedList<>();
374         private final Stream senderStream;
375         private final Info info;
376         private Stream receiverStream;
377 
378         private StreamPromise(Stream senderStream, Info info)
379         {
380             this.senderStream = senderStream;
381             this.info = info;
382         }
383 
384         @Override
385         public void succeeded(Stream stream)
386         {
387             LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
388 
389             stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
390 
391             DataInfoCallback dataInfoCallback;
392             synchronized (queue)
393             {
394                 this.receiverStream = stream;
395                 dataInfoCallback = queue.peek();
396                 if (dataInfoCallback != null)
397                 {
398                     if (dataInfoCallback.flushing)
399                     {
400                         LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
401                         dataInfoCallback = null;
402                     }
403                     else
404                     {
405                         dataInfoCallback.flushing = true;
406                         LOG.debug("SYN completed, queue size {}", queue.size());
407                     }
408                 }
409                 else
410                 {
411                     LOG.debug("SYN completed, queue empty");
412                 }
413             }
414             if (dataInfoCallback != null)
415                 flush(stream, dataInfoCallback);
416         }
417 
418         @Override
419         public void failed(Throwable x)
420         {
421             LOG.debug(x);
422             rst(senderStream);
423         }
424 
425         public void data(DataInfo dataInfo)
426         {
427             Stream receiverStream;
428             DataInfoCallback dataInfoCallbackToFlush = null;
429             DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
430             synchronized (queue)
431             {
432                 queue.offer(dataInfoCallBackToQueue);
433                 receiverStream = this.receiverStream;
434                 if (receiverStream != null)
435                 {
436                     dataInfoCallbackToFlush = queue.peek();
437                     if (dataInfoCallbackToFlush.flushing)
438                     {
439                         LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
440                         receiverStream = null;
441                     }
442                     else
443                     {
444                         dataInfoCallbackToFlush.flushing = true;
445                         LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
446                     }
447                 }
448                 else
449                 {
450                     LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
451                 }
452             }
453             if (receiverStream != null)
454                 flush(receiverStream, dataInfoCallbackToFlush);
455         }
456 
457         private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
458         {
459             LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
460             receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout???
461         }
462 
463         private class DataInfoCallback implements Callback
464         {
465             private final DataInfo dataInfo;
466             private boolean flushing;
467 
468             private DataInfoCallback(DataInfo dataInfo)
469             {
470                 this.dataInfo = dataInfo;
471             }
472 
473             @Override
474             public void succeeded()
475             {
476                 Stream serverStream;
477                 DataInfoCallback dataInfoCallback;
478                 synchronized (queue)
479                 {
480                     serverStream = StreamPromise.this.receiverStream;
481                     assert serverStream != null;
482                     dataInfoCallback = queue.poll();
483                     assert dataInfoCallback == this;
484                     dataInfoCallback = queue.peek();
485                     if (dataInfoCallback != null)
486                     {
487                         assert !dataInfoCallback.flushing;
488                         dataInfoCallback.flushing = true;
489                         LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
490                     }
491                     else
492                     {
493                         LOG.debug("Completed {}, queue empty", dataInfo);
494                     }
495                 }
496                 if (dataInfoCallback != null)
497                     flush(serverStream, dataInfoCallback);
498             }
499 
500             @Override
501             public void failed(Throwable x)
502             {
503                 LOG.debug(x);
504                 rst(senderStream);
505             }
506         }
507 
508         public Stream getSenderStream()
509         {
510             return senderStream;
511         }
512 
513         public Info getInfo()
514         {
515             return info;
516         }
517 
518         public Stream getReceiverStream()
519         {
520             synchronized (queue)
521             {
522                 return receiverStream;
523             }
524         }
525     }
526 
527     private class PushStreamPromise extends StreamPromise
528     {
529         private volatile PushStreamPromise pushStreamPromise;
530 
531         private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
532         {
533             super(senderStream, pushInfo);
534         }
535 
536         @Override
537         public void succeeded(Stream receiverStream)
538         {
539             super.succeeded(receiverStream);
540 
541             LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
542 
543             PushStreamPromise promise = pushStreamPromise;
544             if (promise != null)
545                 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
546         }
547 
548         public void push(PushStreamPromise pushStreamPromise)
549         {
550             Stream receiverStream = getReceiverStream();
551 
552             if (receiverStream != null)
553                 receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
554             else
555                 this.pushStreamPromise = pushStreamPromise;
556         }
557     }
558 
559     private class ProxySessionFrameListener extends SessionFrameListener.Adapter
560     {
561         @Override
562         public void onRst(Session serverSession, RstInfo serverRstInfo)
563         {
564             Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
565             if (serverStream != null)
566             {
567                 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
568                 if (clientStream != null)
569                 {
570                     Session clientSession = clientStream.getSession();
571                     RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
572                     clientSession.rst(clientRstInfo, new Callback.Adapter());
573                 }
574             }
575         }
576 
577         @Override
578         public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
579         {
580             serverSessions.values().remove(serverSession);
581         }
582     }
583 
584     private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
585     {
586         Fields headersToConvert = pushInfo.getHeaders();
587         Fields headers = convertHeaders(from, to, headersToConvert);
588         return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
589     }
590 
591     private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
592     {
593         Fields headers = new Fields(headersToConvert, false);
594         addResponseProxyHeaders(from, headers);
595         customizeResponseHeaders(from, headers);
596         convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
597         return headers;
598     }
599 }