1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.Callback;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38 public abstract class AbstractConnection implements Connection
39 {
40 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41
42 public static final boolean EXECUTE_ONFILLABLE=true;
43
44 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
45 private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
46 private final long _created=System.currentTimeMillis();
47 private final EndPoint _endPoint;
48 private final Executor _executor;
49 private final Callback _readCallback;
50 private final boolean _executeOnfillable;
51 private int _inputBufferSize=2048;
52
53 protected AbstractConnection(EndPoint endp, Executor executor)
54 {
55 this(endp,executor,EXECUTE_ONFILLABLE);
56 }
57
58 protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
59 {
60 if (executor == null)
61 throw new IllegalArgumentException("Executor must not be null!");
62 _endPoint = endp;
63 _executor = executor;
64 _readCallback = new ReadCallback();
65 _executeOnfillable=executeOnfillable;
66 }
67
68 @Override
69 public void addListener(Listener listener)
70 {
71 listeners.add(listener);
72 }
73
74 public int getInputBufferSize()
75 {
76 return _inputBufferSize;
77 }
78
79 public void setInputBufferSize(int inputBufferSize)
80 {
81 _inputBufferSize = inputBufferSize;
82 }
83
84 protected Executor getExecutor()
85 {
86 return _executor;
87 }
88
89
90
91
92
93
94
95 public void fillInterested()
96 {
97 LOG.debug("fillInterested {}",this);
98
99 loop:while(true)
100 {
101 switch(_state.get())
102 {
103 case IDLE:
104 if (_state.compareAndSet(State.IDLE,State.INTERESTED))
105 {
106 getEndPoint().fillInterested(_readCallback);
107 break loop;
108 }
109 break;
110
111 case FILLING:
112 if (_state.compareAndSet(State.FILLING,State.FILLING_INTERESTED))
113 break loop;
114 break;
115
116 case FILLING_INTERESTED:
117 case INTERESTED:
118 break loop;
119 }
120 }
121 }
122
123
124
125
126
127 public abstract void onFillable();
128
129
130
131
132
133 protected void onFillInterestedFailed(Throwable cause)
134 {
135 LOG.debug("{} onFillInterestedFailed {}", this, cause);
136 if (_endPoint.isOpen())
137 {
138 boolean close = true;
139 if (cause instanceof TimeoutException)
140 close = onReadTimeout();
141 if (close)
142 {
143 if (_endPoint.isOutputShutdown())
144 _endPoint.close();
145 else
146 _endPoint.shutdownOutput();
147 }
148 }
149 }
150
151
152
153
154
155 protected boolean onReadTimeout()
156 {
157 return true;
158 }
159
160 @Override
161 public void onOpen()
162 {
163 LOG.debug("onOpen {}", this);
164
165 for (Listener listener : listeners)
166 listener.onOpened(this);
167 }
168
169 @Override
170 public void onClose()
171 {
172 LOG.debug("onClose {}",this);
173
174 for (Listener listener : listeners)
175 listener.onClosed(this);
176 }
177
178 @Override
179 public EndPoint getEndPoint()
180 {
181 return _endPoint;
182 }
183
184 @Override
185 public void close()
186 {
187 getEndPoint().close();
188 }
189
190 @Override
191 public int getMessagesIn()
192 {
193 return -1;
194 }
195
196 @Override
197 public int getMessagesOut()
198 {
199 return -1;
200 }
201
202 @Override
203 public long getBytesIn()
204 {
205 return -1;
206 }
207
208 @Override
209 public long getBytesOut()
210 {
211 return -1;
212 }
213
214 @Override
215 public long getCreatedTimeStamp()
216 {
217 return _created;
218 }
219
220 @Override
221 public String toString()
222 {
223 return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
224 }
225
226 private enum State
227 {
228 IDLE, INTERESTED, FILLING, FILLING_INTERESTED
229 }
230
231 private class ReadCallback implements Callback, Runnable
232 {
233 @Override
234 public void run()
235 {
236 if (_state.compareAndSet(State.INTERESTED,State.FILLING))
237 {
238 try
239 {
240 onFillable();
241 }
242 finally
243 {
244 loop:while(true)
245 {
246 switch(_state.get())
247 {
248 case IDLE:
249 case INTERESTED:
250 throw new IllegalStateException();
251
252 case FILLING:
253 if (_state.compareAndSet(State.FILLING,State.IDLE))
254 break loop;
255 break;
256
257 case FILLING_INTERESTED:
258 if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
259 {
260 getEndPoint().fillInterested(_readCallback);
261 break loop;
262 }
263 break;
264 }
265 }
266 }
267 }
268 else
269 LOG.warn(new Throwable());
270 }
271
272 @Override
273 public void succeeded()
274 {
275 if (_executeOnfillable)
276 _executor.execute(this);
277 else
278 run();
279 }
280
281 @Override
282 public void failed(Throwable x)
283 {
284 onFillInterestedFailed(x);
285 }
286
287 @Override
288 public String toString()
289 {
290 return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode());
291 }
292 };
293 }