1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23
24 import org.eclipse.jetty.util.ArrayQueue;
25 import org.eclipse.jetty.util.log.Log;
26 import org.eclipse.jetty.util.log.Logger;
27
28
29
30
31
32
33
34
35
36
37
38 public abstract class QueuedHttpInput<T> extends HttpInput<T>
39 {
40 private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
41
42 private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
43
44 public QueuedHttpInput()
45 {
46 }
47
48 public void content(T item)
49 {
50
51
52
53
54 synchronized (lock())
55 {
56 boolean wasEmpty = _inputQ.isEmpty();
57 _inputQ.add(item);
58 if (LOG.isDebugEnabled())
59 LOG.debug("{} queued {}", this, item);
60 if (wasEmpty)
61 {
62 if (!onAsyncRead())
63 lock().notify();
64 }
65 }
66 }
67
68 public void recycle()
69 {
70 synchronized (lock())
71 {
72 T item = _inputQ.pollUnsafe();
73 while (item != null)
74 {
75 onContentConsumed(item);
76 item = _inputQ.pollUnsafe();
77 }
78 super.recycle();
79 }
80 }
81
82 @Override
83 protected T nextContent()
84 {
85 synchronized (lock())
86 {
87
88 T item = _inputQ.peekUnsafe();
89
90 while (item != null && remaining(item) == 0)
91 {
92 _inputQ.pollUnsafe();
93 onContentConsumed(item);
94 if (LOG.isDebugEnabled())
95 LOG.debug("{} consumed {}", this, item);
96 item = _inputQ.peekUnsafe();
97 }
98 return item;
99 }
100 }
101
102 protected void blockForContent() throws IOException
103 {
104 synchronized (lock())
105 {
106 while (_inputQ.isEmpty() && !isFinished() && !isEOF())
107 {
108 try
109 {
110 if (LOG.isDebugEnabled())
111 LOG.debug("{} waiting for content", this);
112 lock().wait();
113 }
114 catch (InterruptedException e)
115 {
116 throw (IOException)new InterruptedIOException().initCause(e);
117 }
118 }
119 }
120 }
121
122
123
124
125
126
127 protected abstract void onContentConsumed(T item);
128
129 public void earlyEOF()
130 {
131 synchronized (lock())
132 {
133 super.earlyEOF();
134 lock().notify();
135 }
136 }
137
138 public void messageComplete()
139 {
140 synchronized (lock())
141 {
142 super.messageComplete();
143 lock().notify();
144 }
145 }
146 }