View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  
24  import javax.servlet.ServletInputStream;
25  
26  import org.eclipse.jetty.util.ArrayQueue;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  
30  /**
31   * <p>{@link QueuedHttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
32   * <p>{@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(Object)}.</p>
33   * <p>{@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
34   * but simply holds references to the item, thus the caller must arrange for those buffers to remain valid while
35   * held by this class.</p>
36   * <p>To assist the caller, subclasses may override methods {@link #onAsyncRead()},
37   * {@link #onContentConsumed(Object)} and {@link #onAllContentConsumed()} that can be implemented so that the
38   * caller will know when buffers are queued and consumed.</p>
39   */
40  public abstract class QueuedHttpInput<T> extends HttpInput<T>
41  {
42      private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
43  
44      private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
45      
46      public QueuedHttpInput()
47      {}
48  
49      public void recycle()
50      {
51          synchronized (lock())
52          {
53              T item = _inputQ.peekUnsafe();
54              while (item != null)
55              {
56                  _inputQ.pollUnsafe();
57                  onContentConsumed(item);
58  
59                  item = _inputQ.peekUnsafe();
60                  if (item == null)
61                      onAllContentConsumed();
62              }
63              super.recycle();
64          }
65      }
66  
67      @Override
68      protected T nextContent()
69      {
70          T item = _inputQ.peekUnsafe();
71  
72          // Skip empty items at the head of the queue
73          while (item != null && remaining(item) == 0)
74          {
75              _inputQ.pollUnsafe();
76              onContentConsumed(item);
77              LOG.debug("{} consumed {}", this, item);
78              item = _inputQ.peekUnsafe();
79  
80              // If that was the last item then notify
81              if (item==null)
82                  onAllContentConsumed();
83          }
84          return item;
85      }
86      
87      protected abstract void onContentConsumed(T item);
88  
89      protected void blockForContent() throws IOException
90      {
91          synchronized (lock())
92          {
93              while (_inputQ.isEmpty() && !_state.isEOF())
94              {
95                  try
96                  {
97                      LOG.debug("{} waiting for content", this);
98                      lock().wait();
99                  }
100                 catch (InterruptedException e)
101                 {
102                     throw (IOException)new InterruptedIOException().initCause(e);
103                 }
104             }
105         }
106     }
107 
108 
109     /* ------------------------------------------------------------ */
110     /** Called by this HttpInput to signal all available content has been consumed
111      */
112     protected void onAllContentConsumed()
113     {
114     }
115 
116     /* ------------------------------------------------------------ */
117     /** Add some content to the input stream
118      * @param item
119      */
120     public void content(T item)
121     {
122         // The buffer is not copied here.  This relies on the caller not recycling the buffer
123         // until the it is consumed.  The onContentConsumed and onAllContentConsumed() callbacks are 
124         // the signals to the caller that the buffers can be recycled.
125         
126         synchronized (lock())
127         {
128             boolean empty=_inputQ.isEmpty();
129             
130             _inputQ.add(item);
131 
132             if (empty)
133             {
134                 if (!onAsyncRead())
135                     lock().notify();
136             }
137             
138             LOG.debug("{} queued {}", this, item);
139         }
140     }
141     
142 
143     public void earlyEOF()
144     {
145         synchronized (lock())
146         {
147             super.earlyEOF();
148             lock().notify();
149         }
150     }
151     
152     public void messageComplete()
153     {
154         synchronized (lock())
155         {
156             super.messageComplete();
157             lock().notify();
158         }
159     }
160 
161 }