View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.WritePendingException;
24  
25  import javax.servlet.ReadListener;
26  import javax.servlet.ServletConfig;
27  import javax.servlet.ServletException;
28  import javax.servlet.ServletInputStream;
29  import javax.servlet.ServletOutputStream;
30  import javax.servlet.WriteListener;
31  import javax.servlet.http.HttpServletRequest;
32  import javax.servlet.http.HttpServletResponse;
33  
34  import org.eclipse.jetty.client.api.ContentProvider;
35  import org.eclipse.jetty.client.api.Request;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.client.util.DeferredContentProvider;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.IteratingCallback;
40  
41  /**
42   * <p>Servlet 3.1 asynchronous proxy servlet.</p>
43   * <p>Both the request processing and the I/O are asynchronous.</p>
44   *
45   * @see ProxyServlet
46   * @see AsyncMiddleManServlet
47   * @see ConnectHandler
48   */
49  public class AsyncProxyServlet extends ProxyServlet
50  {
51      private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
52  
53      @Override
54      protected ContentProvider proxyRequestContent(HttpServletRequest request, HttpServletResponse response, Request proxyRequest) throws IOException
55      {
56          ServletInputStream input = request.getInputStream();
57          DeferredContentProvider provider = new DeferredContentProvider();
58          input.setReadListener(newReadListener(request, response, proxyRequest, provider));
59          return provider;
60      }
61  
62      protected ReadListener newReadListener(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider)
63      {
64          return new StreamReader(request, response, proxyRequest, provider);
65      }
66  
67      @Override
68      protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
69      {
70          try
71          {
72              if (_log.isDebugEnabled())
73                  _log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
74              StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
75              if (writeListener == null)
76              {
77                  writeListener = newWriteListener(request, proxyResponse);
78                  request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
79  
80                  // Set the data to write before calling setWriteListener(), because
81                  // setWriteListener() may trigger the call to onWritePossible() on
82                  // a different thread and we would have a race.
83                  writeListener.data(buffer, offset, length, callback);
84  
85                  // Setting the WriteListener triggers an invocation to onWritePossible().
86                  response.getOutputStream().setWriteListener(writeListener);
87              }
88              else
89              {
90                  writeListener.data(buffer, offset, length, callback);
91                  writeListener.onWritePossible();
92              }
93          }
94          catch (Throwable x)
95          {
96              callback.failed(x);
97              proxyResponse.abort(x);
98          }
99      }
100 
101     protected StreamWriter newWriteListener(HttpServletRequest request, Response proxyResponse)
102     {
103         return new StreamWriter(request, proxyResponse);
104     }
105 
106     /**
107      * <p>Convenience extension of {@link AsyncProxyServlet} that offers transparent proxy functionalities.</p>
108      *
109      * @see org.eclipse.jetty.proxy.AbstractProxyServlet.TransparentDelegate
110      */
111     public static class Transparent extends AsyncProxyServlet
112     {
113         private final TransparentDelegate delegate = new TransparentDelegate(this);
114 
115         @Override
116         public void init(ServletConfig config) throws ServletException
117         {
118             super.init(config);
119             delegate.init(config);
120         }
121 
122         @Override
123         protected String rewriteTarget(HttpServletRequest clientRequest)
124         {
125             return delegate.rewriteTarget(clientRequest);
126         }
127     }
128 
129     protected class StreamReader extends IteratingCallback implements ReadListener
130     {
131         private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
132         private final HttpServletRequest request;
133         private final HttpServletResponse response;
134         private final Request proxyRequest;
135         private final DeferredContentProvider provider;
136 
137         protected StreamReader(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider)
138         {
139             this.request = request;
140             this.response = response;
141             this.proxyRequest = proxyRequest;
142             this.provider = provider;
143         }
144 
145         @Override
146         public void onDataAvailable() throws IOException
147         {
148             iterate();
149         }
150 
151         @Override
152         public void onAllDataRead() throws IOException
153         {
154             if (_log.isDebugEnabled())
155                 _log.debug("{} proxying content to upstream completed", getRequestId(request));
156             provider.close();
157         }
158 
159         @Override
160         public void onError(Throwable t)
161         {
162             onClientRequestFailure(request, proxyRequest, response, t);
163         }
164 
165         @Override
166         protected Action process() throws Exception
167         {
168             int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
169             ServletInputStream input = request.getInputStream();
170 
171             // First check for isReady() because it has
172             // side effects, and then for isFinished().
173             while (input.isReady() && !input.isFinished())
174             {
175                 int read = input.read(buffer);
176                 if (_log.isDebugEnabled())
177                     _log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
178                 if (read > 0)
179                 {
180                     if (_log.isDebugEnabled())
181                         _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
182                     onRequestContent(request, proxyRequest, provider, buffer, 0, read, this);
183                     return Action.SCHEDULED;
184                 }
185             }
186 
187             if (input.isFinished())
188             {
189                 if (_log.isDebugEnabled())
190                     _log.debug("{} asynchronous read complete on {}", requestId, input);
191                 return Action.SUCCEEDED;
192             }
193             else
194             {
195                 if (_log.isDebugEnabled())
196                     _log.debug("{} asynchronous read pending on {}", requestId, input);
197                 return Action.IDLE;
198             }
199         }
200 
201         protected void onRequestContent(HttpServletRequest request, Request proxyRequest, DeferredContentProvider provider, byte[] buffer, int offset, int length, Callback callback)
202         {
203             provider.offer(ByteBuffer.wrap(buffer, offset, length), callback);
204         }
205 
206         @Override
207         public void failed(Throwable x)
208         {
209             super.failed(x);
210             onError(x);
211         }
212     }
213 
214     protected class StreamWriter implements WriteListener
215     {
216         private final HttpServletRequest request;
217         private final Response proxyResponse;
218         private WriteState state;
219         private byte[] buffer;
220         private int offset;
221         private int length;
222         private Callback callback;
223 
224         protected StreamWriter(HttpServletRequest request, Response proxyResponse)
225         {
226             this.request = request;
227             this.proxyResponse = proxyResponse;
228             this.state = WriteState.IDLE;
229         }
230 
231         protected void data(byte[] bytes, int offset, int length, Callback callback)
232         {
233             if (state != WriteState.IDLE)
234                 throw new WritePendingException();
235             this.state = WriteState.READY;
236             this.buffer = bytes;
237             this.offset = offset;
238             this.length = length;
239             this.callback = callback;
240         }
241 
242         @Override
243         public void onWritePossible() throws IOException
244         {
245             int requestId = getRequestId(request);
246             ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
247             if (state == WriteState.READY)
248             {
249                 // There is data to write.
250                 if (_log.isDebugEnabled())
251                     _log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output);
252                 output.write(buffer, offset, length);
253                 state = WriteState.PENDING;
254                 if (output.isReady())
255                 {
256                     if (_log.isDebugEnabled())
257                         _log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output);
258                     complete();
259                 }
260                 else
261                 {
262                     if (_log.isDebugEnabled())
263                         _log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output);
264                 }
265             }
266             else if (state == WriteState.PENDING)
267             {
268                 // The write blocked but is now complete.
269                 if (_log.isDebugEnabled())
270                     _log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output);
271                 complete();
272             }
273             else
274             {
275                 throw new IllegalStateException();
276             }
277         }
278 
279         protected void complete()
280         {
281             buffer = null;
282             offset = 0;
283             length = 0;
284             Callback c = callback;
285             callback = null;
286             state = WriteState.IDLE;
287             // Call the callback only after the whole state has been reset,
288             // because the callback may trigger a reentrant call and
289             // the state must already be the new one that we reset here.
290             c.succeeded();
291         }
292 
293         @Override
294         public void onError(Throwable failure)
295         {
296             proxyResponse.abort(failure);
297         }
298     }
299 
300     private enum WriteState
301     {
302         READY, PENDING, IDLE
303     }
304 }