View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.Queue;
24  import java.util.concurrent.Executor;
25  
26  import org.eclipse.jetty.http2.parser.Parser;
27  import org.eclipse.jetty.io.AbstractConnection;
28  import org.eclipse.jetty.io.ByteBufferPool;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.BufferUtil;
31  import org.eclipse.jetty.util.ConcurrentArrayQueue;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  import org.eclipse.jetty.util.thread.ExecutionStrategy;
35  
36  public class HTTP2Connection extends AbstractConnection
37  {
38      protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
39  
40      private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>();
41      private final ByteBufferPool byteBufferPool;
42      private final Parser parser;
43      private final ISession session;
44      private final int bufferSize;
45      private final HTTP2Producer producer = new HTTP2Producer();
46      private final ExecutionStrategy executionStrategy;
47  
48      public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
49      {
50          super(endPoint, executor);
51          this.byteBufferPool = byteBufferPool;
52          this.parser = parser;
53          this.session = session;
54          this.bufferSize = bufferSize;
55          this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor);
56      }
57  
58      public ISession getSession()
59      {
60          return session;
61      }
62  
63  
64      protected Parser getParser()
65      {
66          return parser;
67      }
68  
69      protected void setInputBuffer(ByteBuffer buffer)
70      {
71          producer.buffer = buffer;
72      }
73  
74      @Override
75      public void onOpen()
76      {
77          if (LOG.isDebugEnabled())
78              LOG.debug("HTTP2 Open {} ", this);
79          super.onOpen();
80          executionStrategy.execute();
81      }
82  
83      @Override
84      public void onClose()
85      {
86          if (LOG.isDebugEnabled())
87              LOG.debug("HTTP2 Close {} ", this);
88          super.onClose();
89      }
90  
91      @Override
92      public void onFillable()
93      {
94          if (LOG.isDebugEnabled())
95              LOG.debug("HTTP2 onFillable {} ", this);
96          executionStrategy.execute();
97      }
98  
99      private int fill(EndPoint endPoint, ByteBuffer buffer)
100     {
101         try
102         {
103             if (endPoint.isInputShutdown())
104                 return -1;
105             return endPoint.fill(buffer);
106         }
107         catch (IOException x)
108         {
109             LOG.debug("Could not read from " + endPoint, x);
110             return -1;
111         }
112     }
113 
114     @Override
115     protected boolean onReadTimeout()
116     {
117         if (LOG.isDebugEnabled())
118             LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
119         session.onIdleTimeout();
120         return false;
121     }
122 
123     protected void offerTask(Runnable task, boolean dispatch)
124     {
125         tasks.offer(task);
126         if (dispatch)
127             executionStrategy.dispatch();
128         else
129             executionStrategy.execute();
130     }
131 
132     protected class HTTP2Producer implements ExecutionStrategy.Producer
133     {
134         private ByteBuffer buffer;
135 
136         @Override
137         public Runnable produce()
138         {
139             Runnable task = tasks.poll();
140             if (LOG.isDebugEnabled())
141                 LOG.debug("Dequeued task {}", task);
142             if (task != null)
143                 return task;
144 
145             if (isFillInterested())
146                 return null;
147 
148             if (buffer == null)
149                 buffer = byteBufferPool.acquire(bufferSize, false); // TODO: make directness customizable
150             boolean looping = BufferUtil.hasContent(buffer);
151             while (true)
152             {
153                 if (looping)
154                 {
155                     while (buffer.hasRemaining())
156                         parser.parse(buffer);
157 
158                     task = tasks.poll();
159                     if (LOG.isDebugEnabled())
160                         LOG.debug("Dequeued task {}", task);
161                     if (task != null)
162                     {
163                         release();
164                         return task;
165                     }
166                 }
167 
168                 int filled = fill(getEndPoint(), buffer);
169                 if (LOG.isDebugEnabled())
170                     LOG.debug("Filled {} bytes", filled);
171 
172                 if (filled == 0)
173                 {
174                     release();
175                     fillInterested();
176                     return null;
177                 }
178                 else if (filled < 0)
179                 {
180                     release();
181                     session.onShutdown();
182                     return null;
183                 }
184 
185                 looping = true;
186             }
187         }
188 
189         private void release()
190         {
191             if (buffer != null && !buffer.hasRemaining())
192             {
193                 byteBufferPool.release(buffer);
194                 buffer = null;
195             }
196         }
197     }
198 }