1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.spdy;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.nio.AsyncConnection;
30 import org.eclipse.jetty.io.nio.DirectNIOBuffer;
31 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
32 import org.eclipse.jetty.io.nio.NIOBuffer;
33 import org.eclipse.jetty.spdy.api.Handler;
34 import org.eclipse.jetty.spdy.api.Session;
35 import org.eclipse.jetty.spdy.parser.Parser;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38
39 public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller<StandardSession.FrameBytes>, IdleListener
40 {
41 private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class);
42 private final ByteBufferPool bufferPool;
43 private final Parser parser;
44 private volatile Session session;
45 private ByteBuffer writeBuffer;
46 private Handler<StandardSession.FrameBytes> writeHandler;
47 private StandardSession.FrameBytes writeContext;
48 private volatile boolean writePending;
49
50 public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser)
51 {
52 super(endPoint);
53 this.bufferPool = bufferPool;
54 this.parser = parser;
55 onIdle(true);
56 }
57
58 @Override
59 public Connection handle() throws IOException
60 {
61 AsyncEndPoint endPoint = getEndPoint();
62 boolean progress = true;
63 while (endPoint.isOpen() && progress)
64 {
65 int filled = fill();
66 progress = filled > 0;
67
68 int flushed = flush();
69 progress |= flushed > 0;
70
71 endPoint.flush();
72
73 progress |= endPoint.hasProgressed();
74
75 if (!progress && filled < 0)
76 {
77 onInputShutdown();
78 close(false);
79 }
80 }
81 return this;
82 }
83
84 public int fill() throws IOException
85 {
86 ByteBuffer buffer = bufferPool.acquire(8192, true);
87 NIOBuffer jettyBuffer = new DirectNIOBuffer(buffer, false);
88 jettyBuffer.setPutIndex(jettyBuffer.getIndex());
89 AsyncEndPoint endPoint = getEndPoint();
90 int filled = endPoint.fill(jettyBuffer);
91 logger.debug("Filled {} from {}", filled, endPoint);
92 if (filled <= 0)
93 return filled;
94
95 buffer.limit(jettyBuffer.putIndex());
96 buffer.position(jettyBuffer.getIndex());
97 parser.parse(buffer);
98
99 bufferPool.release(buffer);
100
101 return filled;
102 }
103
104 public int flush()
105 {
106 int result = 0;
107
108 if (writePending)
109 result = write(writeBuffer, writeHandler, writeContext);
110 logger.debug("Flushed {} to {}", result, getEndPoint());
111 return result;
112 }
113
114 @Override
115 public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
116 {
117 int remaining = buffer.remaining();
118 Buffer jettyBuffer = buffer.isDirect() ? new DirectNIOBuffer(buffer, false) : new IndirectNIOBuffer(buffer, false);
119 AsyncEndPoint endPoint = getEndPoint();
120 try
121 {
122 int written = endPoint.flush(jettyBuffer);
123 logger.debug("Written {} bytes, {} remaining", written, jettyBuffer.length());
124 }
125 catch (Exception x)
126 {
127 close(false);
128 handler.failed(context, x);
129 return -1;
130 }
131 finally
132 {
133 buffer.limit(jettyBuffer.putIndex());
134 buffer.position(jettyBuffer.getIndex());
135 }
136
137 if (buffer.hasRemaining())
138 {
139
140 this.writeBuffer = buffer;
141 this.writeHandler = handler;
142 this.writeContext = context;
143
144 writePending = true;
145 endPoint.scheduleWrite();
146 }
147 else
148 {
149 if (writePending)
150 {
151 this.writeBuffer = null;
152 this.writeHandler = null;
153 this.writeContext = null;
154
155 writePending = false;
156 }
157 handler.completed(context);
158 }
159
160 return remaining - buffer.remaining();
161 }
162
163 @Override
164 public void close(boolean onlyOutput)
165 {
166 try
167 {
168 AsyncEndPoint endPoint = getEndPoint();
169 try
170 {
171
172
173 logger.debug("Shutting down output {}", endPoint);
174 endPoint.shutdownOutput();
175 if (!onlyOutput)
176 {
177 logger.debug("Closing {}", endPoint);
178 endPoint.close();
179 }
180 }
181 catch (IOException x)
182 {
183 endPoint.close();
184 }
185 }
186 catch (IOException x)
187 {
188 logger.ignore(x);
189 }
190 }
191
192 @Override
193 public void onIdle(boolean idle)
194 {
195 getEndPoint().setCheckForIdle(idle);
196 }
197
198 @Override
199 public AsyncEndPoint getEndPoint()
200 {
201 return (AsyncEndPoint)super.getEndPoint();
202 }
203
204 @Override
205 public boolean isIdle()
206 {
207 return false;
208 }
209
210 @Override
211 public boolean isSuspended()
212 {
213 return false;
214 }
215
216 @Override
217 public void onClose()
218 {
219 }
220
221 @Override
222 public void onInputShutdown() throws IOException
223 {
224 }
225
226 @Override
227 public void onIdleExpired(long idleForMs)
228 {
229 logger.debug("Idle timeout expired for {}", getEndPoint());
230 session.goAway();
231 }
232
233 protected Session getSession()
234 {
235 return session;
236 }
237
238 protected void setSession(Session session)
239 {
240 this.session = session;
241 }
242
243 public String toString()
244 {
245 return String.format("%s@%x{endp=%s@%x}",getClass().getSimpleName(),hashCode(),getEndPoint().getClass().getSimpleName(),getEndPoint().hashCode());
246 }
247 }