View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.proxy;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.WritePendingException;
25  import javax.servlet.ReadListener;
26  import javax.servlet.ServletConfig;
27  import javax.servlet.ServletException;
28  import javax.servlet.ServletInputStream;
29  import javax.servlet.ServletOutputStream;
30  import javax.servlet.WriteListener;
31  import javax.servlet.http.HttpServletRequest;
32  import javax.servlet.http.HttpServletResponse;
33  
34  import org.eclipse.jetty.client.api.ContentProvider;
35  import org.eclipse.jetty.client.api.Request;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.client.util.DeferredContentProvider;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.IteratingCallback;
40  
41  public class AsyncProxyServlet extends ProxyServlet
42  {
43      private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
44  
45      @Override
46      protected ContentProvider proxyRequestContent(Request proxyRequest, HttpServletRequest request) throws IOException
47      {
48          ServletInputStream input = request.getInputStream();
49          DeferredContentProvider provider = new DeferredContentProvider();
50          input.setReadListener(newReadListener(proxyRequest, request, provider));
51          return provider;
52      }
53  
54      protected ReadListener newReadListener(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
55      {
56          return new StreamReader(proxyRequest, request, provider);
57      }
58  
59      @Override
60      protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
61      {
62          try
63          {
64              if (_log.isDebugEnabled())
65                  _log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
66              StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
67              if (writeListener == null)
68              {
69                  writeListener = newWriteListener(request, proxyResponse);
70                  request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
71  
72                  // Set the data to write before calling setWriteListener(), because
73                  // setWriteListener() may trigger the call to onWritePossible() on
74                  // a different thread and we would have a race.
75                  writeListener.data(buffer, offset, length, callback);
76  
77                  // Setting the WriteListener triggers an invocation to onWritePossible().
78                  response.getOutputStream().setWriteListener(writeListener);
79              }
80              else
81              {
82                  writeListener.data(buffer, offset, length, callback);
83                  writeListener.onWritePossible();
84              }
85          }
86          catch (Throwable x)
87          {
88              callback.failed(x);
89              proxyResponse.abort(x);
90          }
91      }
92  
93      protected StreamWriter newWriteListener(HttpServletRequest request, Response proxyResponse)
94      {
95          return new StreamWriter(request, proxyResponse);
96      }
97  
98      public static class Transparent extends AsyncProxyServlet
99      {
100         private final TransparentDelegate delegate = new TransparentDelegate(this);
101 
102         @Override
103         public void init(ServletConfig config) throws ServletException
104         {
105             super.init(config);
106             delegate.init(config);
107         }
108 
109         @Override
110         protected URI rewriteURI(HttpServletRequest request)
111         {
112             return delegate.rewriteURI(request);
113         }
114     }
115 
116     protected class StreamReader extends IteratingCallback implements ReadListener
117     {
118         private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
119         private final Request proxyRequest;
120         private final HttpServletRequest request;
121         private final DeferredContentProvider provider;
122 
123         protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
124         {
125             this.proxyRequest = proxyRequest;
126             this.request = request;
127             this.provider = provider;
128         }
129 
130         @Override
131         public void onDataAvailable() throws IOException
132         {
133             iterate();
134         }
135 
136         @Override
137         public void onAllDataRead() throws IOException
138         {
139             if (_log.isDebugEnabled())
140                 _log.debug("{} proxying content to upstream completed", getRequestId(request));
141             provider.close();
142         }
143 
144         @Override
145         public void onError(Throwable t)
146         {
147             onClientRequestFailure(proxyRequest, request, t);
148         }
149 
150         @Override
151         protected Action process() throws Exception
152         {
153             int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
154             ServletInputStream input = request.getInputStream();
155 
156             // First check for isReady() because it has
157             // side effects, and then for isFinished().
158             while (input.isReady() && !input.isFinished())
159             {
160                 int read = input.read(buffer);
161                 if (_log.isDebugEnabled())
162                     _log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
163                 if (read > 0)
164                 {
165                     if (_log.isDebugEnabled())
166                         _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
167                     onRequestContent(proxyRequest, request, provider, buffer, 0, read, this);
168                     return Action.SCHEDULED;
169                 }
170             }
171 
172             if (input.isFinished())
173             {
174                 if (_log.isDebugEnabled())
175                     _log.debug("{} asynchronous read complete on {}", requestId, input);
176                 return Action.SUCCEEDED;
177             }
178             else
179             {
180                 if (_log.isDebugEnabled())
181                     _log.debug("{} asynchronous read pending on {}", requestId, input);
182                 return Action.IDLE;
183             }
184         }
185 
186         protected void onRequestContent(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider, byte[] buffer, int offset, int length, Callback callback)
187         {
188             provider.offer(ByteBuffer.wrap(buffer, offset, length), callback);
189         }
190 
191         @Override
192         public void failed(Throwable x)
193         {
194             super.failed(x);
195             onError(x);
196         }
197     }
198 
199     protected class StreamWriter implements WriteListener
200     {
201         private final HttpServletRequest request;
202         private final Response proxyResponse;
203         private WriteState state;
204         private byte[] buffer;
205         private int offset;
206         private int length;
207         private Callback callback;
208 
209         protected StreamWriter(HttpServletRequest request, Response proxyResponse)
210         {
211             this.request = request;
212             this.proxyResponse = proxyResponse;
213             this.state = WriteState.IDLE;
214         }
215 
216         protected void data(byte[] bytes, int offset, int length, Callback callback)
217         {
218             if (state != WriteState.IDLE)
219                 throw new WritePendingException();
220             this.state = WriteState.READY;
221             this.buffer = bytes;
222             this.offset = offset;
223             this.length = length;
224             this.callback = callback;
225         }
226 
227         @Override
228         public void onWritePossible() throws IOException
229         {
230             int requestId = getRequestId(request);
231             ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
232             if (state == WriteState.READY)
233             {
234                 // There is data to write.
235                 if (_log.isDebugEnabled())
236                     _log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output);
237                 output.write(buffer, offset, length);
238                 state = WriteState.PENDING;
239                 if (output.isReady())
240                 {
241                     if (_log.isDebugEnabled())
242                         _log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output);
243                     complete();
244                 }
245                 else
246                 {
247                     if (_log.isDebugEnabled())
248                         _log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output);
249                 }
250             }
251             else if (state == WriteState.PENDING)
252             {
253                 // The write blocked but is now complete.
254                 if (_log.isDebugEnabled())
255                     _log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output);
256                 complete();
257             }
258             else
259             {
260                 throw new IllegalStateException();
261             }
262         }
263 
264         protected void complete()
265         {
266             buffer = null;
267             offset = 0;
268             length = 0;
269             Callback c = callback;
270             callback = null;
271             state = WriteState.IDLE;
272             // Call the callback only after the whole state has been reset,
273             // because the callback may trigger a reentrant call and
274             // the state must already be the new one that we reset here.
275             c.succeeded();
276         }
277 
278         @Override
279         public void onError(Throwable failure)
280         {
281             proxyResponse.abort(failure);
282         }
283     }
284 
285     private enum WriteState
286     {
287         READY, PENDING, IDLE
288     }
289 }