1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.client.util; 20 21 import java.io.Closeable; 22 import java.nio.ByteBuffer; 23 import java.util.Iterator; 24 import java.util.NoSuchElementException; 25 import java.util.Queue; 26 import java.util.concurrent.ConcurrentLinkedQueue; 27 import java.util.concurrent.atomic.AtomicBoolean; 28 import java.util.concurrent.atomic.AtomicReference; 29 30 import org.eclipse.jetty.client.AsyncContentProvider; 31 import org.eclipse.jetty.client.api.ContentProvider; 32 import org.eclipse.jetty.client.api.Request; 33 import org.eclipse.jetty.client.api.Response; 34 35 /** 36 * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)} 37 * has been called, therefore providing the request content at a later time. 38 * <p /> 39 * {@link DeferredContentProvider} can only be used in conjunction with 40 * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()}) 41 * because it provides content asynchronously. 42 * <p /> 43 * The deferred content is provided once and then fully consumed. 44 * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator 45 * because the stream has been consumed on the first invocation. 46 * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy 47 * the content to another location (for example a file) and be able to support multiple invocations 48 * of of {@link #iterator()} returning the iterator provided by this 49 * class on the first invocation, and an iterator on the bytes copied to the other location 50 * for subsequent invocations. 51 * <p /> 52 * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive 53 * separately from HTTP content chunks. 54 * <p /> 55 * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple 56 * times, and when all content has been provided it must be signaled with a call to {@link #close()}. 57 * <p /> 58 * Example usage: 59 * <pre> 60 * HttpClient httpClient = ...; 61 * 62 * // Use try-with-resources to autoclose DeferredContentProvider 63 * try (DeferredContentProvider content = new DeferredContentProvider()) 64 * { 65 * httpClient.newRequest("localhost", 8080) 66 * .content(content) 67 * .send(new Response.CompleteListener() 68 * { 69 * @Override 70 * public void onComplete(Result result) 71 * { 72 * // Your logic here 73 * } 74 * }); 75 * 76 * // At a later time... 77 * content.offer(ByteBuffer.wrap("some content".getBytes())); 78 * } 79 * </pre> 80 */ 81 public class DeferredContentProvider implements AsyncContentProvider, Closeable 82 { 83 private static final ByteBuffer CLOSE = ByteBuffer.allocate(0); 84 85 private final Queue<ByteBuffer> chunks = new ConcurrentLinkedQueue<>(); 86 private final AtomicReference<Listener> listener = new AtomicReference<>(); 87 private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator(); 88 private final AtomicBoolean closed = new AtomicBoolean(); 89 90 /** 91 * Creates a new {@link DeferredContentProvider} with the given initial content 92 * 93 * @param buffers the initial content 94 */ 95 public DeferredContentProvider(ByteBuffer... buffers) 96 { 97 for (ByteBuffer buffer : buffers) 98 chunks.offer(buffer); 99 } 100 101 @Override 102 public void setListener(Listener listener) 103 { 104 if (!this.listener.compareAndSet(null, listener)) 105 throw new IllegalStateException(); 106 } 107 108 @Override 109 public long getLength() 110 { 111 return -1; 112 } 113 114 /** 115 * Adds the given content buffer to this content provider 116 * and notifies the listener that content is available. 117 * 118 * @param buffer the content to add 119 * @return true if the content was added, false otherwise 120 */ 121 public boolean offer(ByteBuffer buffer) 122 { 123 boolean result = chunks.offer(buffer); 124 notifyListener(); 125 return result; 126 } 127 128 /** 129 * No more content will be added to this content provider 130 * and notifies the listener that no more content is available. 131 */ 132 public void close() 133 { 134 if (closed.compareAndSet(false, true)) 135 { 136 chunks.offer(CLOSE); 137 notifyListener(); 138 } 139 } 140 141 private void notifyListener() 142 { 143 Listener listener = this.listener.get(); 144 if (listener != null) 145 listener.onContent(); 146 } 147 148 @Override 149 public Iterator<ByteBuffer> iterator() 150 { 151 return iterator; 152 } 153 154 private class DeferredContentProviderIterator implements Iterator<ByteBuffer> 155 { 156 @Override 157 public boolean hasNext() 158 { 159 return chunks.peek() != CLOSE; 160 } 161 162 @Override 163 public ByteBuffer next() 164 { 165 ByteBuffer element = chunks.poll(); 166 if (element == CLOSE) 167 throw new NoSuchElementException(); 168 return element; 169 } 170 171 @Override 172 public void remove() 173 { 174 throw new UnsupportedOperationException(); 175 } 176 } 177 }