1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.client;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.Executor;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.ByteBufferPool;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.RuntimeIOException;
29 import org.eclipse.jetty.spdy.Controller;
30 import org.eclipse.jetty.spdy.ISession;
31 import org.eclipse.jetty.spdy.IdleListener;
32 import org.eclipse.jetty.spdy.api.GoAwayInfo;
33 import org.eclipse.jetty.spdy.parser.Parser;
34 import org.eclipse.jetty.util.Callback;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38 public class SPDYConnection extends AbstractConnection implements Controller, IdleListener
39 {
40 private static final Logger LOG = Log.getLogger(SPDYConnection.class);
41 private final ByteBufferPool bufferPool;
42 private final Parser parser;
43 private final int bufferSize;
44 private volatile ISession session;
45 private volatile boolean idle = false;
46
47 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, boolean dispatchIO)
48 {
49 this(endPoint, bufferPool, parser, executor, dispatchIO, 8192);
50 }
51
52 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, boolean dispatchIO, int bufferSize)
53 {
54
55
56
57
58
59
60 super(endPoint, executor, dispatchIO);
61 this.bufferPool = bufferPool;
62 this.parser = parser;
63 onIdle(true);
64 this.bufferSize = bufferSize;
65 }
66
67 @Override
68 public void onOpen()
69 {
70 super.onOpen();
71 fillInterested();
72 }
73
74 @Override
75 public void onFillable()
76 {
77 ByteBuffer buffer = bufferPool.acquire(bufferSize, false);
78 boolean readMore = read(buffer) == 0;
79 bufferPool.release(buffer);
80 if (readMore)
81 fillInterested();
82 }
83
84 protected int read(ByteBuffer buffer)
85 {
86 EndPoint endPoint = getEndPoint();
87 while (true)
88 {
89 int filled = fill(endPoint, buffer);
90 if (LOG.isDebugEnabled())
91 LOG.debug("Read {} bytes", filled);
92 if (filled == 0)
93 {
94 return 0;
95 }
96 else if (filled < 0)
97 {
98 shutdown(session);
99 return -1;
100 }
101 else
102 {
103 parser.parse(buffer);
104 }
105 }
106 }
107
108 private int fill(EndPoint endPoint, ByteBuffer buffer)
109 {
110 try
111 {
112 if (endPoint.isInputShutdown())
113 return -1;
114 return endPoint.fill(buffer);
115 }
116 catch (IOException x)
117 {
118 endPoint.close();
119 throw new RuntimeIOException(x);
120 }
121 }
122
123 @Override
124 public void write(ByteBuffer buffer, final Callback callback)
125 {
126 EndPoint endPoint = getEndPoint();
127 endPoint.write(callback, buffer);
128 }
129
130 @Override
131 public void close()
132 {
133 goAway(session);
134 }
135
136 @Override
137 public void close(boolean onlyOutput)
138 {
139 EndPoint endPoint = getEndPoint();
140
141
142 LOG.debug("Shutting down output {}", endPoint);
143 endPoint.shutdownOutput();
144 if (!onlyOutput)
145 {
146 LOG.debug("Closing {}", endPoint);
147 endPoint.close();
148 }
149 }
150
151 @Override
152 public void onIdle(boolean idle)
153 {
154 this.idle = idle;
155 }
156
157 @Override
158 protected boolean onReadTimeout()
159 {
160 boolean idle = this.idle;
161 LOG.debug("Idle timeout on {}, idle={}", this, idle);
162 if (idle)
163 goAway(session);
164 return false;
165 }
166
167 protected void goAway(ISession session)
168 {
169 if (session != null)
170 session.goAway(new GoAwayInfo(), new Callback.Adapter());
171 }
172
173 private void shutdown(ISession session)
174 {
175 if (session != null && !getEndPoint().isOutputShutdown())
176 session.shutdown();
177 }
178
179 protected ISession getSession()
180 {
181 return session;
182 }
183
184 public void setSession(ISession session)
185 {
186 this.session = session;
187 }
188 }