View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  
24  import org.eclipse.jetty.util.ArrayQueue;
25  import org.eclipse.jetty.util.log.Log;
26  import org.eclipse.jetty.util.log.Logger;
27  
28  /**
29   * {@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(Object)}.
30   * <p/>
31   * {@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
32   * but simply holds references to the item, thus the caller must organize for those buffers to valid while
33   * held by this class.
34   * <p/>
35   * To assist the caller, subclasses may override methods {@link #onAsyncRead()}, {@link #onContentConsumed(Object)}
36   * that can be implemented so that the caller will know when buffers are queued and consumed.
37   */
38  public abstract class QueuedHttpInput<T> extends HttpInput<T>
39  {
40      private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
41  
42      private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
43  
44      public QueuedHttpInput()
45      {
46      }
47  
48      public void content(T item)
49      {
50          // The buffer is not copied here.  This relies on the caller not recycling the buffer
51          // until the it is consumed.  The onContentConsumed and onAllContentConsumed() callbacks are
52          // the signals to the caller that the buffers can be recycled.
53  
54          synchronized (lock())
55          {
56              boolean wasEmpty = _inputQ.isEmpty();
57              _inputQ.add(item);
58              if (LOG.isDebugEnabled())
59                  LOG.debug("{} queued {}", this, item);
60              if (wasEmpty)
61              {
62                  if (!onAsyncRead())
63                      lock().notify();
64              }
65          }
66      }
67  
68      public void recycle()
69      {
70          synchronized (lock())
71          {
72              T item = _inputQ.pollUnsafe();
73              while (item != null)
74              {
75                  onContentConsumed(item);
76                  item = _inputQ.pollUnsafe();
77              }
78              super.recycle();
79          }
80      }
81  
82      @Override
83      protected T nextContent()
84      {
85          synchronized (lock())
86          {
87              // Items are removed only when they are fully consumed.
88              T item = _inputQ.peekUnsafe();
89              // Skip consumed items at the head of the queue.
90              while (item != null && remaining(item) == 0)
91              {
92                  _inputQ.pollUnsafe();
93                  onContentConsumed(item);
94                  if (LOG.isDebugEnabled())
95                      LOG.debug("{} consumed {}", this, item);
96                  item = _inputQ.peekUnsafe();
97              }
98              return item;
99          }
100     }
101 
102     protected void blockForContent() throws IOException
103     {
104         synchronized (lock())
105         {
106             while (_inputQ.isEmpty() && !isFinished() && !isEOF())
107             {
108                 try
109                 {
110                     if (LOG.isDebugEnabled())
111                         LOG.debug("{} waiting for content", this);
112                     lock().wait();
113                 }
114                 catch (InterruptedException e)
115                 {
116                     throw (IOException)new InterruptedIOException().initCause(e);
117                 }
118             }
119         }
120     }
121 
122     /**
123      * Callback that signals that the given content has been consumed.
124      *
125      * @param item the consumed content
126      */
127     protected abstract void onContentConsumed(T item);
128 
129     public void earlyEOF()
130     {
131         synchronized (lock())
132         {
133             super.earlyEOF();
134             lock().notify();
135         }
136     }
137 
138     public void messageComplete()
139     {
140         synchronized (lock())
141         {
142             super.messageComplete();
143             lock().notify();
144         }
145     }
146 }