View Javadoc

1   //========================================================================
2   //Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //All rights reserved. This program and the accompanying materials
5   //are made available under the terms of the Eclipse Public License v1.0
6   //and Apache License v2.0 which accompanies this distribution.
7   //The Eclipse Public License is available at
8   //http://www.eclipse.org/legal/epl-v10.html
9   //The Apache License v2.0 is available at
10  //http://www.opensource.org/licenses/apache2.0.php
11  //You may elect to redistribute this code under either of these licenses.
12  //========================================================================
13  
14  
15  package org.eclipse.jetty.spdy.proxy;
16  
17  import java.net.InetSocketAddress;
18  import java.util.LinkedList;
19  import java.util.Queue;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.eclipse.jetty.spdy.SPDYClient;
25  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
26  import org.eclipse.jetty.spdy.api.DataInfo;
27  import org.eclipse.jetty.spdy.api.GoAwayInfo;
28  import org.eclipse.jetty.spdy.api.Handler;
29  import org.eclipse.jetty.spdy.api.Headers;
30  import org.eclipse.jetty.spdy.api.HeadersInfo;
31  import org.eclipse.jetty.spdy.api.ReplyInfo;
32  import org.eclipse.jetty.spdy.api.RstInfo;
33  import org.eclipse.jetty.spdy.api.SPDY;
34  import org.eclipse.jetty.spdy.api.Session;
35  import org.eclipse.jetty.spdy.api.SessionFrameListener;
36  import org.eclipse.jetty.spdy.api.Stream;
37  import org.eclipse.jetty.spdy.api.StreamFrameListener;
38  import org.eclipse.jetty.spdy.api.StreamStatus;
39  import org.eclipse.jetty.spdy.api.SynInfo;
40  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
41  
42  /**
43   * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
44   * clients into SPDY events for the servers.</p>
45   */
46  public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
47  {
48      private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
49      private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
50  
51      private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
52      private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
53      private final SPDYClient.Factory factory;
54      private volatile long connectTimeout = 15000;
55      private volatile long timeout = 60000;
56  
57      public SPDYProxyEngine(SPDYClient.Factory factory)
58      {
59          this.factory = factory;
60      }
61  
62      public long getConnectTimeout()
63      {
64          return connectTimeout;
65      }
66  
67      public void setConnectTimeout(long connectTimeout)
68      {
69          this.connectTimeout = connectTimeout;
70      }
71  
72      public long getTimeout()
73      {
74          return timeout;
75      }
76  
77      public void setTimeout(long timeout)
78      {
79          this.timeout = timeout;
80      }
81  
82      public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
83      {
84          Headers headers = new Headers(clientSynInfo.getHeaders(), false);
85  
86          short serverVersion = getVersion(proxyServerInfo.getProtocol());
87          InetSocketAddress address = proxyServerInfo.getAddress();
88          Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
89          if (serverSession == null)
90          {
91              rst(clientStream);
92              return null;
93          }
94  
95          final Session clientSession = clientStream.getSession();
96  
97          addRequestProxyHeaders(clientStream, headers);
98          customizeRequestHeaders(clientStream, headers);
99          convert(clientSession.getVersion(), serverVersion, headers);
100 
101         SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
102         StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
103         StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
104         clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
105         serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
106         return this;
107     }
108 
109     private static short getVersion(String protocol)
110     {
111         switch (protocol)
112         {
113             case "spdy/2":
114                 return SPDY.V2;
115             case "spdy/3":
116                 return SPDY.V3;
117             default:
118                 throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
119         }
120     }
121 
122     @Override
123     public void onReply(Stream stream, ReplyInfo replyInfo)
124     {
125         // Servers do not receive replies
126     }
127 
128     @Override
129     public void onHeaders(Stream stream, HeadersInfo headersInfo)
130     {
131         // TODO
132         throw new UnsupportedOperationException("Not Yet Implemented");
133     }
134 
135     @Override
136     public void onData(Stream clientStream, final DataInfo clientDataInfo)
137     {
138         logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
139 
140         ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
141         {
142             @Override
143             public void consume(int delta)
144             {
145                 super.consume(delta);
146                 clientDataInfo.consume(delta);
147             }
148         };
149 
150         StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
151         streamHandler.data(serverDataInfo);
152     }
153 
154     private Session produceSession(String host, short version, InetSocketAddress address)
155     {
156         try
157         {
158             Session session = serverSessions.get(host);
159             if (session == null)
160             {
161                 SPDYClient client = factory.newSPDYClient(version);
162                 session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
163                 logger.debug("Proxy session connected to {}", address);
164                 Session existing = serverSessions.putIfAbsent(host, session);
165                 if (existing != null)
166                 {
167                     session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
168                     session = existing;
169                 }
170             }
171             return session;
172         }
173         catch (Exception x)
174         {
175             logger.debug(x);
176             return null;
177         }
178     }
179 
180     private void convert(short fromVersion, short toVersion, Headers headers)
181     {
182         if (fromVersion != toVersion)
183         {
184             for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
185             {
186                 Headers.Header header = headers.remove(httpHeader.name(fromVersion));
187                 if (header != null)
188                 {
189                     String toName = httpHeader.name(toVersion);
190                     for (String value : header.values())
191                         headers.add(toName, value);
192                 }
193             }
194         }
195     }
196 
197     private void rst(Stream stream)
198     {
199         RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
200         stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
201     }
202 
203     private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
204     {
205         private final Stream clientStream;
206         private volatile ReplyInfo replyInfo;
207 
208         public ProxyStreamFrameListener(Stream clientStream)
209         {
210             this.clientStream = clientStream;
211         }
212 
213         @Override
214         public void onReply(final Stream stream, ReplyInfo replyInfo)
215         {
216             logger.debug("S -> P {} on {}", replyInfo, stream);
217 
218             short serverVersion = stream.getSession().getVersion();
219             Headers headers = new Headers(replyInfo.getHeaders(), false);
220 
221             addResponseProxyHeaders(stream, headers);
222             customizeResponseHeaders(stream, headers);
223             short clientVersion = this.clientStream.getSession().getVersion();
224             convert(serverVersion, clientVersion, headers);
225 
226             this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
227             if (replyInfo.isClose())
228                 reply(stream);
229         }
230 
231         @Override
232         public void onHeaders(Stream stream, HeadersInfo headersInfo)
233         {
234             // TODO
235             throw new UnsupportedOperationException("Not Yet Implemented");
236         }
237 
238         @Override
239         public void onData(final Stream stream, final DataInfo dataInfo)
240         {
241             logger.debug("S -> P {} on {}", dataInfo, stream);
242 
243             if (replyInfo != null)
244             {
245                 if (dataInfo.isClose())
246                     replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
247                 reply(stream);
248             }
249             data(stream, dataInfo);
250         }
251 
252         private void reply(final Stream stream)
253         {
254             final ReplyInfo replyInfo = this.replyInfo;
255             this.replyInfo = null;
256             clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
257             {
258                 @Override
259                 public void completed(Void context)
260                 {
261                     logger.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
262                 }
263 
264                 @Override
265                 public void failed(Void context, Throwable x)
266                 {
267                     logger.debug(x);
268                     rst(clientStream);
269                 }
270             });
271         }
272 
273         private void data(final Stream stream, final DataInfo dataInfo)
274         {
275             clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
276             {
277                 @Override
278                 public void completed(Void context)
279                 {
280                     dataInfo.consume(dataInfo.length());
281                     logger.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
282                 }
283 
284                 @Override
285                 public void failed(Void context, Throwable x)
286                 {
287                     logger.debug(x);
288                     rst(clientStream);
289                 }
290             });
291         }
292     }
293 
294     /**
295      * <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
296      * <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
297      * The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
298      * from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
299      * is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
300      * without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
301      */
302     private class StreamHandler implements Handler<Stream>
303     {
304         private final Queue<DataInfoHandler> queue = new LinkedList<>();
305         private final Stream clientStream;
306         private final SynInfo serverSynInfo;
307         private Stream serverStream;
308 
309         private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
310         {
311             this.clientStream = clientStream;
312             this.serverSynInfo = serverSynInfo;
313         }
314 
315         @Override
316         public void completed(Stream serverStream)
317         {
318             logger.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
319 
320             serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
321 
322             DataInfoHandler dataInfoHandler;
323             synchronized (queue)
324             {
325                 this.serverStream = serverStream;
326                 dataInfoHandler = queue.peek();
327                 if (dataInfoHandler != null)
328                 {
329                     if (dataInfoHandler.flushing)
330                     {
331                         logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
332                         dataInfoHandler = null;
333                     }
334                     else
335                     {
336                         dataInfoHandler.flushing = true;
337                         logger.debug("SYN completed, queue size {}", queue.size());
338                     }
339                 }
340                 else
341                 {
342                     logger.debug("SYN completed, queue empty");
343                 }
344             }
345             if (dataInfoHandler != null)
346                 flush(serverStream, dataInfoHandler);
347         }
348 
349         @Override
350         public void failed(Stream serverStream, Throwable x)
351         {
352             logger.debug(x);
353             rst(clientStream);
354         }
355 
356         public void data(DataInfo dataInfo)
357         {
358             Stream serverStream;
359             DataInfoHandler dataInfoHandler = null;
360             DataInfoHandler item = new DataInfoHandler(dataInfo);
361             synchronized (queue)
362             {
363                 queue.offer(item);
364                 serverStream = this.serverStream;
365                 if (serverStream != null)
366                 {
367                     dataInfoHandler = queue.peek();
368                     if (dataInfoHandler.flushing)
369                     {
370                         logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
371                         serverStream = null;
372                     }
373                     else
374                     {
375                         dataInfoHandler.flushing = true;
376                         logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
377                     }
378                 }
379                 else
380                 {
381                     logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
382                 }
383             }
384             if (serverStream != null)
385                 flush(serverStream, dataInfoHandler);
386         }
387 
388         private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
389         {
390             logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
391             serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
392         }
393 
394         private class DataInfoHandler implements Handler<Void>
395         {
396             private final DataInfo dataInfo;
397             private boolean flushing;
398 
399             private DataInfoHandler(DataInfo dataInfo)
400             {
401                 this.dataInfo = dataInfo;
402             }
403 
404             @Override
405             public void completed(Void context)
406             {
407                 Stream serverStream;
408                 DataInfoHandler dataInfoHandler;
409                 synchronized (queue)
410                 {
411                     serverStream = StreamHandler.this.serverStream;
412                     assert serverStream != null;
413                     dataInfoHandler = queue.poll();
414                     assert dataInfoHandler == this;
415                     dataInfoHandler = queue.peek();
416                     if (dataInfoHandler != null)
417                     {
418                         assert !dataInfoHandler.flushing;
419                         dataInfoHandler.flushing = true;
420                         logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
421                     }
422                     else
423                     {
424                         logger.debug("Completed {}, queue empty", dataInfo);
425                     }
426                 }
427                 if (dataInfoHandler != null)
428                     flush(serverStream, dataInfoHandler);
429             }
430 
431             @Override
432             public void failed(Void context, Throwable x)
433             {
434                 logger.debug(x);
435                 rst(clientStream);
436             }
437         }
438     }
439 
440     private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
441     {
442         @Override
443         public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
444         {
445             logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
446 
447             Headers headers = new Headers(serverSynInfo.getHeaders(), false);
448 
449             addResponseProxyHeaders(serverStream, headers);
450             customizeResponseHeaders(serverStream, headers);
451             Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
452             convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
453 
454             StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
455             serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
456             clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
457 
458             return this;
459         }
460 
461         @Override
462         public void onRst(Session serverSession, RstInfo serverRstInfo)
463         {
464             Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
465             if (serverStream != null)
466             {
467                 Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
468                 if (clientStream != null)
469                 {
470                     Session clientSession = clientStream.getSession();
471                     RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
472                     clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
473                 }
474             }
475         }
476 
477         @Override
478         public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
479         {
480             serverSessions.values().remove(serverSession);
481         }
482 
483         @Override
484         public void onReply(Stream stream, ReplyInfo replyInfo)
485         {
486             // Push streams never send a reply
487         }
488 
489         @Override
490         public void onHeaders(Stream stream, HeadersInfo headersInfo)
491         {
492             throw new UnsupportedOperationException(); //TODO
493         }
494 
495         @Override
496         public void onData(Stream serverStream, final DataInfo serverDataInfo)
497         {
498             logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
499 
500             ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
501             {
502                 @Override
503                 public void consume(int delta)
504                 {
505                     super.consume(delta);
506                     serverDataInfo.consume(delta);
507                 }
508             };
509 
510             StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
511             handler.data(clientDataInfo);
512         }
513     }
514 }