1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.util.ArrayQueue;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29
30
31
32
33
34
35
36
37
38
39
40 public abstract class QueuedHttpInput<T> extends HttpInput<T>
41 {
42 private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
43
44 private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
45
46 public QueuedHttpInput()
47 {}
48
49 public void recycle()
50 {
51 synchronized (lock())
52 {
53 T item = _inputQ.peekUnsafe();
54 while (item != null)
55 {
56 _inputQ.pollUnsafe();
57 onContentConsumed(item);
58
59 item = _inputQ.peekUnsafe();
60 if (item == null)
61 onAllContentConsumed();
62 }
63 super.recycle();
64 }
65 }
66
67 @Override
68 protected T nextContent()
69 {
70 T item = _inputQ.peekUnsafe();
71
72
73 while (item != null && remaining(item) == 0)
74 {
75 _inputQ.pollUnsafe();
76 onContentConsumed(item);
77 LOG.debug("{} consumed {}", this, item);
78 item = _inputQ.peekUnsafe();
79
80
81 if (item==null)
82 onAllContentConsumed();
83 }
84 return item;
85 }
86
87 protected abstract void onContentConsumed(T item);
88
89 protected void blockForContent() throws IOException
90 {
91 synchronized (lock())
92 {
93 while (_inputQ.isEmpty() && !_state.isEOF())
94 {
95 try
96 {
97 LOG.debug("{} waiting for content", this);
98 lock().wait();
99 }
100 catch (InterruptedException e)
101 {
102 throw (IOException)new InterruptedIOException().initCause(e);
103 }
104 }
105 }
106 }
107
108
109
110
111
112 protected void onAllContentConsumed()
113 {
114 }
115
116
117
118
119
120 public void content(T item)
121 {
122
123
124
125
126 synchronized (lock())
127 {
128 boolean empty=_inputQ.isEmpty();
129
130 _inputQ.add(item);
131
132 if (empty)
133 {
134 if (!onAsyncRead())
135 lock().notify();
136 }
137
138 LOG.debug("{} queued {}", this, item);
139 }
140 }
141
142
143 public void earlyEOF()
144 {
145 synchronized (lock())
146 {
147 super.earlyEOF();
148 lock().notify();
149 }
150 }
151
152 public void messageComplete()
153 {
154 synchronized (lock())
155 {
156 super.messageComplete();
157 lock().notify();
158 }
159 }
160
161 }