View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.Queue;
24  import java.util.concurrent.Executor;
25  
26  import org.eclipse.jetty.http2.parser.Parser;
27  import org.eclipse.jetty.io.AbstractConnection;
28  import org.eclipse.jetty.io.ByteBufferPool;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.BufferUtil;
31  import org.eclipse.jetty.util.Callback;
32  import org.eclipse.jetty.util.ConcurrentArrayQueue;
33  import org.eclipse.jetty.util.log.Log;
34  import org.eclipse.jetty.util.log.Logger;
35  import org.eclipse.jetty.util.thread.ExecutionStrategy;
36  
37  public class HTTP2Connection extends AbstractConnection
38  {
39      protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
40  
41      private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>();
42      private final ByteBufferPool byteBufferPool;
43      private final Parser parser;
44      private final ISession session;
45      private final int bufferSize;
46      private final HTTP2Producer producer = new HTTP2Producer();
47      private final ExecutionStrategy executionStrategy;
48  
49      public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
50      {
51          super(endPoint, executor);
52          this.byteBufferPool = byteBufferPool;
53          this.parser = parser;
54          this.session = session;
55          this.bufferSize = bufferSize;
56          this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor);
57      }
58  
59      public ISession getSession()
60      {
61          return session;
62      }
63  
64  
65      protected Parser getParser()
66      {
67          return parser;
68      }
69  
70      protected void setInputBuffer(ByteBuffer buffer)
71      {
72          producer.buffer = buffer;
73      }
74  
75      @Override
76      public void onOpen()
77      {
78          if (LOG.isDebugEnabled())
79              LOG.debug("HTTP2 Open {} ", this);
80          super.onOpen();
81          executionStrategy.execute();
82      }
83  
84      @Override
85      public void onClose()
86      {
87          if (LOG.isDebugEnabled())
88              LOG.debug("HTTP2 Close {} ", this);
89          super.onClose();
90      }
91  
92      @Override
93      public void onFillable()
94      {
95          if (LOG.isDebugEnabled())
96              LOG.debug("HTTP2 onFillable {} ", this);
97          executionStrategy.execute();
98      }
99  
100     private int fill(EndPoint endPoint, ByteBuffer buffer)
101     {
102         try
103         {
104             if (endPoint.isInputShutdown())
105                 return -1;
106             return endPoint.fill(buffer);
107         }
108         catch (IOException x)
109         {
110             LOG.debug("Could not read from " + endPoint, x);
111             return -1;
112         }
113     }
114 
115     @Override
116     public boolean onIdleExpired()
117     {
118         boolean close = session.onIdleTimeout();
119         boolean idle = isFillInterested();
120         if (close && idle)
121             session.close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
122         return false;
123     }
124 
125     protected void offerTask(Runnable task, boolean dispatch)
126     {
127         tasks.offer(task);
128         if (dispatch)
129             executionStrategy.dispatch();
130         else
131             executionStrategy.execute();
132     }
133 
134     @Override
135     public void close()
136     {
137         // We don't call super from here, otherwise we close the
138         // endPoint and we're not able to read or write anymore.
139         session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
140     }
141 
142     protected class HTTP2Producer implements ExecutionStrategy.Producer
143     {
144         private ByteBuffer buffer;
145 
146         @Override
147         public Runnable produce()
148         {
149             Runnable task = tasks.poll();
150             if (LOG.isDebugEnabled())
151                 LOG.debug("Dequeued task {}", task);
152             if (task != null)
153                 return task;
154 
155             if (isFillInterested())
156                 return null;
157 
158             if (buffer == null)
159                 buffer = byteBufferPool.acquire(bufferSize, false); // TODO: make directness customizable
160             boolean looping = BufferUtil.hasContent(buffer);
161             while (true)
162             {
163                 if (looping)
164                 {
165                     while (buffer.hasRemaining())
166                         parser.parse(buffer);
167 
168                     task = tasks.poll();
169                     if (LOG.isDebugEnabled())
170                         LOG.debug("Dequeued task {}", task);
171                     if (task != null)
172                     {
173                         release();
174                         return task;
175                     }
176                 }
177 
178                 int filled = fill(getEndPoint(), buffer);
179                 if (LOG.isDebugEnabled())
180                     LOG.debug("Filled {} bytes", filled);
181 
182                 if (filled == 0)
183                 {
184                     release();
185                     fillInterested();
186                     return null;
187                 }
188                 else if (filled < 0)
189                 {
190                     release();
191                     session.onShutdown();
192                     return null;
193                 }
194 
195                 looping = true;
196             }
197         }
198 
199         private void release()
200         {
201             if (buffer != null && !buffer.hasRemaining())
202             {
203                 byteBufferPool.release(buffer);
204                 buffer = null;
205             }
206         }
207     }
208 }