View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.server.proxy;
21  
22  import java.net.InetSocketAddress;
23  import java.util.LinkedList;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ConcurrentMap;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
30  import org.eclipse.jetty.spdy.api.DataInfo;
31  import org.eclipse.jetty.spdy.api.GoAwayInfo;
32  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
33  import org.eclipse.jetty.spdy.api.HeadersInfo;
34  import org.eclipse.jetty.spdy.api.Info;
35  import org.eclipse.jetty.spdy.api.PushInfo;
36  import org.eclipse.jetty.spdy.api.ReplyInfo;
37  import org.eclipse.jetty.spdy.api.RstInfo;
38  import org.eclipse.jetty.spdy.api.SPDY;
39  import org.eclipse.jetty.spdy.api.Session;
40  import org.eclipse.jetty.spdy.api.SessionFrameListener;
41  import org.eclipse.jetty.spdy.api.Stream;
42  import org.eclipse.jetty.spdy.api.StreamFrameListener;
43  import org.eclipse.jetty.spdy.api.StreamStatus;
44  import org.eclipse.jetty.spdy.api.SynInfo;
45  import org.eclipse.jetty.spdy.client.SPDYClient;
46  import org.eclipse.jetty.spdy.server.http.HTTPSPDYHeader;
47  import org.eclipse.jetty.util.Callback;
48  import org.eclipse.jetty.util.Fields;
49  import org.eclipse.jetty.util.Promise;
50  import org.eclipse.jetty.util.log.Log;
51  import org.eclipse.jetty.util.log.Logger;
52  
53  /**
54   * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into
55   * SPDY events for the servers.</p>
56   */
57  public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
58  {
59      private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
60  
61      private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamHandler";
62      private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
63  
64      private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
65      private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
66      private final SPDYClient.Factory factory;
67      private volatile long connectTimeout = 15000;
68      private volatile long timeout = 60000;
69  
70      public SPDYProxyEngine(SPDYClient.Factory factory)
71      {
72          this.factory = factory;
73      }
74  
75      public long getConnectTimeout()
76      {
77          return connectTimeout;
78      }
79  
80      public void setConnectTimeout(long connectTimeout)
81      {
82          this.connectTimeout = connectTimeout;
83      }
84  
85      public long getTimeout()
86      {
87          return timeout;
88      }
89  
90      public void setTimeout(long timeout)
91      {
92          this.timeout = timeout;
93      }
94  
95      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
96      {
97          Fields headers = new Fields(clientSynInfo.getHeaders(), false);
98  
99          short serverVersion = getVersion(proxyServerInfo.getProtocol());
100         InetSocketAddress address = proxyServerInfo.getAddress();
101         Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
102         if (serverSession == null)
103         {
104             rst(clientStream);
105             return null;
106         }
107 
108         final Session clientSession = clientStream.getSession();
109 
110         addRequestProxyHeaders(clientStream, headers);
111         customizeRequestHeaders(clientStream, headers);
112         convert(clientSession.getVersion(), serverVersion, headers);
113 
114         SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
115         StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
116         StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
117         clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
118         serverSession.syn(serverSynInfo, listener, handler);
119         return this;
120     }
121 
122     private static short getVersion(String protocol)
123     {
124         switch (protocol)
125         {
126             case "spdy/2":
127                 return SPDY.V2;
128             case "spdy/3":
129                 return SPDY.V3;
130             default:
131                 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
132         }
133     }
134 
135     @Override
136     public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
137     {
138         throw new IllegalStateException("We shouldn't receive pushes from clients");
139     }
140 
141     @Override
142     public void onReply(Stream stream, ReplyInfo replyInfo)
143     {
144         throw new IllegalStateException("Servers do not receive replies");
145     }
146 
147     @Override
148     public void onHeaders(Stream stream, HeadersInfo headersInfo)
149     {
150         // TODO
151         throw new UnsupportedOperationException("Not Yet Implemented");
152     }
153 
154     @Override
155     public void onData(Stream clientStream, final DataInfo clientDataInfo)
156     {
157         LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
158 
159         ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
160         {
161             @Override
162             public void consume(int delta)
163             {
164                 super.consume(delta);
165                 clientDataInfo.consume(delta);
166             }
167         };
168 
169         StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
170         streamHandler.data(serverDataInfo);
171     }
172 
173     private Session produceSession(String host, short version, InetSocketAddress address)
174     {
175         try
176         {
177             Session session = serverSessions.get(host);
178             if (session == null)
179             {
180                 SPDYClient client = factory.newSPDYClient(version);
181                 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
182                 LOG.debug("Proxy session connected to {}", address);
183                 Session existing = serverSessions.putIfAbsent(host, session);
184                 if (existing != null)
185                 {
186                     session.goAway(new GoAwayInfo(), new Callback.Adapter());
187                     session = existing;
188                 }
189             }
190             return session;
191         }
192         catch (Exception x)
193         {
194             LOG.debug(x);
195             return null;
196         }
197     }
198 
199     private void convert(short fromVersion, short toVersion, Fields headers)
200     {
201         if (fromVersion != toVersion)
202         {
203             for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
204             {
205                 Fields.Field header = headers.remove(httpHeader.name(fromVersion));
206                 if (header != null)
207                 {
208                     String toName = httpHeader.name(toVersion);
209                     for (String value : header.values())
210                         headers.add(toName, value);
211                 }
212             }
213         }
214     }
215 
216     private void rst(Stream stream)
217     {
218         RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
219         stream.getSession().rst(rstInfo, new Callback.Adapter());
220     }
221 
222     private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
223     {
224         private final Stream clientStream;
225         private volatile ReplyInfo replyInfo;
226 
227         public ProxyStreamFrameListener(Stream clientStream)
228         {
229             this.clientStream = clientStream;
230         }
231 
232         @Override
233         public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
234         {
235             LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
236 
237             Fields headers = new Fields(pushInfo.getHeaders(), false);
238 
239             addResponseProxyHeaders(stream, headers);
240             customizeResponseHeaders(stream, headers);
241             Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
242                     (CLIENT_STREAM_ATTRIBUTE);
243             convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
244                     headers);
245 
246             StreamHandler handler = new StreamHandler(clientStream, pushInfo);
247             stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
248             clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
249                     pushInfo.isClose()),
250                     handler);
251             return new Adapter()
252             {
253                 @Override
254                 public void onReply(Stream stream, ReplyInfo replyInfo)
255                 {
256                     // Push streams never send a reply
257                     throw new UnsupportedOperationException();
258                 }
259 
260                 @Override
261                 public void onHeaders(Stream stream, HeadersInfo headersInfo)
262                 {
263                     throw new UnsupportedOperationException();
264                 }
265 
266                 @Override
267                 public void onData(Stream serverStream, final DataInfo serverDataInfo)
268                 {
269                     LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
270 
271                     ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
272                     {
273                         @Override
274                         public void consume(int delta)
275                         {
276                             super.consume(delta);
277                             serverDataInfo.consume(delta);
278                         }
279                     };
280 
281                     StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
282                     handler.data(clientDataInfo);
283                 }
284             };
285         }
286 
287         @Override
288         public void onReply(final Stream stream, ReplyInfo replyInfo)
289         {
290             LOG.debug("S -> P {} on {}", replyInfo, stream);
291 
292             short serverVersion = stream.getSession().getVersion();
293             Fields headers = new Fields(replyInfo.getHeaders(), false);
294 
295             addResponseProxyHeaders(stream, headers);
296             customizeResponseHeaders(stream, headers);
297             short clientVersion = this.clientStream.getSession().getVersion();
298             convert(serverVersion, clientVersion, headers);
299 
300             this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
301             if (replyInfo.isClose())
302                 reply(stream);
303         }
304 
305         @Override
306         public void onHeaders(Stream stream, HeadersInfo headersInfo)
307         {
308             // TODO
309             throw new UnsupportedOperationException("Not Yet Implemented");
310         }
311 
312         @Override
313         public void onData(final Stream stream, final DataInfo dataInfo)
314         {
315             LOG.debug("S -> P {} on {}", dataInfo, stream);
316 
317             if (replyInfo != null)
318             {
319                 if (dataInfo.isClose())
320                     replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
321                 reply(stream);
322             }
323             data(stream, dataInfo);
324         }
325 
326         private void reply(final Stream stream)
327         {
328             final ReplyInfo replyInfo = this.replyInfo;
329             this.replyInfo = null;
330             clientStream.reply(replyInfo, new Callback()
331             {
332                 @Override
333                 public void succeeded()
334                 {
335                     LOG.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
336                 }
337 
338                 @Override
339                 public void failed(Throwable x)
340                 {
341                     LOG.debug(x);
342                     rst(clientStream);
343                 }
344             });
345         }
346 
347         private void data(final Stream stream, final DataInfo dataInfo)
348         {
349             clientStream.data(dataInfo, new Callback() //TODO: timeout???
350             {
351                 @Override
352                 public void succeeded()
353                 {
354                     dataInfo.consume(dataInfo.length());
355                     LOG.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
356                 }
357 
358                 @Override
359                 public void failed(Throwable x)
360                 {
361                     LOG.debug(x);
362                     rst(clientStream);
363                 }
364             });
365         }
366     }
367 
368     /**
369      * <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> <p>Instances
370      * of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the
371      * send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been
372      * fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the
373      * client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with
374      * flow control).</p>
375      */
376     private class StreamHandler implements Promise<Stream>
377     {
378         private final Queue<DataInfoHandler> queue = new LinkedList<>();
379         private final Stream clientStream;
380         private final Info info;
381         private Stream serverStream;
382 
383         private StreamHandler(Stream clientStream, Info info)
384         {
385             this.clientStream = clientStream;
386             this.info = info;
387         }
388 
389         @Override
390         public void succeeded(Stream serverStream)
391         {
392             LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
393 
394             serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
395 
396             DataInfoHandler dataInfoHandler;
397             synchronized (queue)
398             {
399                 this.serverStream = serverStream;
400                 dataInfoHandler = queue.peek();
401                 if (dataInfoHandler != null)
402                 {
403                     if (dataInfoHandler.flushing)
404                     {
405                         LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
406                         dataInfoHandler = null;
407                     }
408                     else
409                     {
410                         dataInfoHandler.flushing = true;
411                         LOG.debug("SYN completed, queue size {}", queue.size());
412                     }
413                 }
414                 else
415                 {
416                     LOG.debug("SYN completed, queue empty");
417                 }
418             }
419             if (dataInfoHandler != null)
420                 flush(serverStream, dataInfoHandler);
421         }
422 
423         @Override
424         public void failed(Throwable x)
425         {
426             LOG.debug(x);
427             rst(clientStream);
428         }
429 
430         public void data(DataInfo dataInfo)
431         {
432             Stream serverStream;
433             DataInfoHandler dataInfoHandler = null;
434             DataInfoHandler item = new DataInfoHandler(dataInfo);
435             synchronized (queue)
436             {
437                 queue.offer(item);
438                 serverStream = this.serverStream;
439                 if (serverStream != null)
440                 {
441                     dataInfoHandler = queue.peek();
442                     if (dataInfoHandler.flushing)
443                     {
444                         LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
445                         serverStream = null;
446                     }
447                     else
448                     {
449                         dataInfoHandler.flushing = true;
450                         LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
451                     }
452                 }
453                 else
454                 {
455                     LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
456                 }
457             }
458             if (serverStream != null)
459                 flush(serverStream, dataInfoHandler);
460         }
461 
462         private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
463         {
464             LOG.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
465             serverStream.data(dataInfoHandler.dataInfo, dataInfoHandler); //TODO: timeout???
466         }
467 
468         private class DataInfoHandler implements Callback
469         {
470             private final DataInfo dataInfo;
471             private boolean flushing;
472 
473             private DataInfoHandler(DataInfo dataInfo)
474             {
475                 this.dataInfo = dataInfo;
476             }
477 
478             @Override
479             public void succeeded()
480             {
481                 Stream serverStream;
482                 DataInfoHandler dataInfoHandler;
483                 synchronized (queue)
484                 {
485                     serverStream = StreamHandler.this.serverStream;
486                     assert serverStream != null;
487                     dataInfoHandler = queue.poll();
488                     assert dataInfoHandler == this;
489                     dataInfoHandler = queue.peek();
490                     if (dataInfoHandler != null)
491                     {
492                         assert !dataInfoHandler.flushing;
493                         dataInfoHandler.flushing = true;
494                         LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
495                     }
496                     else
497                     {
498                         LOG.debug("Completed {}, queue empty", dataInfo);
499                     }
500                 }
501                 if (dataInfoHandler != null)
502                     flush(serverStream, dataInfoHandler);
503             }
504 
505             @Override
506             public void failed(Throwable x)
507             {
508                 LOG.debug(x);
509                 rst(clientStream);
510             }
511         }
512     }
513 
514     private class ProxySessionFrameListener extends SessionFrameListener.Adapter
515     {
516 
517         @Override
518         public void onRst(Session serverSession, RstInfo serverRstInfo)
519         {
520             Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
521             if (serverStream != null)
522             {
523                 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
524                 if (clientStream != null)
525                 {
526                     Session clientSession = clientStream.getSession();
527                     RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
528                     clientSession.rst(clientRstInfo, new Callback.Adapter());
529                 }
530             }
531         }
532 
533         @Override
534         public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
535         {
536             serverSessions.values().remove(serverSession);
537         }
538     }
539 }