View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayDeque;
24  import java.util.Queue;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.eclipse.jetty.http2.parser.Parser;
29  import org.eclipse.jetty.io.AbstractConnection;
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.EndPoint;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.Callback;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  import org.eclipse.jetty.util.thread.ExecutionStrategy;
37  
38  public class HTTP2Connection extends AbstractConnection
39  {
40      protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
41  
42      private final Queue<Runnable> tasks = new ArrayDeque<>();
43      private final HTTP2Producer producer = new HTTP2Producer();
44      private final AtomicLong bytesIn = new AtomicLong();
45      private final ByteBufferPool byteBufferPool;
46      private final Parser parser;
47      private final ISession session;
48      private final int bufferSize;
49      private final ExecutionStrategy executionStrategy;
50  
51      public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory)
52      {
53          super(endPoint, executor);
54          this.byteBufferPool = byteBufferPool;
55          this.parser = parser;
56          this.session = session;
57          this.bufferSize = bufferSize;
58          this.executionStrategy = executionFactory.newExecutionStrategy(producer, executor);
59      }
60  
61      @Override
62      public long getBytesIn()
63      {
64          return bytesIn.get();
65      }
66  
67      @Override
68      public long getBytesOut()
69      {
70          return session.getBytesWritten();
71      }
72  
73      public ISession getSession()
74      {
75          return session;
76      }
77  
78      protected Parser getParser()
79      {
80          return parser;
81      }
82  
83      protected void setInputBuffer(ByteBuffer buffer)
84      {
85          producer.buffer = buffer;
86      }
87  
88      @Override
89      public void onOpen()
90      {
91          if (LOG.isDebugEnabled())
92              LOG.debug("HTTP2 Open {} ", this);
93          super.onOpen();
94          executionStrategy.execute();
95      }
96  
97      @Override
98      public void onClose()
99      {
100         if (LOG.isDebugEnabled())
101             LOG.debug("HTTP2 Close {} ", this);
102         super.onClose();
103     }
104 
105     @Override
106     public void onFillable()
107     {
108         if (LOG.isDebugEnabled())
109             LOG.debug("HTTP2 onFillable {} ", this);
110         executionStrategy.execute();
111     }
112 
113     private int fill(EndPoint endPoint, ByteBuffer buffer)
114     {
115         try
116         {
117             if (endPoint.isInputShutdown())
118                 return -1;
119             return endPoint.fill(buffer);
120         }
121         catch (IOException x)
122         {
123             LOG.debug("Could not read from " + endPoint, x);
124             return -1;
125         }
126     }
127 
128     @Override
129     public boolean onIdleExpired()
130     {
131         boolean idle = isFillInterested();
132         if (idle)
133         {
134             boolean close = session.onIdleTimeout();
135             if (close)
136                 session.close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
137         }
138         return false;
139     }
140 
141     protected void offerTask(Runnable task, boolean dispatch)
142     {
143         offerTask(task);
144         if (dispatch)
145             executionStrategy.dispatch();
146         else
147             executionStrategy.execute();
148     }
149 
150     @Override
151     public void close()
152     {
153         // We don't call super from here, otherwise we close the
154         // endPoint and we're not able to read or write anymore.
155         session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
156     }
157 
158     private void offerTask(Runnable task)
159     {
160         synchronized (this)
161         {
162             tasks.offer(task);
163         }
164     }
165 
166     private Runnable pollTask()
167     {
168         synchronized (this)
169         {
170             return tasks.poll();
171         }
172     }
173 
174     protected class HTTP2Producer implements ExecutionStrategy.Producer
175     {
176         private final Callback fillCallback = new FillCallback();
177         private ByteBuffer buffer;
178 
179         @Override
180         public Runnable produce()
181         {
182             Runnable task = pollTask();
183             if (LOG.isDebugEnabled())
184                 LOG.debug("Dequeued task {}", task);
185             if (task != null)
186                 return task;
187 
188             if (isFillInterested())
189                 return null;
190 
191             if (buffer == null)
192                 buffer = byteBufferPool.acquire(bufferSize, false); // TODO: make directness customizable
193             boolean looping = BufferUtil.hasContent(buffer);
194             while (true)
195             {
196                 if (looping)
197                 {
198                     while (buffer.hasRemaining())
199                         parser.parse(buffer);
200 
201                     task = pollTask();
202                     if (LOG.isDebugEnabled())
203                         LOG.debug("Dequeued new task {}", task);
204                     if (task != null)
205                     {
206                         release();
207                         return task;
208                     }
209                 }
210 
211                 int filled = fill(getEndPoint(), buffer);
212                 if (LOG.isDebugEnabled())
213                     LOG.debug("Filled {} bytes", filled);
214 
215                 if (filled == 0)
216                 {
217                     release();
218                     getEndPoint().fillInterested(fillCallback);
219                     return null;
220                 }
221                 else if (filled < 0)
222                 {
223                     release();
224                     session.onShutdown();
225                     return null;
226                 }
227                 else
228                 {
229                     bytesIn.addAndGet(filled);
230                 }
231 
232                 looping = true;
233             }
234         }
235 
236         private void release()
237         {
238             if (buffer != null && !buffer.hasRemaining())
239             {
240                 byteBufferPool.release(buffer);
241                 buffer = null;
242             }
243         }
244     }
245 
246     private class FillCallback implements Callback.NonBlocking
247     {
248         @Override
249         public void succeeded()
250         {
251             onFillable();
252         }
253 
254         @Override
255         public void failed(Throwable x)
256         {
257             onFillInterestedFailed(x);
258         }
259     }
260 }