1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.client;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.Executor;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.ByteBufferPool;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.RuntimeIOException;
29 import org.eclipse.jetty.spdy.Controller;
30 import org.eclipse.jetty.spdy.ISession;
31 import org.eclipse.jetty.spdy.IdleListener;
32 import org.eclipse.jetty.spdy.api.GoAwayInfo;
33 import org.eclipse.jetty.spdy.parser.Parser;
34 import org.eclipse.jetty.util.Callback;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38 public class SPDYConnection extends AbstractConnection implements Controller, IdleListener
39 {
40 private static final Logger LOG = Log.getLogger(SPDYConnection.class);
41 private final ByteBufferPool bufferPool;
42 private final Parser parser;
43 private final int bufferSize;
44 private volatile ISession session;
45 private volatile boolean idle = false;
46
47 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, boolean dispatchIO)
48 {
49 this(endPoint, bufferPool, parser, executor, dispatchIO, 8192);
50 }
51
52 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, boolean dispatchIO, int bufferSize)
53 {
54
55
56
57
58
59
60 super(endPoint, executor, dispatchIO);
61 this.bufferPool = bufferPool;
62 this.parser = parser;
63 onIdle(true);
64 this.bufferSize = bufferSize;
65 }
66
67 @Override
68 public void onOpen()
69 {
70 super.onOpen();
71 fillInterested();
72 }
73
74 @Override
75 public void onFillable()
76 {
77 ByteBuffer buffer = bufferPool.acquire(bufferSize, false);
78 boolean readMore = read(buffer) == 0;
79 bufferPool.release(buffer);
80 if (readMore)
81 fillInterested();
82 }
83
84 protected int read(ByteBuffer buffer)
85 {
86 EndPoint endPoint = getEndPoint();
87 while (true)
88 {
89 int filled = fill(endPoint, buffer);
90 if (LOG.isDebugEnabled())
91 LOG.debug("Read {} bytes", filled);
92 if (filled == 0)
93 {
94 return 0;
95 }
96 else if (filled < 0)
97 {
98 shutdown(session);
99 return -1;
100 }
101 else
102 {
103 parser.parse(buffer);
104 }
105 }
106 }
107
108 private int fill(EndPoint endPoint, ByteBuffer buffer)
109 {
110 try
111 {
112 if (endPoint.isInputShutdown())
113 return -1;
114 return endPoint.fill(buffer);
115 }
116 catch (IOException x)
117 {
118 endPoint.close();
119 throw new RuntimeIOException(x);
120 }
121 }
122
123 @Override
124 public void write(final Callback callback, ByteBuffer... buffers)
125 {
126 EndPoint endPoint = getEndPoint();
127 endPoint.write(callback, buffers);
128 }
129
130 @Override
131 public void close()
132 {
133 goAway(session);
134 }
135
136 @Override
137 public void close(boolean onlyOutput)
138 {
139 EndPoint endPoint = getEndPoint();
140
141
142 if (LOG.isDebugEnabled())
143 LOG.debug("Shutting down output {}", endPoint);
144 endPoint.shutdownOutput();
145 if (!onlyOutput)
146 {
147 if (LOG.isDebugEnabled())
148 LOG.debug("Closing {}", endPoint);
149 endPoint.close();
150 }
151 }
152
153 @Override
154 public void onIdle(boolean idle)
155 {
156 this.idle = idle;
157 }
158
159 @Override
160 protected boolean onReadTimeout()
161 {
162 boolean idle = this.idle;
163 if (LOG.isDebugEnabled())
164 LOG.debug("Idle timeout on {}, idle={}", this, idle);
165 if (idle)
166 goAway(session);
167 return false;
168 }
169
170 protected void goAway(ISession session)
171 {
172 if (session != null)
173 session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
174 }
175
176 private void shutdown(ISession session)
177 {
178 if (session != null && !getEndPoint().isOutputShutdown())
179 session.shutdown();
180 }
181
182 protected ISession getSession()
183 {
184 return session;
185 }
186
187 public void setSession(ISession session)
188 {
189 this.session = session;
190 }
191 }