View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy.proxy;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.Locale;
25  import java.util.concurrent.TimeUnit;
26  import java.util.regex.Matcher;
27  import java.util.regex.Pattern;
28  
29  import org.eclipse.jetty.http.HttpFields;
30  import org.eclipse.jetty.http.HttpGenerator;
31  import org.eclipse.jetty.io.AsyncEndPoint;
32  import org.eclipse.jetty.io.Buffer;
33  import org.eclipse.jetty.io.ByteArrayBuffer;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.io.nio.DirectNIOBuffer;
36  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
37  import org.eclipse.jetty.io.nio.NIOBuffer;
38  import org.eclipse.jetty.server.AsyncHttpConnection;
39  import org.eclipse.jetty.spdy.ISession;
40  import org.eclipse.jetty.spdy.IStream;
41  import org.eclipse.jetty.spdy.SPDYServerConnector;
42  import org.eclipse.jetty.spdy.StandardSession;
43  import org.eclipse.jetty.spdy.StandardStream;
44  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
45  import org.eclipse.jetty.spdy.api.BytesDataInfo;
46  import org.eclipse.jetty.spdy.api.DataInfo;
47  import org.eclipse.jetty.spdy.api.GoAwayInfo;
48  import org.eclipse.jetty.spdy.api.Handler;
49  import org.eclipse.jetty.spdy.api.Headers;
50  import org.eclipse.jetty.spdy.api.HeadersInfo;
51  import org.eclipse.jetty.spdy.api.ReplyInfo;
52  import org.eclipse.jetty.spdy.api.RstInfo;
53  import org.eclipse.jetty.spdy.api.SessionStatus;
54  import org.eclipse.jetty.spdy.api.Stream;
55  import org.eclipse.jetty.spdy.api.StreamFrameListener;
56  import org.eclipse.jetty.spdy.api.SynInfo;
57  import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
58  
59  public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
60  {
61      private final Headers headers = new Headers();
62      private final short version;
63      private final ProxyEngineSelector proxyEngineSelector;
64      private final HttpGenerator generator;
65      private final ISession session;
66      private HTTPStream stream;
67      private Buffer content;
68  
69      public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
70      {
71          super(connector, endPoint, connector.getServer());
72          this.version = version;
73          this.proxyEngineSelector = proxyEngineSelector;
74          this.generator = (HttpGenerator)_generator;
75          this.session = new HTTPSession(version, connector);
76          this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
77      }
78  
79      @Override
80      public AsyncEndPoint getEndPoint()
81      {
82          return (AsyncEndPoint)super.getEndPoint();
83      }
84  
85      @Override
86      protected void startRequest(Buffer method, Buffer uri, Buffer httpVersion) throws IOException
87      {
88          SPDYServerConnector connector = (SPDYServerConnector)getConnector();
89          String scheme = connector.getSslContextFactory() != null ? "https" : "http";
90          headers.put(HTTPSPDYHeader.SCHEME.name(version), scheme);
91          headers.put(HTTPSPDYHeader.METHOD.name(version), method.toString("UTF-8"));
92          headers.put(HTTPSPDYHeader.URI.name(version), uri.toString("UTF-8"));
93          headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.toString("UTF-8"));
94      }
95  
96      @Override
97      protected void parsedHeader(Buffer name, Buffer value) throws IOException
98      {
99          String headerName = name.toString("UTF-8").toLowerCase(Locale.ENGLISH);
100         String headerValue = value.toString("UTF-8");
101         switch (headerName)
102         {
103             case "host":
104                 headers.put(HTTPSPDYHeader.HOST.name(version), headerValue);
105                 break;
106             default:
107                 headers.put(headerName, headerValue);
108                 break;
109         }
110     }
111 
112     @Override
113     protected void headerComplete() throws IOException
114     {
115     }
116 
117     @Override
118     protected void content(Buffer buffer) throws IOException
119     {
120         if (content == null)
121         {
122             stream = syn(false);
123             content = buffer;
124         }
125         else
126         {
127             stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
128         }
129     }
130 
131     @Override
132     public void messageComplete(long contentLength) throws IOException
133     {
134         if (stream == null)
135         {
136             assert content == null;
137             if (headers.isEmpty())
138                 proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
139             else
140                 syn(true);
141         }
142         else
143         {
144             stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
145         }
146         headers.clear();
147         stream = null;
148         content = null;
149     }
150 
151     private HTTPStream syn(boolean close)
152     {
153         HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
154         StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
155         stream.setStreamFrameListener(streamFrameListener);
156         return stream;
157     }
158 
159     private DataInfo toDataInfo(Buffer buffer, boolean close)
160     {
161         if (buffer instanceof ByteArrayBuffer)
162             return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
163 
164         if (buffer instanceof NIOBuffer)
165         {
166             ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
167             byteBuffer.limit(buffer.putIndex());
168             byteBuffer.position(buffer.getIndex());
169             return new ByteBufferDataInfo(byteBuffer, close);
170         }
171 
172         return new BytesDataInfo(buffer.asArray(), close);
173     }
174 
175     private class HTTPSession extends StandardSession
176     {
177         private HTTPSession(short version, SPDYServerConnector connector)
178         {
179             super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
180         }
181 
182         @Override
183         public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
184         {
185             // Not much we can do in HTTP land: just close the connection
186             goAway(timeout, unit, handler);
187         }
188 
189         @Override
190         public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
191         {
192             try
193             {
194                 getEndPoint().close();
195                 handler.completed(null);
196             }
197             catch (IOException x)
198             {
199                 handler.failed(null, x);
200             }
201         }
202     }
203 
204     /**
205      * <p>This stream will convert the SPDY invocations performed by the proxy into HTTP to be sent to the client.</p>
206      */
207     private class HTTPStream extends StandardStream
208     {
209         private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)");
210 
211         private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
212         {
213             super(id, priority, session, associatedStream);
214         }
215 
216         @Override
217         public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
218         {
219             // HTTP does not support pushed streams
220             handler.completed(new HTTPPushStream(2, getPriority(), getSession(), this));
221         }
222 
223         @Override
224         public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
225         {
226             // TODO
227             throw new UnsupportedOperationException("Not Yet Implemented");
228         }
229 
230         @Override
231         public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
232         {
233             try
234             {
235                 Headers headers = new Headers(replyInfo.getHeaders(), false);
236 
237                 headers.remove(HTTPSPDYHeader.SCHEME.name(version));
238 
239                 String status = headers.remove(HTTPSPDYHeader.STATUS.name(version)).value();
240                 Matcher matcher = statusRegexp.matcher(status);
241                 matcher.matches();
242                 int code = Integer.parseInt(matcher.group(1));
243                 String reason = matcher.group(2);
244                 generator.setResponse(code, reason);
245 
246                 String httpVersion = headers.remove(HTTPSPDYHeader.VERSION.name(version)).value();
247                 generator.setVersion(Integer.parseInt(httpVersion.replaceAll("\\D", "")));
248 
249                 Headers.Header host = headers.remove(HTTPSPDYHeader.HOST.name(version));
250                 if (host != null)
251                     headers.put("host", host.value());
252 
253                 HttpFields fields = new HttpFields();
254                 for (Headers.Header header : headers)
255                 {
256                     String name = camelize(header.name());
257                     fields.put(name, header.value());
258                 }
259                 generator.completeHeader(fields, replyInfo.isClose());
260 
261                 if (replyInfo.isClose())
262                     complete();
263 
264                 handler.completed(null);
265             }
266             catch (IOException x)
267             {
268                 handler.failed(null, x);
269             }
270         }
271 
272         private String camelize(String name)
273         {
274             char[] chars = name.toCharArray();
275             chars[0] = Character.toUpperCase(chars[0]);
276 
277             for (int i = 0; i < chars.length; ++i)
278             {
279                 char c = chars[i];
280                 int j = i + 1;
281                 if (c == '-' && j < chars.length)
282                     chars[j] = Character.toUpperCase(chars[j]);
283             }
284             return new String(chars);
285         }
286 
287         @Override
288         public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
289         {
290             try
291             {
292                 // Data buffer must be copied, as the ByteBuffer is pooled
293                 ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
294 
295                 Buffer buffer = byteBuffer.isDirect() ?
296                         new DirectNIOBuffer(byteBuffer, false) :
297                         new IndirectNIOBuffer(byteBuffer, false);
298 
299                 generator.addContent(buffer, dataInfo.isClose());
300                 generator.flush(unit.toMillis(timeout));
301 
302                 if (dataInfo.isClose())
303                     complete();
304 
305                 handler.completed(null);
306             }
307             catch (IOException x)
308             {
309                 handler.failed(null, x);
310             }
311         }
312 
313         private void complete() throws IOException
314         {
315             generator.complete();
316             // We need to call asyncDispatch() as if the HTTP request
317             // has been suspended and now we complete the response
318             getEndPoint().asyncDispatch();
319         }
320     }
321 
322     private class HTTPPushStream extends StandardStream
323     {
324         private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
325         {
326             super(id, priority, session, associatedStream);
327         }
328 
329         @Override
330         public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
331         {
332             // Ignore pushed headers
333             handler.completed(null);
334         }
335 
336         @Override
337         public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
338         {
339             // Ignore pushed data
340             handler.completed(null);
341         }
342     }
343 }