1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.Queue;
24 import java.util.concurrent.Executor;
25
26 import org.eclipse.jetty.http2.parser.Parser;
27 import org.eclipse.jetty.io.AbstractConnection;
28 import org.eclipse.jetty.io.ByteBufferPool;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.BufferUtil;
31 import org.eclipse.jetty.util.ConcurrentArrayQueue;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34 import org.eclipse.jetty.util.thread.ExecutionStrategy;
35
36 public class HTTP2Connection extends AbstractConnection
37 {
38 protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
39
40 private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>();
41 private final ByteBufferPool byteBufferPool;
42 private final Parser parser;
43 private final ISession session;
44 private final int bufferSize;
45 private final HTTP2Producer producer = new HTTP2Producer();
46 private final ExecutionStrategy executionStrategy;
47
48 public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
49 {
50 super(endPoint, executor);
51 this.byteBufferPool = byteBufferPool;
52 this.parser = parser;
53 this.session = session;
54 this.bufferSize = bufferSize;
55 this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor);
56 }
57
58 public ISession getSession()
59 {
60 return session;
61 }
62
63
64 protected Parser getParser()
65 {
66 return parser;
67 }
68
69 protected void setInputBuffer(ByteBuffer buffer)
70 {
71 producer.buffer = buffer;
72 }
73
74 @Override
75 public void onOpen()
76 {
77 if (LOG.isDebugEnabled())
78 LOG.debug("HTTP2 Open {} ", this);
79 super.onOpen();
80 executionStrategy.execute();
81 }
82
83 @Override
84 public void onClose()
85 {
86 if (LOG.isDebugEnabled())
87 LOG.debug("HTTP2 Close {} ", this);
88 super.onClose();
89 }
90
91 @Override
92 public void onFillable()
93 {
94 if (LOG.isDebugEnabled())
95 LOG.debug("HTTP2 onFillable {} ", this);
96 executionStrategy.execute();
97 }
98
99 private int fill(EndPoint endPoint, ByteBuffer buffer)
100 {
101 try
102 {
103 if (endPoint.isInputShutdown())
104 return -1;
105 return endPoint.fill(buffer);
106 }
107 catch (IOException x)
108 {
109 LOG.debug("Could not read from " + endPoint, x);
110 return -1;
111 }
112 }
113
114 @Override
115 protected boolean onReadTimeout()
116 {
117 if (LOG.isDebugEnabled())
118 LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
119 session.onIdleTimeout();
120 return false;
121 }
122
123 protected void offerTask(Runnable task, boolean dispatch)
124 {
125 tasks.offer(task);
126 if (dispatch)
127 executionStrategy.dispatch();
128 else
129 executionStrategy.execute();
130 }
131
132 protected class HTTP2Producer implements ExecutionStrategy.Producer
133 {
134 private ByteBuffer buffer;
135
136 @Override
137 public Runnable produce()
138 {
139 Runnable task = tasks.poll();
140 if (LOG.isDebugEnabled())
141 LOG.debug("Dequeued task {}", task);
142 if (task != null)
143 return task;
144
145 if (isFillInterested())
146 return null;
147
148 if (buffer == null)
149 buffer = byteBufferPool.acquire(bufferSize, false);
150 boolean looping = BufferUtil.hasContent(buffer);
151 while (true)
152 {
153 if (looping)
154 {
155 while (buffer.hasRemaining())
156 parser.parse(buffer);
157
158 task = tasks.poll();
159 if (LOG.isDebugEnabled())
160 LOG.debug("Dequeued task {}", task);
161 if (task != null)
162 {
163 release();
164 return task;
165 }
166 }
167
168 int filled = fill(getEndPoint(), buffer);
169 if (LOG.isDebugEnabled())
170 LOG.debug("Filled {} bytes", filled);
171
172 if (filled == 0)
173 {
174 release();
175 fillInterested();
176 return null;
177 }
178 else if (filled < 0)
179 {
180 release();
181 session.onShutdown();
182 return null;
183 }
184
185 looping = true;
186 }
187 }
188
189 private void release()
190 {
191 if (buffer != null && !buffer.hasRemaining())
192 {
193 byteBufferPool.release(buffer);
194 buffer = null;
195 }
196 }
197 }
198 }