View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2.server;
20  
21  import java.nio.ByteBuffer;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  
24  import org.eclipse.jetty.http.HttpVersion;
25  import org.eclipse.jetty.http.MetaData;
26  import org.eclipse.jetty.http2.ErrorCode;
27  import org.eclipse.jetty.http2.IStream;
28  import org.eclipse.jetty.http2.api.Stream;
29  import org.eclipse.jetty.http2.frames.DataFrame;
30  import org.eclipse.jetty.http2.frames.HeadersFrame;
31  import org.eclipse.jetty.http2.frames.PushPromiseFrame;
32  import org.eclipse.jetty.http2.frames.ResetFrame;
33  import org.eclipse.jetty.server.Connector;
34  import org.eclipse.jetty.server.HttpChannel;
35  import org.eclipse.jetty.server.HttpTransport;
36  import org.eclipse.jetty.util.BufferUtil;
37  import org.eclipse.jetty.util.Callback;
38  import org.eclipse.jetty.util.Promise;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  
42  public class HttpTransportOverHTTP2 implements HttpTransport
43  {
44      private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class);
45  
46      private final AtomicBoolean commit = new AtomicBoolean();
47      private final Callback commitCallback = new CommitCallback();
48      private final Connector connector;
49      private final HTTP2ServerConnection connection;
50      private IStream stream;
51  
52      public HttpTransportOverHTTP2(Connector connector, HTTP2ServerConnection connection)
53      {
54          this.connector = connector;
55          this.connection = connection;
56      }
57  
58      @Override
59      public boolean isOptimizedForDirectBuffers()
60      {
61          // Because sent buffers are passed directly to the endpoint without
62          // copying we can defer to the endpoint
63          return connection.getEndPoint().isOptimizedForDirectBuffers();
64      }
65      
66      public IStream getStream()
67      {
68          return stream;
69      }
70  
71      public void setStream(IStream stream)
72      {
73          if (LOG.isDebugEnabled())
74              LOG.debug("{} setStream {}", this, stream.getId());
75          this.stream = stream;
76      }
77  
78      public void recycle()
79      {
80          this.stream = null;
81          commit.set(false);
82      }
83  
84      @Override
85      public void send(MetaData.Response info, boolean isHeadRequest, ByteBuffer content, boolean lastContent, Callback callback)
86      {
87          // info != null | content != 0 | last = true => commit + send/end
88          // info != null | content != 0 | last = false => commit + send
89          // info != null | content == 0 | last = true => commit/end
90          // info != null | content == 0 | last = false => commit
91          // info == null | content != 0 | last = true => send/end
92          // info == null | content != 0 | last = false => send
93          // info == null | content == 0 | last = true => send/end
94          // info == null | content == 0 | last = false => noop
95  
96          boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
97  
98          if (info != null)
99          {
100             if (commit.compareAndSet(false, true))
101             {
102                 if (hasContent)
103                 {
104                     commit(info, false, commitCallback);
105                     send(content, lastContent, callback);
106                 }
107                 else
108                 {
109                     commit(info, lastContent, callback);
110                 }
111             }
112             else
113             {
114                 callback.failed(new IllegalStateException("committed"));
115             }
116         }
117         else
118         {
119             if (hasContent || lastContent)
120             {
121                 send(content, lastContent, callback);
122             }
123             else
124             {
125                 callback.succeeded();
126             }
127         }
128     }
129 
130     @Override
131     public boolean isPushSupported()
132     {
133         return stream.getSession().isPushEnabled();
134     }
135 
136     @Override
137     public void push(final MetaData.Request request)
138     {
139         if (!stream.getSession().isPushEnabled())
140         {
141             if (LOG.isDebugEnabled())
142                 LOG.debug("HTTP/2 Push disabled for {}", request);
143             return;
144         }
145 
146         if (LOG.isDebugEnabled())
147             LOG.debug("HTTP/2 Push {}",request);
148         
149         stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
150         {
151             @Override
152             public void succeeded(Stream pushStream)
153             {
154                 connection.push(connector, (IStream)pushStream, request);
155             }
156 
157             @Override
158             public void failed(Throwable x)
159             {
160                 if (LOG.isDebugEnabled())
161                     LOG.debug("Could not push " + request, x);
162             }
163         }, new Stream.Listener.Adapter()); // TODO: handle reset from the client ?
164     }
165 
166     private void commit(MetaData.Response info, boolean endStream, Callback callback)
167     {
168         if (LOG.isDebugEnabled())
169         {
170             LOG.debug("HTTP2 Response #{}:{}{} {}{}{}",
171                     stream.getId(), System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
172                     System.lineSeparator(), info.getFields());
173         }
174 
175         HeadersFrame frame = new HeadersFrame(stream.getId(), info, null, endStream);
176         stream.headers(frame, callback);
177     }
178 
179     private void send(ByteBuffer content, boolean lastContent, Callback callback)
180     {
181         if (LOG.isDebugEnabled())
182         {
183             LOG.debug("HTTP2 Response #{}: {} content bytes{}",
184                     stream.getId(), content.remaining(), lastContent ? " (last chunk)" : "");
185         }
186         DataFrame frame = new DataFrame(stream.getId(), content, lastContent);
187         stream.data(frame, callback);
188     }
189 
190     @Override
191     public void onCompleted()
192     {
193         if (!stream.isClosed())
194         {
195             // If the stream is not closed, it is still reading the request content.
196             // Send a reset to the other end so that it stops sending data.
197             stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
198             // Now that this stream is reset, in-flight data frames will be consumed and discarded.
199             // Consume the existing queued data frames to avoid stalling the flow control.
200             HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
201             channel.getRequest().getHttpInput().consumeAll();
202         }
203     }
204 
205     @Override
206     public void abort(Throwable failure)
207     {
208         IStream stream = this.stream;
209         if (LOG.isDebugEnabled())
210             LOG.debug("HTTP2 Response #{} aborted", stream == null ? -1 : stream.getId());
211         if (stream != null)
212             stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
213     }
214 
215     private class CommitCallback implements Callback.NonBlocking
216     {   
217         @Override
218         public void succeeded()
219         {
220             if (LOG.isDebugEnabled())
221                 LOG.debug("HTTP2 Response #{} committed", stream.getId());
222         }
223 
224         @Override
225         public void failed(Throwable x)
226         {
227             if (LOG.isDebugEnabled())
228                 LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
229         }
230     }
231 }