View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.WritePendingException;
25  import javax.servlet.ReadListener;
26  import javax.servlet.ServletConfig;
27  import javax.servlet.ServletException;
28  import javax.servlet.ServletInputStream;
29  import javax.servlet.ServletOutputStream;
30  import javax.servlet.WriteListener;
31  import javax.servlet.http.HttpServletRequest;
32  import javax.servlet.http.HttpServletResponse;
33  
34  import org.eclipse.jetty.client.api.ContentProvider;
35  import org.eclipse.jetty.client.api.Request;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.client.util.DeferredContentProvider;
38  import org.eclipse.jetty.util.Callback;
39  
40  public class AsyncProxyServlet extends ProxyServlet
41  {
42      private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
43  
44      @Override
45      protected ContentProvider proxyRequestContent(Request proxyRequest, HttpServletRequest request) throws IOException
46      {
47          ServletInputStream input = request.getInputStream();
48          DeferredContentProvider provider = new DeferredContentProvider();
49          input.setReadListener(new StreamReader(proxyRequest, request, provider));
50          return provider;
51      }
52  
53      @Override
54      protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
55      {
56          try
57          {
58              _log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
59              StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
60              if (writeListener == null)
61              {
62                  writeListener = new StreamWriter(request, proxyResponse);
63                  request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
64  
65                  // Set the data to write before calling setWriteListener(), because
66                  // setWriteListener() may trigger the call to onWritePossible() on
67                  // a different thread and we would have a race.
68                  writeListener.data(buffer, offset, length, callback);
69  
70                  // Setting the WriteListener triggers an invocation to onWritePossible().
71                  response.getOutputStream().setWriteListener(writeListener);
72              }
73              else
74              {
75                  writeListener.data(buffer, offset, length, callback);
76                  writeListener.onWritePossible();
77              }
78          }
79          catch (Throwable x)
80          {
81              callback.failed(x);
82              onResponseFailure(request, response, proxyResponse, x);
83          }
84      }
85      
86      public static class Transparent extends AsyncProxyServlet
87      {
88          private final TransparentDelegate delegate = new TransparentDelegate(this);
89  
90          @Override
91          public void init(ServletConfig config) throws ServletException
92          {
93              super.init(config);
94              delegate.init(config);
95          }
96  
97          @Override
98          protected URI rewriteURI(HttpServletRequest request)
99          {
100             return delegate.rewriteURI(request);
101         }
102     }
103 
104     private class StreamReader implements ReadListener, Callback
105     {
106         private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
107         private final Request proxyRequest;
108         private final HttpServletRequest request;
109         private final DeferredContentProvider provider;
110 
111         public StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
112         {
113             this.proxyRequest = proxyRequest;
114             this.request = request;
115             this.provider = provider;
116         }
117 
118         @Override
119         public void onDataAvailable() throws IOException
120         {
121             int requestId = getRequestId(request);
122             ServletInputStream input = request.getInputStream();
123             _log.debug("{} asynchronous read start on {}", requestId, input);
124 
125             // First check for isReady() because it has
126             // side effects, and then for isFinished().
127             while (input.isReady() && !input.isFinished())
128             {
129                 int read = input.read(buffer);
130                 _log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
131                 if (read > 0)
132                 {
133                     _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
134                     provider.offer(ByteBuffer.wrap(buffer, 0, read), this);
135                     // Do not call isReady() so that we can apply backpressure.
136                     break;
137                 }
138             }
139             if (!input.isFinished())
140                 _log.debug("{} asynchronous read pending on {}", requestId, input);
141         }
142 
143         @Override
144         public void onAllDataRead() throws IOException
145         {
146             _log.debug("{} proxying content to upstream completed", getRequestId(request));
147             provider.close();
148         }
149 
150         @Override
151         public void onError(Throwable x)
152         {
153             failed(x);
154         }
155 
156         @Override
157         public void succeeded()
158         {
159             try
160             {
161                 if (request.getInputStream().isReady())
162                     onDataAvailable();
163             }
164             catch (Throwable x)
165             {
166                 failed(x);
167             }
168         }
169 
170         @Override
171         public void failed(Throwable x)
172         {
173             onClientRequestFailure(proxyRequest, request, x);
174         }
175     }
176 
177     private class StreamWriter implements WriteListener
178     {
179         private final HttpServletRequest request;
180         private final Response proxyResponse;
181         private WriteState state;
182         private byte[] buffer;
183         private int offset;
184         private int length;
185         private Callback callback;
186 
187         private StreamWriter(HttpServletRequest request, Response proxyResponse)
188         {
189             this.request = request;
190             this.proxyResponse = proxyResponse;
191             this.state = WriteState.IDLE;
192         }
193 
194         private void data(byte[] bytes, int offset, int length, Callback callback)
195         {
196             if (state != WriteState.IDLE)
197                 throw new WritePendingException();
198             this.state = WriteState.READY;
199             this.buffer = bytes;
200             this.offset = offset;
201             this.length = length;
202             this.callback = callback;
203         }
204 
205         @Override
206         public void onWritePossible() throws IOException
207         {
208             int requestId = getRequestId(request);
209             ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
210             if (state == WriteState.READY)
211             {
212                 // There is data to write.
213                 _log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output);
214                 output.write(buffer, offset, length);
215                 state = WriteState.PENDING;
216                 if (output.isReady())
217                 {
218                     _log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output);
219                     complete();
220                 }
221                 else
222                 {
223                     _log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output);
224                 }
225             }
226             else if (state == WriteState.PENDING)
227             {
228                 // The write blocked but is now complete.
229                 _log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output);
230                 complete();
231             }
232             else
233             {
234                 throw new IllegalStateException();
235             }
236         }
237 
238         private void complete()
239         {
240             buffer = null;
241             offset = 0;
242             length = 0;
243             Callback c = callback;
244             callback = null;
245             state = WriteState.IDLE;
246             // Call the callback only after the whole state has been reset,
247             // because the callback may trigger a reentrant call and
248             // the state must already be the new one that we reset here.
249             c.succeeded();
250         }
251 
252         @Override
253         public void onError(Throwable failure)
254         {
255             HttpServletResponse response = (HttpServletResponse)request.getAsyncContext().getResponse();
256             onResponseFailure(request, response, proxyResponse, failure);
257         }
258     }
259 
260     private enum WriteState
261     {
262         READY, PENDING, IDLE
263     }
264 }