1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayDeque;
24 import java.util.Queue;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import org.eclipse.jetty.http2.parser.Parser;
29 import org.eclipse.jetty.io.AbstractConnection;
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.EndPoint;
32 import org.eclipse.jetty.util.BufferUtil;
33 import org.eclipse.jetty.util.Callback;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.util.thread.ExecutionStrategy;
37
38 public class HTTP2Connection extends AbstractConnection
39 {
40 protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
41
42 private final Queue<Runnable> tasks = new ArrayDeque<>();
43 private final HTTP2Producer producer = new HTTP2Producer();
44 private final AtomicLong bytesIn = new AtomicLong();
45 private final ByteBufferPool byteBufferPool;
46 private final Parser parser;
47 private final ISession session;
48 private final int bufferSize;
49 private final ExecutionStrategy executionStrategy;
50
51 public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory)
52 {
53 super(endPoint, executor);
54 this.byteBufferPool = byteBufferPool;
55 this.parser = parser;
56 this.session = session;
57 this.bufferSize = bufferSize;
58 this.executionStrategy = executionFactory.newExecutionStrategy(producer, executor);
59 }
60
61 @Override
62 public long getBytesIn()
63 {
64 return bytesIn.get();
65 }
66
67 @Override
68 public long getBytesOut()
69 {
70 return session.getBytesWritten();
71 }
72
73 public ISession getSession()
74 {
75 return session;
76 }
77
78 protected Parser getParser()
79 {
80 return parser;
81 }
82
83 protected void setInputBuffer(ByteBuffer buffer)
84 {
85 producer.buffer = buffer;
86 }
87
88 @Override
89 public void onOpen()
90 {
91 if (LOG.isDebugEnabled())
92 LOG.debug("HTTP2 Open {} ", this);
93 super.onOpen();
94 executionStrategy.execute();
95 }
96
97 @Override
98 public void onClose()
99 {
100 if (LOG.isDebugEnabled())
101 LOG.debug("HTTP2 Close {} ", this);
102 super.onClose();
103 }
104
105 @Override
106 public void onFillable()
107 {
108 if (LOG.isDebugEnabled())
109 LOG.debug("HTTP2 onFillable {} ", this);
110 executionStrategy.execute();
111 }
112
113 private int fill(EndPoint endPoint, ByteBuffer buffer)
114 {
115 try
116 {
117 if (endPoint.isInputShutdown())
118 return -1;
119 return endPoint.fill(buffer);
120 }
121 catch (IOException x)
122 {
123 LOG.debug("Could not read from " + endPoint, x);
124 return -1;
125 }
126 }
127
128 @Override
129 public boolean onIdleExpired()
130 {
131 boolean idle = isFillInterested();
132 if (idle)
133 {
134 boolean close = session.onIdleTimeout();
135 if (close)
136 session.close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
137 }
138 return false;
139 }
140
141 protected void offerTask(Runnable task, boolean dispatch)
142 {
143 offerTask(task);
144 if (dispatch)
145 executionStrategy.dispatch();
146 else
147 executionStrategy.execute();
148 }
149
150 @Override
151 public void close()
152 {
153
154
155 session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
156 }
157
158 private void offerTask(Runnable task)
159 {
160 synchronized (this)
161 {
162 tasks.offer(task);
163 }
164 }
165
166 private Runnable pollTask()
167 {
168 synchronized (this)
169 {
170 return tasks.poll();
171 }
172 }
173
174 protected class HTTP2Producer implements ExecutionStrategy.Producer
175 {
176 private final Callback fillCallback = new FillCallback();
177 private ByteBuffer buffer;
178
179 @Override
180 public Runnable produce()
181 {
182 Runnable task = pollTask();
183 if (LOG.isDebugEnabled())
184 LOG.debug("Dequeued task {}", task);
185 if (task != null)
186 return task;
187
188 if (isFillInterested())
189 return null;
190
191 if (buffer == null)
192 buffer = byteBufferPool.acquire(bufferSize, false);
193 boolean looping = BufferUtil.hasContent(buffer);
194 while (true)
195 {
196 if (looping)
197 {
198 while (buffer.hasRemaining())
199 parser.parse(buffer);
200
201 task = pollTask();
202 if (LOG.isDebugEnabled())
203 LOG.debug("Dequeued new task {}", task);
204 if (task != null)
205 {
206 release();
207 return task;
208 }
209 }
210
211 int filled = fill(getEndPoint(), buffer);
212 if (LOG.isDebugEnabled())
213 LOG.debug("Filled {} bytes", filled);
214
215 if (filled == 0)
216 {
217 release();
218 getEndPoint().fillInterested(fillCallback);
219 return null;
220 }
221 else if (filled < 0)
222 {
223 release();
224 session.onShutdown();
225 return null;
226 }
227 else
228 {
229 bytesIn.addAndGet(filled);
230 }
231
232 looping = true;
233 }
234 }
235
236 private void release()
237 {
238 if (buffer != null && !buffer.hasRemaining())
239 {
240 byteBufferPool.release(buffer);
241 buffer = null;
242 }
243 }
244 }
245
246 private class FillCallback implements Callback.NonBlocking
247 {
248 @Override
249 public void succeeded()
250 {
251 onFillable();
252 }
253
254 @Override
255 public void failed(Throwable x)
256 {
257 onFillInterestedFailed(x);
258 }
259 }
260 }