View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  
25  import org.eclipse.jetty.io.AbstractConnection;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.Buffer;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.nio.AsyncConnection;
30  import org.eclipse.jetty.io.nio.DirectNIOBuffer;
31  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
32  import org.eclipse.jetty.io.nio.NIOBuffer;
33  import org.eclipse.jetty.spdy.api.Handler;
34  import org.eclipse.jetty.spdy.api.Session;
35  import org.eclipse.jetty.spdy.parser.Parser;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  
39  public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller<StandardSession.FrameBytes>, IdleListener
40  {
41      private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class);
42      private final ByteBufferPool bufferPool;
43      private final Parser parser;
44      private volatile Session session;
45      private ByteBuffer writeBuffer;
46      private Handler<StandardSession.FrameBytes> writeHandler;
47      private StandardSession.FrameBytes writeContext;
48      private volatile boolean writePending;
49  
50      public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser)
51      {
52          super(endPoint);
53          this.bufferPool = bufferPool;
54          this.parser = parser;
55          onIdle(true);
56      }
57  
58      @Override
59      public Connection handle() throws IOException
60      {
61          AsyncEndPoint endPoint = getEndPoint();
62          boolean progress = true;
63          while (endPoint.isOpen() && progress)
64          {
65              int filled = fill();
66              progress = filled > 0;
67  
68              int flushed = flush();
69              progress |= flushed > 0;
70  
71              endPoint.flush();
72  
73              progress |= endPoint.hasProgressed();
74  
75              if (!progress && filled < 0)
76              {
77                  onInputShutdown();
78                  close(false);
79              }
80          }
81          return this;
82      }
83  
84      public int fill() throws IOException
85      {
86          ByteBuffer buffer = bufferPool.acquire(8192, true);
87          NIOBuffer jettyBuffer = new DirectNIOBuffer(buffer, false);
88          jettyBuffer.setPutIndex(jettyBuffer.getIndex());
89          AsyncEndPoint endPoint = getEndPoint();
90          int filled = endPoint.fill(jettyBuffer);
91          logger.debug("Filled {} from {}", filled, endPoint);
92          if (filled <= 0)
93              return filled;
94  
95          buffer.limit(jettyBuffer.putIndex());
96          buffer.position(jettyBuffer.getIndex());
97          parser.parse(buffer);
98  
99          bufferPool.release(buffer);
100 
101         return filled;
102     }
103 
104     public int flush()
105     {
106         int result = 0;
107         // Volatile read to ensure visibility of buffer and handler
108         if (writePending)
109             result = write(writeBuffer, writeHandler, writeContext);
110         logger.debug("Flushed {} to {}", result, getEndPoint());
111         return result;
112     }
113 
114     @Override
115     public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
116     {
117         int remaining = buffer.remaining();
118         Buffer jettyBuffer = buffer.isDirect() ? new DirectNIOBuffer(buffer, false) : new IndirectNIOBuffer(buffer, false);
119         AsyncEndPoint endPoint = getEndPoint();
120         try
121         {
122             int written = endPoint.flush(jettyBuffer);
123             logger.debug("Written {} bytes, {} remaining", written, jettyBuffer.length());
124         }
125         catch (Exception x)
126         {
127             close(false);
128             handler.failed(context, x);
129             return -1;
130         }
131         finally
132         {
133             buffer.limit(jettyBuffer.putIndex());
134             buffer.position(jettyBuffer.getIndex());
135         }
136 
137         if (buffer.hasRemaining())
138         {
139             // Save buffer and handler in order to finish the write later in flush()
140             this.writeBuffer = buffer;
141             this.writeHandler = handler;
142             this.writeContext = context;
143             // Volatile write to ensure visibility of write fields
144             writePending = true;
145             endPoint.scheduleWrite();
146         }
147         else
148         {
149             if (writePending)
150             {
151                 this.writeBuffer = null;
152                 this.writeHandler = null;
153                 this.writeContext = null;
154                 // Volatile write to ensure visibility of write fields
155                 writePending = false;
156             }
157             handler.completed(context);
158         }
159 
160         return remaining - buffer.remaining();
161     }
162 
163     @Override
164     public void close(boolean onlyOutput)
165     {
166         try
167         {
168             AsyncEndPoint endPoint = getEndPoint();
169             try
170             {
171                 // We need to gently close first, to allow
172                 // SSL close alerts to be sent by Jetty
173                 logger.debug("Shutting down output {}", endPoint);
174                 endPoint.shutdownOutput();
175                 if (!onlyOutput)
176                 {
177                     logger.debug("Closing {}", endPoint);
178                     endPoint.close();
179                 }
180             }
181             catch (IOException x)
182             {
183                 endPoint.close();
184             }
185         }
186         catch (IOException x)
187         {
188             logger.ignore(x);
189         }
190     }
191 
192     @Override
193     public void onIdle(boolean idle)
194     {
195         getEndPoint().setCheckForIdle(idle);
196     }
197 
198     @Override
199     public AsyncEndPoint getEndPoint()
200     {
201         return (AsyncEndPoint)super.getEndPoint();
202     }
203 
204     @Override
205     public boolean isIdle()
206     {
207         return false;
208     }
209 
210     @Override
211     public boolean isSuspended()
212     {
213         return false;
214     }
215 
216     @Override
217     public void onClose()
218     {
219     }
220 
221     @Override
222     public void onInputShutdown() throws IOException
223     {
224     }
225 
226     @Override
227     public void onIdleExpired(long idleForMs)
228     {
229         logger.debug("Idle timeout expired for {}", getEndPoint());
230         session.goAway();
231     }
232 
233     protected Session getSession()
234     {
235         return session;
236     }
237 
238     protected void setSession(Session session)
239     {
240         this.session = session;
241     }
242 
243     public String toString()
244     {
245         return String.format("%s@%x{endp=%s@%x}",getClass().getSimpleName(),hashCode(),getEndPoint().getClass().getSimpleName(),getEndPoint().hashCode());
246     }
247 }