1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26
27 import org.eclipse.jetty.util.Callback;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38 public abstract class AbstractConnection implements Connection
39 {
40 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
41
42 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
43 private final long _created=System.currentTimeMillis();
44 private final EndPoint _endPoint;
45 private final Executor _executor;
46 private final Callback _readCallback;
47 private int _inputBufferSize=2048;
48
49 protected AbstractConnection(EndPoint endp, Executor executor)
50 {
51 if (executor == null)
52 throw new IllegalArgumentException("Executor must not be null!");
53 _endPoint = endp;
54 _executor = executor;
55 _readCallback = new ReadCallback();
56 }
57
58 @Override
59 public void addListener(Listener listener)
60 {
61 listeners.add(listener);
62 }
63
64 public int getInputBufferSize()
65 {
66 return _inputBufferSize;
67 }
68
69 public void setInputBufferSize(int inputBufferSize)
70 {
71 _inputBufferSize = inputBufferSize;
72 }
73
74 protected Executor getExecutor()
75 {
76 return _executor;
77 }
78
79 @Deprecated
80 public boolean isDispatchIO()
81 {
82 return false;
83 }
84
85 protected void failedCallback(final Callback callback, final Throwable x)
86 {
87 if (callback.isNonBlocking())
88 {
89 try
90 {
91 callback.failed(x);
92 }
93 catch (Exception e)
94 {
95 LOG.warn(e);
96 }
97 }
98 else
99 {
100 try
101 {
102 getExecutor().execute(new Runnable()
103 {
104 @Override
105 public void run()
106 {
107 try
108 {
109 callback.failed(x);
110 }
111 catch (Exception e)
112 {
113 LOG.warn(e);
114 }
115 }
116 });
117 }
118 catch(RejectedExecutionException e)
119 {
120 LOG.debug(e);
121 callback.failed(x);
122 }
123 }
124 }
125
126
127
128
129
130
131
132 public void fillInterested()
133 {
134 if (LOG.isDebugEnabled())
135 LOG.debug("fillInterested {}",this);
136 getEndPoint().fillInterested(_readCallback);
137 }
138
139 public boolean isFillInterested()
140 {
141 return getEndPoint().isFillInterested();
142 }
143
144
145
146
147
148 public abstract void onFillable();
149
150
151
152
153
154 protected void onFillInterestedFailed(Throwable cause)
155 {
156 if (LOG.isDebugEnabled())
157 LOG.debug("{} onFillInterestedFailed {}", this, cause);
158 if (_endPoint.isOpen())
159 {
160 boolean close = true;
161 if (cause instanceof TimeoutException)
162 close = onReadTimeout();
163 if (close)
164 {
165 if (_endPoint.isOutputShutdown())
166 _endPoint.close();
167 else
168 {
169 _endPoint.shutdownOutput();
170 fillInterested();
171 }
172 }
173 }
174 }
175
176
177
178
179
180 protected boolean onReadTimeout()
181 {
182 return true;
183 }
184
185 @Override
186 public void onOpen()
187 {
188 if (LOG.isDebugEnabled())
189 LOG.debug("onOpen {}", this);
190
191 for (Listener listener : listeners)
192 listener.onOpened(this);
193 }
194
195 @Override
196 public void onClose()
197 {
198 if (LOG.isDebugEnabled())
199 LOG.debug("onClose {}",this);
200
201 for (Listener listener : listeners)
202 listener.onClosed(this);
203 }
204
205 @Override
206 public EndPoint getEndPoint()
207 {
208 return _endPoint;
209 }
210
211 @Override
212 public void close()
213 {
214 getEndPoint().close();
215 }
216
217 @Override
218 public int getMessagesIn()
219 {
220 return -1;
221 }
222
223 @Override
224 public int getMessagesOut()
225 {
226 return -1;
227 }
228
229 @Override
230 public long getBytesIn()
231 {
232 return -1;
233 }
234
235 @Override
236 public long getBytesOut()
237 {
238 return -1;
239 }
240
241 @Override
242 public long getCreatedTimeStamp()
243 {
244 return _created;
245 }
246
247 @Override
248 public String toString()
249 {
250 return String.format("%s@%x[%s]",
251 getClass().getSimpleName(),
252 hashCode(),
253 _endPoint);
254 }
255
256 private class ReadCallback implements Callback
257 {
258 @Override
259 public void succeeded()
260 {
261 onFillable();
262 }
263
264 @Override
265 public void failed(final Throwable x)
266 {
267 onFillInterestedFailed(x);
268 }
269
270 @Override
271 public String toString()
272 {
273 return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
274 }
275 }
276 }