View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.proxy;
21  
22  import java.net.InetSocketAddress;
23  import java.util.LinkedList;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ConcurrentMap;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.eclipse.jetty.spdy.SPDYClient;
30  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
31  import org.eclipse.jetty.spdy.api.DataInfo;
32  import org.eclipse.jetty.spdy.api.GoAwayInfo;
33  import org.eclipse.jetty.spdy.api.Handler;
34  import org.eclipse.jetty.spdy.api.Headers;
35  import org.eclipse.jetty.spdy.api.HeadersInfo;
36  import org.eclipse.jetty.spdy.api.ReplyInfo;
37  import org.eclipse.jetty.spdy.api.RstInfo;
38  import org.eclipse.jetty.spdy.api.SPDY;
39  import org.eclipse.jetty.spdy.api.Session;
40  import org.eclipse.jetty.spdy.api.SessionFrameListener;
41  import org.eclipse.jetty.spdy.api.Stream;
42  import org.eclipse.jetty.spdy.api.StreamFrameListener;
43  import org.eclipse.jetty.spdy.api.StreamStatus;
44  import org.eclipse.jetty.spdy.api.SynInfo;
45  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
46  
47  /**
48   * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
49   * clients into SPDY events for the servers.</p>
50   */
51  public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
52  {
53      private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
54      private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
55  
56      private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
57      private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
58      private final SPDYClient.Factory factory;
59      private volatile long connectTimeout = 15000;
60      private volatile long timeout = 60000;
61  
62      public SPDYProxyEngine(SPDYClient.Factory factory)
63      {
64          this.factory = factory;
65      }
66  
67      public long getConnectTimeout()
68      {
69          return connectTimeout;
70      }
71  
72      public void setConnectTimeout(long connectTimeout)
73      {
74          this.connectTimeout = connectTimeout;
75      }
76  
77      public long getTimeout()
78      {
79          return timeout;
80      }
81  
82      public void setTimeout(long timeout)
83      {
84          this.timeout = timeout;
85      }
86  
87      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
88      {
89          Headers headers = new Headers(clientSynInfo.getHeaders(), false);
90  
91          short serverVersion = getVersion(proxyServerInfo.getProtocol());
92          InetSocketAddress address = proxyServerInfo.getAddress();
93          Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
94          if (serverSession == null)
95          {
96              rst(clientStream);
97              return null;
98          }
99  
100         final Session clientSession = clientStream.getSession();
101 
102         addRequestProxyHeaders(clientStream, headers);
103         customizeRequestHeaders(clientStream, headers);
104         convert(clientSession.getVersion(), serverVersion, headers);
105 
106         SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
107         StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
108         StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
109         clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
110         serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
111         return this;
112     }
113 
114     private static short getVersion(String protocol)
115     {
116         switch (protocol)
117         {
118             case "spdy/2":
119                 return SPDY.V2;
120             case "spdy/3":
121                 return SPDY.V3;
122             default:
123                 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
124         }
125     }
126 
127     @Override
128     public void onReply(Stream stream, ReplyInfo replyInfo)
129     {
130         // Servers do not receive replies
131     }
132 
133     @Override
134     public void onHeaders(Stream stream, HeadersInfo headersInfo)
135     {
136         // TODO
137         throw new UnsupportedOperationException("Not Yet Implemented");
138     }
139 
140     @Override
141     public void onData(Stream clientStream, final DataInfo clientDataInfo)
142     {
143         logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
144 
145         ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
146         {
147             @Override
148             public void consume(int delta)
149             {
150                 super.consume(delta);
151                 clientDataInfo.consume(delta);
152             }
153         };
154 
155         StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
156         streamHandler.data(serverDataInfo);
157     }
158 
159     private Session produceSession(String host, short version, InetSocketAddress address)
160     {
161         try
162         {
163             Session session = serverSessions.get(host);
164             if (session == null)
165             {
166                 SPDYClient client = factory.newSPDYClient(version);
167                 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
168                 logger.debug("Proxy session connected to {}", address);
169                 Session existing = serverSessions.putIfAbsent(host, session);
170                 if (existing != null)
171                 {
172                     session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
173                     session = existing;
174                 }
175             }
176             return session;
177         }
178         catch (Exception x)
179         {
180             logger.debug(x);
181             return null;
182         }
183     }
184 
185     private void convert(short fromVersion, short toVersion, Headers headers)
186     {
187         if (fromVersion != toVersion)
188         {
189             for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
190             {
191                 Headers.Header header = headers.remove(httpHeader.name(fromVersion));
192                 if (header != null)
193                 {
194                     String toName = httpHeader.name(toVersion);
195                     for (String value : header.values())
196                         headers.add(toName, value);
197                 }
198             }
199         }
200     }
201 
202     private void rst(Stream stream)
203     {
204         RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
205         stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
206     }
207 
208     private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
209     {
210         private final Stream clientStream;
211         private volatile ReplyInfo replyInfo;
212 
213         public ProxyStreamFrameListener(Stream clientStream)
214         {
215             this.clientStream = clientStream;
216         }
217 
218         @Override
219         public void onReply(final Stream stream, ReplyInfo replyInfo)
220         {
221             logger.debug("S -> P {} on {}", replyInfo, stream);
222 
223             short serverVersion = stream.getSession().getVersion();
224             Headers headers = new Headers(replyInfo.getHeaders(), false);
225 
226             addResponseProxyHeaders(stream, headers);
227             customizeResponseHeaders(stream, headers);
228             short clientVersion = this.clientStream.getSession().getVersion();
229             convert(serverVersion, clientVersion, headers);
230 
231             this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
232             if (replyInfo.isClose())
233                 reply(stream);
234         }
235 
236         @Override
237         public void onHeaders(Stream stream, HeadersInfo headersInfo)
238         {
239             // TODO
240             throw new UnsupportedOperationException("Not Yet Implemented");
241         }
242 
243         @Override
244         public void onData(final Stream stream, final DataInfo dataInfo)
245         {
246             logger.debug("S -> P {} on {}", dataInfo, stream);
247 
248             if (replyInfo != null)
249             {
250                 if (dataInfo.isClose())
251                     replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
252                 reply(stream);
253             }
254             data(stream, dataInfo);
255         }
256 
257         private void reply(final Stream stream)
258         {
259             final ReplyInfo replyInfo = this.replyInfo;
260             this.replyInfo = null;
261             clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
262             {
263                 @Override
264                 public void completed(Void context)
265                 {
266                     logger.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
267                 }
268 
269                 @Override
270                 public void failed(Void context, Throwable x)
271                 {
272                     logger.debug(x);
273                     rst(clientStream);
274                 }
275             });
276         }
277 
278         private void data(final Stream stream, final DataInfo dataInfo)
279         {
280             clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
281             {
282                 @Override
283                 public void completed(Void context)
284                 {
285                     dataInfo.consume(dataInfo.length());
286                     logger.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
287                 }
288 
289                 @Override
290                 public void failed(Void context, Throwable x)
291                 {
292                     logger.debug(x);
293                     rst(clientStream);
294                 }
295             });
296         }
297     }
298 
299     /**
300      * <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
301      * <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
302      * The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
303      * from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
304      * is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
305      * without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
306      */
307     private class StreamHandler implements Handler<Stream>
308     {
309         private final Queue<DataInfoHandler> queue = new LinkedList<>();
310         private final Stream clientStream;
311         private final SynInfo serverSynInfo;
312         private Stream serverStream;
313 
314         private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
315         {
316             this.clientStream = clientStream;
317             this.serverSynInfo = serverSynInfo;
318         }
319 
320         @Override
321         public void completed(Stream serverStream)
322         {
323             logger.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
324 
325             serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
326 
327             DataInfoHandler dataInfoHandler;
328             synchronized (queue)
329             {
330                 this.serverStream = serverStream;
331                 dataInfoHandler = queue.peek();
332                 if (dataInfoHandler != null)
333                 {
334                     if (dataInfoHandler.flushing)
335                     {
336                         logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
337                         dataInfoHandler = null;
338                     }
339                     else
340                     {
341                         dataInfoHandler.flushing = true;
342                         logger.debug("SYN completed, queue size {}", queue.size());
343                     }
344                 }
345                 else
346                 {
347                     logger.debug("SYN completed, queue empty");
348                 }
349             }
350             if (dataInfoHandler != null)
351                 flush(serverStream, dataInfoHandler);
352         }
353 
354         @Override
355         public void failed(Stream serverStream, Throwable x)
356         {
357             logger.debug(x);
358             rst(clientStream);
359         }
360 
361         public void data(DataInfo dataInfo)
362         {
363             Stream serverStream;
364             DataInfoHandler dataInfoHandler = null;
365             DataInfoHandler item = new DataInfoHandler(dataInfo);
366             synchronized (queue)
367             {
368                 queue.offer(item);
369                 serverStream = this.serverStream;
370                 if (serverStream != null)
371                 {
372                     dataInfoHandler = queue.peek();
373                     if (dataInfoHandler.flushing)
374                     {
375                         logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
376                         serverStream = null;
377                     }
378                     else
379                     {
380                         dataInfoHandler.flushing = true;
381                         logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
382                     }
383                 }
384                 else
385                 {
386                     logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
387                 }
388             }
389             if (serverStream != null)
390                 flush(serverStream, dataInfoHandler);
391         }
392 
393         private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
394         {
395             logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
396             serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
397         }
398 
399         private class DataInfoHandler implements Handler<Void>
400         {
401             private final DataInfo dataInfo;
402             private boolean flushing;
403 
404             private DataInfoHandler(DataInfo dataInfo)
405             {
406                 this.dataInfo = dataInfo;
407             }
408 
409             @Override
410             public void completed(Void context)
411             {
412                 Stream serverStream;
413                 DataInfoHandler dataInfoHandler;
414                 synchronized (queue)
415                 {
416                     serverStream = StreamHandler.this.serverStream;
417                     assert serverStream != null;
418                     dataInfoHandler = queue.poll();
419                     assert dataInfoHandler == this;
420                     dataInfoHandler = queue.peek();
421                     if (dataInfoHandler != null)
422                     {
423                         assert !dataInfoHandler.flushing;
424                         dataInfoHandler.flushing = true;
425                         logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
426                     }
427                     else
428                     {
429                         logger.debug("Completed {}, queue empty", dataInfo);
430                     }
431                 }
432                 if (dataInfoHandler != null)
433                     flush(serverStream, dataInfoHandler);
434             }
435 
436             @Override
437             public void failed(Void context, Throwable x)
438             {
439                 logger.debug(x);
440                 rst(clientStream);
441             }
442         }
443     }
444 
445     private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
446     {
447         @Override
448         public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
449         {
450             logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
451 
452             Headers headers = new Headers(serverSynInfo.getHeaders(), false);
453 
454             addResponseProxyHeaders(serverStream, headers);
455             customizeResponseHeaders(serverStream, headers);
456             Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
457             convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
458 
459             StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
460             serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
461             clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
462 
463             return this;
464         }
465 
466         @Override
467         public void onRst(Session serverSession, RstInfo serverRstInfo)
468         {
469             Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
470             if (serverStream != null)
471             {
472                 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
473                 if (clientStream != null)
474                 {
475                     Session clientSession = clientStream.getSession();
476                     RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
477                     clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
478                 }
479             }
480         }
481 
482         @Override
483         public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
484         {
485             serverSessions.values().remove(serverSession);
486         }
487 
488         @Override
489         public void onReply(Stream stream, ReplyInfo replyInfo)
490         {
491             // Push streams never send a reply
492         }
493 
494         @Override
495         public void onHeaders(Stream stream, HeadersInfo headersInfo)
496         {
497             throw new UnsupportedOperationException(); //TODO
498         }
499 
500         @Override
501         public void onData(Stream serverStream, final DataInfo serverDataInfo)
502         {
503             logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
504 
505             ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
506             {
507                 @Override
508                 public void consume(int delta)
509                 {
510                     super.consume(delta);
511                     serverDataInfo.consume(delta);
512                 }
513             };
514 
515             StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
516             handler.data(clientDataInfo);
517         }
518     }
519 }