View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.Closeable;
22  import java.nio.ByteBuffer;
23  import java.util.Iterator;
24  import java.util.NoSuchElementException;
25  import java.util.Queue;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicReference;
29  
30  import org.eclipse.jetty.client.AsyncContentProvider;
31  import org.eclipse.jetty.client.api.ContentProvider;
32  import org.eclipse.jetty.client.api.Request;
33  import org.eclipse.jetty.client.api.Response;
34  
35  /**
36   * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
37   * has been called, therefore providing the request content at a later time.
38   * <p />
39   * {@link DeferredContentProvider} can only be used in conjunction with
40   * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
41   * because it provides content asynchronously.
42   * <p />
43   * The deferred content is provided once and then fully consumed.
44   * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
45   * because the stream has been consumed on the first invocation.
46   * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
47   * the content to another location (for example a file) and be able to support multiple invocations
48   * of of {@link #iterator()} returning the iterator provided by this
49    * class on the first invocation, and an iterator on the bytes copied to the other location
50    * for subsequent invocations.
51   * <p />
52   * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
53   * separately from HTTP content chunks.
54   * <p />
55   * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
56   * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
57   * <p />
58   * Example usage:
59   * <pre>
60   * HttpClient httpClient = ...;
61   *
62   * // Use try-with-resources to autoclose DeferredContentProvider
63   * try (DeferredContentProvider content = new DeferredContentProvider())
64   * {
65   *     httpClient.newRequest("localhost", 8080)
66   *             .content(content)
67   *             .send(new Response.CompleteListener()
68   *             {
69   *                 &#64Override
70   *                 public void onComplete(Result result)
71   *                 {
72   *                     // Your logic here
73   *                 }
74   *             });
75   *
76   *     // At a later time...
77   *     content.offer(ByteBuffer.wrap("some content".getBytes()));
78   * }
79   * </pre>
80   */
81  public class DeferredContentProvider implements AsyncContentProvider, Closeable
82  {
83      private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
84  
85      private final Queue<ByteBuffer> chunks = new ConcurrentLinkedQueue<>();
86      private final AtomicReference<Listener> listener = new AtomicReference<>();
87      private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
88      private final AtomicBoolean closed = new AtomicBoolean();
89  
90      /**
91       * Creates a new {@link DeferredContentProvider} with the given initial content
92       *
93       * @param buffers the initial content
94       */
95      public DeferredContentProvider(ByteBuffer... buffers)
96      {
97          for (ByteBuffer buffer : buffers)
98              chunks.offer(buffer);
99      }
100 
101     @Override
102     public void setListener(Listener listener)
103     {
104         if (!this.listener.compareAndSet(null, listener))
105             throw new IllegalStateException();
106     }
107 
108     @Override
109     public long getLength()
110     {
111         return -1;
112     }
113 
114     /**
115      * Adds the given content buffer to this content provider
116      * and notifies the listener that content is available.
117      *
118      * @param buffer the content to add
119      * @return true if the content was added, false otherwise
120      */
121     public boolean offer(ByteBuffer buffer)
122     {
123         boolean result = chunks.offer(buffer);
124         notifyListener();
125         return result;
126     }
127 
128     /**
129      * No more content will be added to this content provider
130      * and notifies the listener that no more content is available.
131      */
132     public void close()
133     {
134         if (closed.compareAndSet(false, true))
135         {
136             chunks.offer(CLOSE);
137             notifyListener();
138         }
139     }
140 
141     private void notifyListener()
142     {
143         Listener listener = this.listener.get();
144         if (listener != null)
145             listener.onContent();
146     }
147 
148     @Override
149     public Iterator<ByteBuffer> iterator()
150     {
151         return iterator;
152     }
153 
154     private class DeferredContentProviderIterator implements Iterator<ByteBuffer>
155     {
156         @Override
157         public boolean hasNext()
158         {
159             return chunks.peek() != CLOSE;
160         }
161 
162         @Override
163         public ByteBuffer next()
164         {
165             ByteBuffer element = chunks.poll();
166             if (element == CLOSE)
167                 throw new NoSuchElementException();
168             return element;
169         }
170 
171         @Override
172         public void remove()
173         {
174             throw new UnsupportedOperationException();
175         }
176     }
177 }