1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.client;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.Executor;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.ByteBufferPool;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.RuntimeIOException;
29 import org.eclipse.jetty.spdy.Controller;
30 import org.eclipse.jetty.spdy.ISession;
31 import org.eclipse.jetty.spdy.IdleListener;
32 import org.eclipse.jetty.spdy.api.GoAwayInfo;
33 import org.eclipse.jetty.spdy.parser.Parser;
34 import org.eclipse.jetty.util.Callback;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38 public class SPDYConnection extends AbstractConnection implements Controller, IdleListener
39 {
40 private static final Logger LOG = Log.getLogger(SPDYConnection.class);
41 private final ByteBufferPool bufferPool;
42 private final Parser parser;
43 private final int bufferSize;
44 private volatile ISession session;
45 private volatile boolean idle = false;
46
47
48 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
49 {
50 this(endPoint, bufferPool, parser, executor, 8192);
51 }
52
53 public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, int bufferSize)
54 {
55
56
57
58
59
60 super(endPoint, executor, !EXECUTE_ONFILLABLE);
61 this.bufferPool = bufferPool;
62 this.parser = parser;
63 onIdle(true);
64 this.bufferSize = bufferSize;
65 }
66
67 @Override
68 public void onOpen()
69 {
70 super.onOpen();
71 fillInterested();
72 }
73
74 @Override
75 public void onFillable()
76 {
77 ByteBuffer buffer = bufferPool.acquire(bufferSize, true);
78 boolean readMore = read(buffer) == 0;
79 bufferPool.release(buffer);
80 if (readMore)
81 fillInterested();
82 }
83
84 protected int read(ByteBuffer buffer)
85 {
86 EndPoint endPoint = getEndPoint();
87 while (true)
88 {
89 int filled = fill(endPoint, buffer);
90 LOG.debug("Read {} bytes", filled);
91 if (filled == 0)
92 {
93 return 0;
94 }
95 else if (filled < 0)
96 {
97 shutdown(session);
98 return -1;
99 }
100 else
101 {
102 parser.parse(buffer);
103 }
104 }
105 }
106
107 private int fill(EndPoint endPoint, ByteBuffer buffer)
108 {
109 try
110 {
111 if (endPoint.isInputShutdown())
112 return -1;
113 return endPoint.fill(buffer);
114 }
115 catch (IOException x)
116 {
117 endPoint.close();
118 throw new RuntimeIOException(x);
119 }
120 }
121
122 @Override
123 public void write(ByteBuffer buffer, final Callback callback)
124 {
125 EndPoint endPoint = getEndPoint();
126 endPoint.write(callback, buffer);
127 }
128
129 @Override
130 public void close()
131 {
132 goAway(session);
133 }
134
135 @Override
136 public void close(boolean onlyOutput)
137 {
138 EndPoint endPoint = getEndPoint();
139
140
141 LOG.debug("Shutting down output {}", endPoint);
142 endPoint.shutdownOutput();
143 if (!onlyOutput)
144 {
145 LOG.debug("Closing {}", endPoint);
146 endPoint.close();
147 }
148 }
149
150 @Override
151 public void onIdle(boolean idle)
152 {
153 this.idle = idle;
154 }
155
156 @Override
157 protected boolean onReadTimeout()
158 {
159 boolean idle = this.idle;
160 LOG.debug("Idle timeout on {}, idle={}", this, idle);
161 if (idle)
162 goAway(session);
163 return false;
164 }
165
166 protected void goAway(ISession session)
167 {
168 if (session != null)
169 session.goAway(new GoAwayInfo(), new Callback.Adapter());
170 }
171
172 private void shutdown(ISession session)
173 {
174 if (session != null && !getEndPoint().isOutputShutdown())
175 session.shutdown();
176 }
177
178 protected ISession getSession()
179 {
180 return session;
181 }
182
183 public void setSession(ISession session)
184 {
185 this.session = session;
186 }
187 }