1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.Callback;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38 public abstract class AbstractConnection implements Connection
39 {
40 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41
42 public static final boolean EXECUTE_ONFILLABLE=true;
43
44 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
45 private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
46 private final long _created=System.currentTimeMillis();
47 private final EndPoint _endPoint;
48 private final Executor _executor;
49 private final Callback _readCallback;
50 private final boolean _executeOnfillable;
51 private int _inputBufferSize=2048;
52
/**
 * Creates a connection that uses {@link #EXECUTE_ONFILLABLE} as its
 * {@code executeOnfillable} setting.
 *
 * @param endp the end point associated with this connection
 * @param executor the executor to use; must not be {@code null}
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractConnection(EndPoint endp, Executor executor)
{
    this(endp, executor, EXECUTE_ONFILLABLE);
}
57
/**
 * Creates a connection bound to the given end point and executor.
 *
 * @param endp the end point associated with this connection
 * @param executor the executor to use; must not be {@code null}
 * @param executeOnfillable if {@code true}, {@link #onFillable()} is submitted to the
 *        executor when the FILLING state is entered; otherwise it is run directly by the
 *        thread performing that transition
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
{
    // Reject a missing executor before any field is assigned.
    if (executor == null)
    {
        throw new IllegalArgumentException("Executor must not be null!");
    }

    _executor = executor;
    _endPoint = endp;
    _executeOnfillable = executeOnfillable;
    _readCallback = new ReadCallback();
    _state.set(IDLE);
}
68
69 @Override
70 public void addListener(Listener listener)
71 {
72 listeners.add(listener);
73 }
74
75 public int getInputBufferSize()
76 {
77 return _inputBufferSize;
78 }
79
80 public void setInputBufferSize(int inputBufferSize)
81 {
82 _inputBufferSize = inputBufferSize;
83 }
84
85 protected Executor getExecutor()
86 {
87 return _executor;
88 }
89
90
91
92
93
94
95
96 public void fillInterested()
97 {
98 LOG.debug("fillInterested {}",this);
99
100 while(true)
101 {
102 State state=_state.get();
103 if (next(state,state.fillInterested()))
104 break;
105 }
106 }
107
108 public void fillInterested(Callback callback)
109 {
110 LOG.debug("fillInterested {}",this);
111
112 while(true)
113 {
114 State state=_state.get();
115
116 if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
117 break;
118 State next=new FillingInterestedCallback(callback,state);
119 if (next(state,next))
120 break;
121 }
122 }
123
124
125
126
127
128 public abstract void onFillable();
129
130
131
132
133
134 protected void onFillInterestedFailed(Throwable cause)
135 {
136 LOG.debug("{} onFillInterestedFailed {}", this, cause);
137 if (_endPoint.isOpen())
138 {
139 boolean close = true;
140 if (cause instanceof TimeoutException)
141 close = onReadTimeout();
142 if (close)
143 {
144 if (_endPoint.isOutputShutdown())
145 _endPoint.close();
146 else
147 _endPoint.shutdownOutput();
148 }
149 }
150
151 if (_endPoint.isOpen())
152 fillInterested();
153 }
154
155
156
157
158
159 protected boolean onReadTimeout()
160 {
161 return true;
162 }
163
164 @Override
165 public void onOpen()
166 {
167 LOG.debug("onOpen {}", this);
168
169 for (Listener listener : listeners)
170 listener.onOpened(this);
171 }
172
173 @Override
174 public void onClose()
175 {
176 LOG.debug("onClose {}",this);
177
178 for (Listener listener : listeners)
179 listener.onClosed(this);
180 }
181
182 @Override
183 public EndPoint getEndPoint()
184 {
185 return _endPoint;
186 }
187
188 @Override
189 public void close()
190 {
191 getEndPoint().close();
192 }
193
194 @Override
195 public int getMessagesIn()
196 {
197 return -1;
198 }
199
200 @Override
201 public int getMessagesOut()
202 {
203 return -1;
204 }
205
206 @Override
207 public long getBytesIn()
208 {
209 return -1;
210 }
211
212 @Override
213 public long getBytesOut()
214 {
215 return -1;
216 }
217
218 @Override
219 public long getCreatedTimeStamp()
220 {
221 return _created;
222 }
223
224 @Override
225 public String toString()
226 {
227 return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
228 }
229
230 public boolean next(State state, State next)
231 {
232 if (next==null)
233 return true;
234 if(_state.compareAndSet(state,next))
235 {
236 LOG.debug("{}-->{} {}",state,next,this);
237 if (next!=state)
238 next.onEnter(AbstractConnection.this);
239 return true;
240 }
241 return false;
242 }
243
244 private static final class IdleState extends State
245 {
246 private IdleState()
247 {
248 super("IDLE");
249 }
250
251 @Override
252 State fillInterested()
253 {
254 return FILL_INTERESTED;
255 }
256 }
257
258
259 private static final class FillInterestedState extends State
260 {
261 private FillInterestedState()
262 {
263 super("FILL_INTERESTED");
264 }
265
266 @Override
267 public void onEnter(AbstractConnection connection)
268 {
269 connection.getEndPoint().fillInterested(connection._readCallback);
270 }
271
272 @Override
273 State fillInterested()
274 {
275 return this;
276 }
277
278 @Override
279 public State onFillable()
280 {
281 return FILLING;
282 }
283
284 @Override
285 State onFailed()
286 {
287 return IDLE;
288 }
289 }
290
291
292 private static final class RefillingState extends State
293 {
294 private RefillingState()
295 {
296 super("REFILLING");
297 }
298
299 @Override
300 State fillInterested()
301 {
302 return FILLING_FILL_INTERESTED;
303 }
304
305 @Override
306 public State onFilled()
307 {
308 return IDLE;
309 }
310 }
311
312
313 private static final class FillingFillInterestedState extends State
314 {
315 private FillingFillInterestedState(String name)
316 {
317 super(name);
318 }
319
320 @Override
321 State fillInterested()
322 {
323 return this;
324 }
325
326 State onFilled()
327 {
328 return FILL_INTERESTED;
329 }
330 }
331
332
333 private static final class FillingState extends State
334 {
335 private FillingState()
336 {
337 super("FILLING");
338 }
339
340 @Override
341 public void onEnter(AbstractConnection connection)
342 {
343 if (connection._executeOnfillable)
344 connection.getExecutor().execute(connection._runOnFillable);
345 else
346 connection._runOnFillable.run();
347 }
348
349 @Override
350 State fillInterested()
351 {
352 return FILLING_FILL_INTERESTED;
353 }
354
355 @Override
356 public State onFilled()
357 {
358 return IDLE;
359 }
360 }
361
362
363 public static class State
364 {
365 private final String _name;
366 State(String name)
367 {
368 _name=name;
369 }
370
371 @Override
372 public String toString()
373 {
374 return _name;
375 }
376
377 void onEnter(AbstractConnection connection)
378 {
379 }
380
381 State fillInterested()
382 {
383 throw new IllegalStateException(this.toString());
384 }
385
386 State onFillable()
387 {
388 throw new IllegalStateException(this.toString());
389 }
390
391 State onFilled()
392 {
393 throw new IllegalStateException(this.toString());
394 }
395
396 State onFailed()
397 {
398 throw new IllegalStateException(this.toString());
399 }
400 }
401
402
403 public static final State IDLE=new IdleState();
404
405 public static final State FILL_INTERESTED=new FillInterestedState();
406
407 public static final State FILLING=new FillingState();
408
409 public static final State REFILLING=new RefillingState();
410
411 public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
412
413 public class NestedState extends State
414 {
415 private final State _nested;
416
417 NestedState(State nested)
418 {
419 super("NESTED("+nested+")");
420 _nested=nested;
421 }
422 NestedState(String name,State nested)
423 {
424 super(name+"("+nested+")");
425 _nested=nested;
426 }
427
428 @Override
429 State fillInterested()
430 {
431 return new NestedState(_nested.fillInterested());
432 }
433
434 @Override
435 State onFillable()
436 {
437 return new NestedState(_nested.onFillable());
438 }
439
440 @Override
441 State onFilled()
442 {
443 return new NestedState(_nested.onFilled());
444 }
445 }
446
447
448 public class FillingInterestedCallback extends NestedState
449 {
450 private final Callback _callback;
451
452 FillingInterestedCallback(Callback callback,State nested)
453 {
454 super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
455 _callback=callback;
456 }
457
458 @Override
459 void onEnter(final AbstractConnection connection)
460 {
461 Callback callback=new Callback()
462 {
463 @Override
464 public void succeeded()
465 {
466 while(true)
467 {
468 State state = connection._state.get();
469 if (!(state instanceof NestedState))
470 break;
471 State nested=((NestedState)state)._nested;
472 if (connection.next(state,nested))
473 break;
474 }
475 _callback.succeeded();
476 }
477
478 @Override
479 public void failed(Throwable x)
480 {
481 while(true)
482 {
483 State state = connection._state.get();
484 if (!(state instanceof NestedState))
485 break;
486 State nested=((NestedState)state)._nested;
487 if (connection.next(state,nested))
488 break;
489 }
490 _callback.failed(x);
491 }
492 };
493
494 connection.getEndPoint().fillInterested(callback);
495 }
496 }
497
498 private final Runnable _runOnFillable = new Runnable()
499 {
500 @Override
501 public void run()
502 {
503 try
504 {
505 onFillable();
506 }
507 finally
508 {
509 while(true)
510 {
511 State state=_state.get();
512 if (next(state,state.onFilled()))
513 break;
514 }
515 }
516 }
517 };
518
519
520 private class ReadCallback implements Callback
521 {
522 @Override
523 public void succeeded()
524 {
525 while(true)
526 {
527 State state=_state.get();
528 if (next(state,state.onFillable()))
529 break;
530 }
531 }
532
533 @Override
534 public void failed(final Throwable x)
535 {
536 _executor.execute(new Runnable()
537 {
538 @Override
539 public void run()
540 {
541 while(true)
542 {
543 State state=_state.get();
544 if (next(state,state.onFailed()))
545 break;
546 }
547 onFillInterestedFailed(x);
548 }
549 });
550 }
551
552 @Override
553 public String toString()
554 {
555 return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
556 }
557 };
558 }