1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.BlockingCallback;
28 import org.eclipse.jetty.util.Callback;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32
33
34
35
36
37
38
39 public abstract class AbstractConnection implements Connection
40 {
41 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
42
43 public static final boolean EXECUTE_ONFILLABLE=true;
44
45 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
46 private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
47 private final long _created=System.currentTimeMillis();
48 private final EndPoint _endPoint;
49 private final Executor _executor;
50 private final Callback _readCallback;
51 private final boolean _executeOnfillable;
52 private int _inputBufferSize=2048;
53
54 protected AbstractConnection(EndPoint endp, Executor executor)
55 {
56 this(endp,executor,EXECUTE_ONFILLABLE);
57 }
58
59 protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
60 {
61 if (executor == null)
62 throw new IllegalArgumentException("Executor must not be null!");
63 _endPoint = endp;
64 _executor = executor;
65 _readCallback = new ReadCallback();
66 _executeOnfillable=executeOnfillable;
67 }
68
69 @Override
70 public void addListener(Listener listener)
71 {
72 listeners.add(listener);
73 }
74
75 public int getInputBufferSize()
76 {
77 return _inputBufferSize;
78 }
79
80 public void setInputBufferSize(int inputBufferSize)
81 {
82 _inputBufferSize = inputBufferSize;
83 }
84
85 protected Executor getExecutor()
86 {
87 return _executor;
88 }
89
90
91
92
93
94
95
96 public void fillInterested()
97 {
98 LOG.debug("fillInterested {}",this);
99
100 loop:while(true)
101 {
102 switch(_state.get())
103 {
104 case IDLE:
105 if (_state.compareAndSet(State.IDLE,State.INTERESTED))
106 {
107 getEndPoint().fillInterested(_readCallback);
108 break loop;
109 }
110 break;
111
112 case FILLING:
113 if (_state.compareAndSet(State.FILLING,State.FILLING_INTERESTED))
114 break loop;
115 break;
116
117 case FILLING_BLOCKED:
118 if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING_BLOCKED_INTERESTED))
119 break loop;
120 break;
121
122 case BLOCKED:
123 if (_state.compareAndSet(State.BLOCKED,State.BLOCKED_INTERESTED))
124 break loop;
125 break;
126
127 case FILLING_BLOCKED_INTERESTED:
128 case FILLING_INTERESTED:
129 case BLOCKED_INTERESTED:
130 case INTERESTED:
131 break loop;
132 }
133 }
134 }
135
136
137 private void unblock()
138 {
139 LOG.debug("unblock {}",this);
140
141 loop:while(true)
142 {
143 switch(_state.get())
144 {
145 case FILLING_BLOCKED:
146 if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING))
147 break loop;
148 break;
149
150 case FILLING_BLOCKED_INTERESTED:
151 if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.FILLING_INTERESTED))
152 break loop;
153 break;
154
155 case BLOCKED_INTERESTED:
156 if (_state.compareAndSet(State.BLOCKED_INTERESTED,State.INTERESTED))
157 {
158 getEndPoint().fillInterested(_readCallback);
159 break loop;
160 }
161 break;
162
163 case BLOCKED:
164 if (_state.compareAndSet(State.BLOCKED,State.IDLE))
165 break loop;
166 break;
167
168 case FILLING:
169 case IDLE:
170 case FILLING_INTERESTED:
171 case INTERESTED:
172 break loop;
173 }
174 }
175 }
176
177
178
179
180 protected void block(final BlockingCallback callback)
181 {
182 LOG.debug("block {}",this);
183
184 final Callback blocked=new Callback()
185 {
186 @Override
187 public void succeeded()
188 {
189 unblock();
190 callback.succeeded();
191 }
192
193 @Override
194 public void failed(Throwable x)
195 {
196 unblock();
197 callback.failed(x);
198 }
199 };
200
201 loop:while(true)
202 {
203 switch(_state.get())
204 {
205 case IDLE:
206 if (_state.compareAndSet(State.IDLE,State.BLOCKED))
207 {
208 getEndPoint().fillInterested(blocked);
209 break loop;
210 }
211 break;
212
213 case FILLING:
214 if (_state.compareAndSet(State.FILLING,State.FILLING_BLOCKED))
215 {
216 getEndPoint().fillInterested(blocked);
217 break loop;
218 }
219 break;
220
221 case FILLING_INTERESTED:
222 if (_state.compareAndSet(State.FILLING_INTERESTED,State.FILLING_BLOCKED_INTERESTED))
223 {
224 getEndPoint().fillInterested(blocked);
225 break loop;
226 }
227 break;
228
229 case BLOCKED:
230 case BLOCKED_INTERESTED:
231 case FILLING_BLOCKED:
232 case FILLING_BLOCKED_INTERESTED:
233 throw new IllegalStateException("Already Blocked");
234
235 case INTERESTED:
236 throw new IllegalStateException();
237 }
238 }
239 }
240
241
242
243
244
245 public abstract void onFillable();
246
247
248
249
250
251 protected void onFillInterestedFailed(Throwable cause)
252 {
253 LOG.debug("{} onFillInterestedFailed {}", this, cause);
254 if (_endPoint.isOpen())
255 {
256 boolean close = true;
257 if (cause instanceof TimeoutException)
258 close = onReadTimeout();
259 if (close)
260 {
261 if (_endPoint.isOutputShutdown())
262 _endPoint.close();
263 else
264 _endPoint.shutdownOutput();
265 }
266 }
267 }
268
269
270
271
272
273 protected boolean onReadTimeout()
274 {
275 return true;
276 }
277
278 @Override
279 public void onOpen()
280 {
281 LOG.debug("onOpen {}", this);
282
283 for (Listener listener : listeners)
284 listener.onOpened(this);
285 }
286
287 @Override
288 public void onClose()
289 {
290 LOG.debug("onClose {}",this);
291
292 for (Listener listener : listeners)
293 listener.onClosed(this);
294 }
295
296 @Override
297 public EndPoint getEndPoint()
298 {
299 return _endPoint;
300 }
301
302 @Override
303 public void close()
304 {
305 getEndPoint().close();
306 }
307
308 @Override
309 public int getMessagesIn()
310 {
311 return -1;
312 }
313
314 @Override
315 public int getMessagesOut()
316 {
317 return -1;
318 }
319
320 @Override
321 public long getBytesIn()
322 {
323 return -1;
324 }
325
326 @Override
327 public long getBytesOut()
328 {
329 return -1;
330 }
331
332 @Override
333 public long getCreatedTimeStamp()
334 {
335 return _created;
336 }
337
338 @Override
339 public String toString()
340 {
341 return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
342 }
343
344 private enum State
345 {
346 IDLE, INTERESTED, FILLING, FILLING_INTERESTED, FILLING_BLOCKED, BLOCKED, FILLING_BLOCKED_INTERESTED, BLOCKED_INTERESTED
347 }
348
349 private class ReadCallback implements Callback, Runnable
350 {
351 @Override
352 public void run()
353 {
354 if (_state.compareAndSet(State.INTERESTED,State.FILLING))
355 {
356 try
357 {
358 onFillable();
359 }
360 finally
361 {
362 loop:while(true)
363 {
364 switch(_state.get())
365 {
366 case IDLE:
367 case INTERESTED:
368 case BLOCKED:
369 case BLOCKED_INTERESTED:
370 LOG.warn(new IllegalStateException());
371 return;
372
373 case FILLING:
374 if (_state.compareAndSet(State.FILLING,State.IDLE))
375 break loop;
376 break;
377
378 case FILLING_BLOCKED:
379 if (_state.compareAndSet(State.FILLING_BLOCKED,State.BLOCKED))
380 break loop;
381 break;
382
383 case FILLING_BLOCKED_INTERESTED:
384 if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.BLOCKED_INTERESTED))
385 break loop;
386 break;
387
388 case FILLING_INTERESTED:
389 if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
390 {
391 getEndPoint().fillInterested(_readCallback);
392 break loop;
393 }
394 break;
395 }
396 }
397 }
398 }
399 else
400 LOG.warn(new IllegalStateException());
401 }
402
403 @Override
404 public void succeeded()
405 {
406 if (_executeOnfillable)
407 _executor.execute(this);
408 else
409 run();
410 }
411
412 @Override
413 public void failed(final Throwable x)
414 {
415 _executor.execute(new Runnable()
416 {
417 @Override
418 public void run()
419 {
420 onFillInterestedFailed(x);
421 }
422 });
423 }
424
425 @Override
426 public String toString()
427 {
428 return String.format("AC.ExReadCB@%x", AbstractConnection.this.hashCode());
429 }
430 };
431 }