//
//  ========================================================================
//  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.client.util;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;

/**
 * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
 * has been called, therefore providing the request content at a later time.
 * <p />
 * {@link DeferredContentProvider} can only be used in conjunction with
 * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
 * because it provides content asynchronously.
 * <p />
 * The deferred content is provided once and then fully consumed.
 * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
 * because the stream has been consumed on the first invocation.
 * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
 * the content to another location (for example a file) and be able to support multiple invocations
 * of {@link #iterator()} returning the iterator provided by this
 * class on the first invocation, and an iterator on the bytes copied to the other location
 * for subsequent invocations.
 * <p />
 * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
 * separately from HTTP content chunks.
 * <p />
 * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
 * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
 * <p />
 * Example usage:
 * <pre>
 * HttpClient httpClient = ...;
 *
 * // Use try-with-resources to autoclose DeferredContentProvider
 * try (DeferredContentProvider content = new DeferredContentProvider())
 * {
 *     httpClient.newRequest("localhost", 8080)
 *             .content(content)
 *             .send(new Response.CompleteListener()
 *             {
 *                 &#64;Override
 *                 public void onComplete(Result result)
 *                 {
 *                     // Your logic here
 *                 }
 *             });
 *
 *     // At a later time...
 *     content.offer(ByteBuffer.wrap("some content".getBytes()));
 * }
 * </pre>
 */
public class DeferredContentProvider implements AsyncContentProvider, AutoCloseable
{
    /** End-of-content marker; always compared by identity, never by value. */
    private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);

    private final Queue<ByteBuffer> chunks = new ConcurrentLinkedQueue<>();
    private final AtomicReference<Listener> listener = new AtomicReference<>();
    private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Creates a new {@link DeferredContentProvider} with the given initial content
     *
     * @param buffers the initial content, queued in the given order
     */
    public DeferredContentProvider(ByteBuffer... buffers)
    {
        for (ByteBuffer buffer : buffers)
            chunks.offer(buffer);
    }

    /**
     * Registers the listener to be notified when content is offered or this provider is closed.
     *
     * @param listener the listener to notify
     * @throws IllegalStateException if a listener has already been set
     */
    @Override
    public void setListener(Listener listener)
    {
        if (!this.listener.compareAndSet(null, listener))
            throw new IllegalStateException();
    }

    /**
     * @return always {@code -1}, as the total amount of content is not known up front
     */
    @Override
    public long getLength()
    {
        return -1;
    }

    /**
     * Adds the given content buffer to this content provider
     * and notifies the listener that content is available.
     * <p />
     * The listener is notified regardless of the returned value.
     * Content offered after {@link #close()} is queued behind the end-of-content
     * marker and is therefore never returned by the iterator.
     *
     * @param buffer the content to add
     * @return true if the content was added, false otherwise
     */
    public boolean offer(ByteBuffer buffer)
    {
        boolean result = chunks.offer(buffer);
        notifyListener();
        return result;
    }

    /**
     * No more content will be added to this content provider
     * and notifies the listener that no more content is available.
     * <p />
     * Only the first invocation has an effect; subsequent invocations are no-ops.
     */
    @Override
    public void close()
    {
        if (closed.compareAndSet(false, true))
        {
            chunks.offer(CLOSE);
            notifyListener();
        }
    }

    /**
     * Invokes {@code onContent()} on the registered listener, if any.
     */
    private void notifyListener()
    {
        Listener listener = this.listener.get();
        if (listener != null)
            listener.onContent();
    }

    /**
     * @return the single iterator of this provider; every invocation returns the same
     * instance, which consumes the queued content as it is iterated
     */
    @Override
    public Iterator<ByteBuffer> iterator()
    {
        return iterator;
    }

    /**
     * Iterator that drains the queued chunks until the end-of-content marker is reached.
     */
    private class DeferredContentProviderIterator implements Iterator<ByteBuffer>
    {
        // Set once the end-of-content marker has been taken off the queue.
        // Without it, the queue would no longer contain the marker and hasNext()
        // would wrongly report more content after the end had been reached.
        private volatile boolean exhausted;

        /**
         * @return false once this provider has been closed and all the content queued
         * before the close has been consumed; true otherwise, including when
         * no content is currently queued but the provider is still open
         */
        @Override
        public boolean hasNext()
        {
            return !exhausted && chunks.peek() != CLOSE;
        }

        /**
         * @return the next chunk of content, or {@code null} if no content is currently
         * queued and the end of content has not been reached
         * @throws NoSuchElementException if the end of content has been reached
         */
        @Override
        public ByteBuffer next()
        {
            if (exhausted)
                throw new NoSuchElementException();
            ByteBuffer element = chunks.poll();
            if (element == CLOSE)
            {
                // Remember the end of content, since the marker is now gone from the queue.
                exhausted = true;
                throw new NoSuchElementException();
            }
            return element;
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }
}