1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.client;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.Executor;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.ByteBufferPool;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.RuntimeIOException;
29 import org.eclipse.jetty.spdy.Controller;
30 import org.eclipse.jetty.spdy.ISession;
31 import org.eclipse.jetty.spdy.IdleListener;
32 import org.eclipse.jetty.spdy.api.GoAwayInfo;
33 import org.eclipse.jetty.spdy.parser.Parser;
34 import org.eclipse.jetty.util.Callback;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38 public class SPDYConnection extends AbstractConnection implements Controller, IdleListener
39 {
40 private static final Logger LOG = Log.getLogger(SPDYConnection.class);
41 private final ByteBufferPool bufferPool;
42 private final Parser parser;
43 private final int bufferSize;
44 private volatile ISession session;
45 private volatile boolean idle = false;
46
47 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor,
48 boolean executeOnFillable)
49 {
50 this(endPoint, bufferPool, parser, executor, executeOnFillable, 8192);
51 }
52
53 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor,
54 boolean executeOnFillable, int bufferSize)
55 {
56
57
58
59
60
61
62
63
64
65
66 super(endPoint, executor, executeOnFillable);
67 this.bufferPool = bufferPool;
68 this.parser = parser;
69 onIdle(true);
70 this.bufferSize = bufferSize;
71 }
72
73 @Override
74 public void onOpen()
75 {
76 super.onOpen();
77 fillInterested();
78 }
79
80 @Override
81 public void onFillable()
82 {
83 ByteBuffer buffer = bufferPool.acquire(bufferSize, false);
84 boolean readMore = read(buffer) == 0;
85 bufferPool.release(buffer);
86 if (readMore)
87 fillInterested();
88 }
89
90 protected int read(ByteBuffer buffer)
91 {
92 EndPoint endPoint = getEndPoint();
93 while (true)
94 {
95 int filled = fill(endPoint, buffer);
96 LOG.debug("Read {} bytes", filled);
97 if (filled == 0)
98 {
99 return 0;
100 }
101 else if (filled < 0)
102 {
103 shutdown(session);
104 return -1;
105 }
106 else
107 {
108 parser.parse(buffer);
109 }
110 }
111 }
112
113 private int fill(EndPoint endPoint, ByteBuffer buffer)
114 {
115 try
116 {
117 if (endPoint.isInputShutdown())
118 return -1;
119 return endPoint.fill(buffer);
120 }
121 catch (IOException x)
122 {
123 endPoint.close();
124 throw new RuntimeIOException(x);
125 }
126 }
127
128 @Override
129 public void write(ByteBuffer buffer, final Callback callback)
130 {
131 EndPoint endPoint = getEndPoint();
132 endPoint.write(callback, buffer);
133 }
134
135 @Override
136 public void close()
137 {
138 goAway(session);
139 }
140
141 @Override
142 public void close(boolean onlyOutput)
143 {
144 EndPoint endPoint = getEndPoint();
145
146
147 LOG.debug("Shutting down output {}", endPoint);
148 endPoint.shutdownOutput();
149 if (!onlyOutput)
150 {
151 LOG.debug("Closing {}", endPoint);
152 endPoint.close();
153 }
154 }
155
156 @Override
157 public void onIdle(boolean idle)
158 {
159 this.idle = idle;
160 }
161
162 @Override
163 protected boolean onReadTimeout()
164 {
165 boolean idle = this.idle;
166 LOG.debug("Idle timeout on {}, idle={}", this, idle);
167 if (idle)
168 goAway(session);
169 return false;
170 }
171
172 protected void goAway(ISession session)
173 {
174 if (session != null)
175 session.goAway(new GoAwayInfo(), new Callback.Adapter());
176 }
177
178 private void shutdown(ISession session)
179 {
180 if (session != null && !getEndPoint().isOutputShutdown())
181 session.shutdown();
182 }
183
184 protected ISession getSession()
185 {
186 return session;
187 }
188
189 public void setSession(ISession session)
190 {
191 this.session = session;
192 }
193 }