1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import org.eclipse.jetty.util.Callback;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.util.thread.NonBlockingThread;
32
33
34
35
36
37
38
39
40 public abstract class AbstractConnection implements Connection
41 {
42 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
43
44 public static final boolean EXECUTE_ONFILLABLE=true;
45
46 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
47 private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
48 private final long _created=System.currentTimeMillis();
49 private final EndPoint _endPoint;
50 private final Executor _executor;
51 private final Callback _readCallback;
52 private final boolean _executeOnfillable;
53 private int _inputBufferSize=2048;
54
/**
 * Creates a connection using {@link #EXECUTE_ONFILLABLE} as the
 * {@code executeOnfillable} setting.
 *
 * @param endp the end point this connection reads from
 * @param executor the executor used for dispatching; must not be {@code null}
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractConnection(EndPoint endp, Executor executor)
{
    this(endp, executor, EXECUTE_ONFILLABLE);
}
59
/**
 * Creates a connection in the {@code IDLE} state.
 *
 * @param endp the end point this connection reads from
 * @param executor the executor used for dispatching; must not be {@code null}
 * @param executeOnfillable {@code true} to dispatch {@link #onFillable()} to the executor,
 *                          {@code false} to run it on the thread that enters the {@code FILLING} state
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
{
    if (null == executor)
    {
        throw new IllegalArgumentException("Executor must not be null!");
    }

    _executor = executor;
    _endPoint = endp;
    _executeOnfillable = executeOnfillable;
    _readCallback = new ReadCallback();
    _state.set(IDLE);
}
70
71 @Override
72 public void addListener(Listener listener)
73 {
74 listeners.add(listener);
75 }
76
77 public int getInputBufferSize()
78 {
79 return _inputBufferSize;
80 }
81
82 public void setInputBufferSize(int inputBufferSize)
83 {
84 _inputBufferSize = inputBufferSize;
85 }
86
87 protected Executor getExecutor()
88 {
89 return _executor;
90 }
91
92 protected void failedCallback(final Callback callback, final Throwable x)
93 {
94 if (NonBlockingThread.isNonBlockingThread())
95 {
96 try
97 {
98 getExecutor().execute(new Runnable()
99 {
100 @Override
101 public void run()
102 {
103 callback.failed(x);
104 }
105 });
106 }
107 catch(RejectedExecutionException e)
108 {
109 LOG.debug(e);
110 callback.failed(x);
111 }
112 }
113 else
114 {
115 callback.failed(x);
116 }
117 }
118
119
120
121
122
123
124
125 public void fillInterested()
126 {
127 if (LOG.isDebugEnabled())
128 LOG.debug("fillInterested {}",this);
129
130 while(true)
131 {
132 State state=_state.get();
133 if (next(state,state.fillInterested()))
134 break;
135 }
136 }
137
138 public void fillInterested(Callback callback)
139 {
140 if (LOG.isDebugEnabled())
141 LOG.debug("fillInterested {}",this);
142
143 while(true)
144 {
145 State state=_state.get();
146
147 if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
148 break;
149 State next=new FillingInterestedCallback(callback,state);
150 if (next(state,next))
151 break;
152 }
153 }
154
155
156
157
158
/**
 * Hook implemented by subclasses; invoked by this class's internal fillable
 * task when the {@code FILLING} state is entered, either on the executor or on
 * the entering thread depending on the {@code executeOnfillable} setting.
 */
public abstract void onFillable();
160
161
162
163
164
165 protected void onFillInterestedFailed(Throwable cause)
166 {
167 if (LOG.isDebugEnabled())
168 LOG.debug("{} onFillInterestedFailed {}", this, cause);
169 if (_endPoint.isOpen())
170 {
171 boolean close = true;
172 if (cause instanceof TimeoutException)
173 close = onReadTimeout();
174 if (close)
175 {
176 if (_endPoint.isOutputShutdown())
177 _endPoint.close();
178 else
179 _endPoint.shutdownOutput();
180 }
181 }
182
183 if (_endPoint.isOpen())
184 fillInterested();
185 }
186
187
188
189
190
191 protected boolean onReadTimeout()
192 {
193 return true;
194 }
195
196 @Override
197 public void onOpen()
198 {
199 if (LOG.isDebugEnabled())
200 LOG.debug("onOpen {}", this);
201
202 for (Listener listener : listeners)
203 listener.onOpened(this);
204 }
205
206 @Override
207 public void onClose()
208 {
209 if (LOG.isDebugEnabled())
210 LOG.debug("onClose {}",this);
211
212 for (Listener listener : listeners)
213 listener.onClosed(this);
214 }
215
216 @Override
217 public EndPoint getEndPoint()
218 {
219 return _endPoint;
220 }
221
222 @Override
223 public void close()
224 {
225 getEndPoint().close();
226 }
227
228 @Override
229 public int getMessagesIn()
230 {
231 return -1;
232 }
233
234 @Override
235 public int getMessagesOut()
236 {
237 return -1;
238 }
239
240 @Override
241 public long getBytesIn()
242 {
243 return -1;
244 }
245
246 @Override
247 public long getBytesOut()
248 {
249 return -1;
250 }
251
252 @Override
253 public long getCreatedTimeStamp()
254 {
255 return _created;
256 }
257
258 @Override
259 public String toString()
260 {
261 return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
262 }
263
264 public boolean next(State state, State next)
265 {
266 if (next==null)
267 return true;
268 if(_state.compareAndSet(state,next))
269 {
270 if (LOG.isDebugEnabled())
271 LOG.debug("{}-->{} {}",state,next,this);
272 if (next!=state)
273 next.onEnter(AbstractConnection.this);
274 return true;
275 }
276 return false;
277 }
278
279 private static final class IdleState extends State
280 {
281 private IdleState()
282 {
283 super("IDLE");
284 }
285
286 @Override
287 State fillInterested()
288 {
289 return FILL_INTERESTED;
290 }
291 }
292
293
294 private static final class FillInterestedState extends State
295 {
296 private FillInterestedState()
297 {
298 super("FILL_INTERESTED");
299 }
300
301 @Override
302 public void onEnter(AbstractConnection connection)
303 {
304 connection.getEndPoint().fillInterested(connection._readCallback);
305 }
306
307 @Override
308 State fillInterested()
309 {
310 return this;
311 }
312
313 @Override
314 public State onFillable()
315 {
316 return FILLING;
317 }
318
319 @Override
320 State onFailed()
321 {
322 return IDLE;
323 }
324 }
325
326
327 private static final class RefillingState extends State
328 {
329 private RefillingState()
330 {
331 super("REFILLING");
332 }
333
334 @Override
335 State fillInterested()
336 {
337 return FILLING_FILL_INTERESTED;
338 }
339
340 @Override
341 public State onFilled()
342 {
343 return IDLE;
344 }
345 }
346
347
348 private static final class FillingFillInterestedState extends State
349 {
350 private FillingFillInterestedState(String name)
351 {
352 super(name);
353 }
354
355 @Override
356 State fillInterested()
357 {
358 return this;
359 }
360
361 State onFilled()
362 {
363 return FILL_INTERESTED;
364 }
365 }
366
367
368 private static final class FillingState extends State
369 {
370 private FillingState()
371 {
372 super("FILLING");
373 }
374
375 @Override
376 public void onEnter(AbstractConnection connection)
377 {
378 if (connection._executeOnfillable)
379 connection.getExecutor().execute(connection._runOnFillable);
380 else
381 connection._runOnFillable.run();
382 }
383
384 @Override
385 State fillInterested()
386 {
387 return FILLING_FILL_INTERESTED;
388 }
389
390 @Override
391 public State onFilled()
392 {
393 return IDLE;
394 }
395 }
396
397
398 public static class State
399 {
400 private final String _name;
401 State(String name)
402 {
403 _name=name;
404 }
405
406 @Override
407 public String toString()
408 {
409 return _name;
410 }
411
412 void onEnter(AbstractConnection connection)
413 {
414 }
415
416 State fillInterested()
417 {
418 throw new IllegalStateException(this.toString());
419 }
420
421 State onFillable()
422 {
423 throw new IllegalStateException(this.toString());
424 }
425
426 State onFilled()
427 {
428 throw new IllegalStateException(this.toString());
429 }
430
431 State onFailed()
432 {
433 throw new IllegalStateException(this.toString());
434 }
435 }
436
437
438 public static final State IDLE=new IdleState();
439
440 public static final State FILL_INTERESTED=new FillInterestedState();
441
442 public static final State FILLING=new FillingState();
443
444 public static final State REFILLING=new RefillingState();
445
446 public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
447
448 public class NestedState extends State
449 {
450 private final State _nested;
451
452 NestedState(State nested)
453 {
454 super("NESTED("+nested+")");
455 _nested=nested;
456 }
457 NestedState(String name,State nested)
458 {
459 super(name+"("+nested+")");
460 _nested=nested;
461 }
462
463 @Override
464 State fillInterested()
465 {
466 return new NestedState(_nested.fillInterested());
467 }
468
469 @Override
470 State onFillable()
471 {
472 return new NestedState(_nested.onFillable());
473 }
474
475 @Override
476 State onFilled()
477 {
478 return new NestedState(_nested.onFilled());
479 }
480 }
481
482
483 public class FillingInterestedCallback extends NestedState
484 {
485 private final Callback _callback;
486
487 FillingInterestedCallback(Callback callback,State nested)
488 {
489 super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
490 _callback=callback;
491 }
492
493 @Override
494 void onEnter(final AbstractConnection connection)
495 {
496 Callback callback=new Callback()
497 {
498 @Override
499 public void succeeded()
500 {
501 while(true)
502 {
503 State state = connection._state.get();
504 if (!(state instanceof NestedState))
505 break;
506 State nested=((NestedState)state)._nested;
507 if (connection.next(state,nested))
508 break;
509 }
510 _callback.succeeded();
511 }
512
513 @Override
514 public void failed(Throwable x)
515 {
516 while(true)
517 {
518 State state = connection._state.get();
519 if (!(state instanceof NestedState))
520 break;
521 State nested=((NestedState)state)._nested;
522 if (connection.next(state,nested))
523 break;
524 }
525 _callback.failed(x);
526 }
527 };
528
529 connection.getEndPoint().fillInterested(callback);
530 }
531 }
532
533 private final Runnable _runOnFillable = new Runnable()
534 {
535 @Override
536 public void run()
537 {
538 try
539 {
540 onFillable();
541 }
542 finally
543 {
544 while(true)
545 {
546 State state=_state.get();
547 if (next(state,state.onFilled()))
548 break;
549 }
550 }
551 }
552 };
553
554
555 private class ReadCallback implements Callback
556 {
557 @Override
558 public void succeeded()
559 {
560 while(true)
561 {
562 State state=_state.get();
563 if (next(state,state.onFillable()))
564 break;
565 }
566 }
567
568 @Override
569 public void failed(final Throwable x)
570 {
571 _executor.execute(new Runnable()
572 {
573 @Override
574 public void run()
575 {
576 while(true)
577 {
578 State state=_state.get();
579 if (next(state,state.onFailed()))
580 break;
581 }
582 onFillInterestedFailed(x);
583 }
584 });
585 }
586
587 @Override
588 public String toString()
589 {
590 return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
591 }
592 };
593 }