1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26
27 import org.eclipse.jetty.util.Callback;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38 public abstract class AbstractConnection implements Connection
39 {
40 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41
42 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
43 private final long _created=System.currentTimeMillis();
44 private final EndPoint _endPoint;
45 private final Executor _executor;
46 private final Callback _readCallback;
47 private int _inputBufferSize=2048;
48
49 protected AbstractConnection(EndPoint endp, Executor executor)
50 {
51 if (executor == null)
52 throw new IllegalArgumentException("Executor must not be null!");
53 _endPoint = endp;
54 _executor = executor;
55 _readCallback = new ReadCallback();
56 }
57
58 @Override
59 public void addListener(Listener listener)
60 {
61 listeners.add(listener);
62 }
63
64 @Override
65 public void removeListener(Listener listener)
66 {
67 listeners.remove(listener);
68 }
69
70 public int getInputBufferSize()
71 {
72 return _inputBufferSize;
73 }
74
75 public void setInputBufferSize(int inputBufferSize)
76 {
77 _inputBufferSize = inputBufferSize;
78 }
79
80 protected Executor getExecutor()
81 {
82 return _executor;
83 }
84
85 @Deprecated
86 public boolean isDispatchIO()
87 {
88 return false;
89 }
90
91 protected void failedCallback(final Callback callback, final Throwable x)
92 {
93 if (callback.isNonBlocking())
94 {
95 try
96 {
97 callback.failed(x);
98 }
99 catch (Exception e)
100 {
101 LOG.warn(e);
102 }
103 }
104 else
105 {
106 try
107 {
108 getExecutor().execute(new Runnable()
109 {
110 @Override
111 public void run()
112 {
113 try
114 {
115 callback.failed(x);
116 }
117 catch (Exception e)
118 {
119 LOG.warn(e);
120 }
121 }
122 });
123 }
124 catch(RejectedExecutionException e)
125 {
126 LOG.debug(e);
127 callback.failed(x);
128 }
129 }
130 }
131
132
133
134
135
136
137
138 public void fillInterested()
139 {
140 if (LOG.isDebugEnabled())
141 LOG.debug("fillInterested {}",this);
142 getEndPoint().fillInterested(_readCallback);
143 }
144
145 public boolean isFillInterested()
146 {
147 return getEndPoint().isFillInterested();
148 }
149
150
151
152
153
154 public abstract void onFillable();
155
156
157
158
159
160 protected void onFillInterestedFailed(Throwable cause)
161 {
162 if (LOG.isDebugEnabled())
163 LOG.debug("{} onFillInterestedFailed {}", this, cause);
164 if (_endPoint.isOpen())
165 {
166 boolean close = true;
167 if (cause instanceof TimeoutException)
168 close = onReadTimeout();
169 if (close)
170 {
171 if (_endPoint.isOutputShutdown())
172 _endPoint.close();
173 else
174 {
175 _endPoint.shutdownOutput();
176 fillInterested();
177 }
178 }
179 }
180 }
181
182
183
184
185
186 protected boolean onReadTimeout()
187 {
188 return true;
189 }
190
191 @Override
192 public void onOpen()
193 {
194 if (LOG.isDebugEnabled())
195 LOG.debug("onOpen {}", this);
196
197 for (Listener listener : listeners)
198 listener.onOpened(this);
199 }
200
201 @Override
202 public void onClose()
203 {
204 if (LOG.isDebugEnabled())
205 LOG.debug("onClose {}",this);
206
207 for (Listener listener : listeners)
208 listener.onClosed(this);
209 }
210
211 @Override
212 public EndPoint getEndPoint()
213 {
214 return _endPoint;
215 }
216
217 @Override
218 public void close()
219 {
220 getEndPoint().close();
221 }
222
223 @Override
224 public boolean onIdleExpired()
225 {
226 return true;
227 }
228
229 @Override
230 public int getMessagesIn()
231 {
232 return -1;
233 }
234
235 @Override
236 public int getMessagesOut()
237 {
238 return -1;
239 }
240
241 @Override
242 public long getBytesIn()
243 {
244 return -1;
245 }
246
247 @Override
248 public long getBytesOut()
249 {
250 return -1;
251 }
252
253 @Override
254 public long getCreatedTimeStamp()
255 {
256 return _created;
257 }
258
259 @Override
260 public String toString()
261 {
262 return String.format("%s@%x[%s]",
263 getClass().getSimpleName(),
264 hashCode(),
265 _endPoint);
266 }
267
268 private class ReadCallback implements Callback
269 {
270 @Override
271 public void succeeded()
272 {
273 onFillable();
274 }
275
276 @Override
277 public void failed(final Throwable x)
278 {
279 onFillInterestedFailed(x);
280 }
281
282 @Override
283 public String toString()
284 {
285 return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
286 }
287 }
288 }