1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.Queue;
24 import java.util.concurrent.Executor;
25
26 import org.eclipse.jetty.http2.parser.Parser;
27 import org.eclipse.jetty.io.AbstractConnection;
28 import org.eclipse.jetty.io.ByteBufferPool;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.BufferUtil;
31 import org.eclipse.jetty.util.Callback;
32 import org.eclipse.jetty.util.ConcurrentArrayQueue;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.util.thread.ExecutionStrategy;
36
37 public class HTTP2Connection extends AbstractConnection
38 {
39 protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
40
41 private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>();
42 private final ByteBufferPool byteBufferPool;
43 private final Parser parser;
44 private final ISession session;
45 private final int bufferSize;
46 private final HTTP2Producer producer = new HTTP2Producer();
47 private final ExecutionStrategy executionStrategy;
48
49 public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
50 {
51 super(endPoint, executor);
52 this.byteBufferPool = byteBufferPool;
53 this.parser = parser;
54 this.session = session;
55 this.bufferSize = bufferSize;
56 this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor);
57 }
58
59 public ISession getSession()
60 {
61 return session;
62 }
63
64
65 protected Parser getParser()
66 {
67 return parser;
68 }
69
70 protected void setInputBuffer(ByteBuffer buffer)
71 {
72 producer.buffer = buffer;
73 }
74
75 @Override
76 public void onOpen()
77 {
78 if (LOG.isDebugEnabled())
79 LOG.debug("HTTP2 Open {} ", this);
80 super.onOpen();
81 executionStrategy.execute();
82 }
83
84 @Override
85 public void onClose()
86 {
87 if (LOG.isDebugEnabled())
88 LOG.debug("HTTP2 Close {} ", this);
89 super.onClose();
90 }
91
92 @Override
93 public void onFillable()
94 {
95 if (LOG.isDebugEnabled())
96 LOG.debug("HTTP2 onFillable {} ", this);
97 executionStrategy.execute();
98 }
99
100 private int fill(EndPoint endPoint, ByteBuffer buffer)
101 {
102 try
103 {
104 if (endPoint.isInputShutdown())
105 return -1;
106 return endPoint.fill(buffer);
107 }
108 catch (IOException x)
109 {
110 LOG.debug("Could not read from " + endPoint, x);
111 return -1;
112 }
113 }
114
115 @Override
116 public boolean onIdleExpired()
117 {
118 boolean close = session.onIdleTimeout();
119 boolean idle = isFillInterested();
120 if (close && idle)
121 session.close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
122 return false;
123 }
124
125 protected void offerTask(Runnable task, boolean dispatch)
126 {
127 tasks.offer(task);
128 if (dispatch)
129 executionStrategy.dispatch();
130 else
131 executionStrategy.execute();
132 }
133
134 @Override
135 public void close()
136 {
137
138
139 session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
140 }
141
142 protected class HTTP2Producer implements ExecutionStrategy.Producer
143 {
144 private ByteBuffer buffer;
145
146 @Override
147 public Runnable produce()
148 {
149 Runnable task = tasks.poll();
150 if (LOG.isDebugEnabled())
151 LOG.debug("Dequeued task {}", task);
152 if (task != null)
153 return task;
154
155 if (isFillInterested())
156 return null;
157
158 if (buffer == null)
159 buffer = byteBufferPool.acquire(bufferSize, false);
160 boolean looping = BufferUtil.hasContent(buffer);
161 while (true)
162 {
163 if (looping)
164 {
165 while (buffer.hasRemaining())
166 parser.parse(buffer);
167
168 task = tasks.poll();
169 if (LOG.isDebugEnabled())
170 LOG.debug("Dequeued task {}", task);
171 if (task != null)
172 {
173 release();
174 return task;
175 }
176 }
177
178 int filled = fill(getEndPoint(), buffer);
179 if (LOG.isDebugEnabled())
180 LOG.debug("Filled {} bytes", filled);
181
182 if (filled == 0)
183 {
184 release();
185 fillInterested();
186 return null;
187 }
188 else if (filled < 0)
189 {
190 release();
191 session.onShutdown();
192 return null;
193 }
194
195 looping = true;
196 }
197 }
198
199 private void release()
200 {
201 if (buffer != null && !buffer.hasRemaining())
202 {
203 byteBufferPool.release(buffer);
204 buffer = null;
205 }
206 }
207 }
208 }