1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.nio.channels.CancelledKeyException;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.SocketChannel;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.eclipse.jetty.util.log.Log;
27 import org.eclipse.jetty.util.log.Logger;
28 import org.eclipse.jetty.util.thread.Locker;
29 import org.eclipse.jetty.util.thread.Scheduler;
30
31
32
33
34 public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSelector.SelectableEndPoint
35 {
36 public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
37
38 private final Locker _locker = new Locker();
39 private boolean _updatePending;
40
41
42
43
44 private final AtomicBoolean _open = new AtomicBoolean();
45 private final ManagedSelector _selector;
46 private final SelectionKey _key;
47
48
49
50 private int _currentInterestOps;
51
52
53
54 private int _desiredInterestOps;
55
56 private final Runnable _runUpdateKey = new Runnable()
57 {
58 @Override
59 public void run()
60 {
61 updateKey();
62 }
63
64 @Override
65 public String toString()
66 {
67 return SelectChannelEndPoint.this.toString()+":runUpdateKey";
68 }
69 };
70 private final Runnable _runFillable = new Runnable()
71 {
72 @Override
73 public void run()
74 {
75 getFillInterest().fillable();
76 }
77
78 @Override
79 public String toString()
80 {
81 return SelectChannelEndPoint.this.toString()+":runFillable";
82 }
83 };
84 private final Runnable _runCompleteWrite = new Runnable()
85 {
86 @Override
87 public void run()
88 {
89 getWriteFlusher().completeWrite();
90 }
91
92 @Override
93 public String toString()
94 {
95 return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
96 }
97 };
98 private final Runnable _runFillableCompleteWrite = new Runnable()
99 {
100 @Override
101 public void run()
102 {
103 getFillInterest().fillable();
104 getWriteFlusher().completeWrite();
105 }
106
107 @Override
108 public String toString()
109 {
110 return SelectChannelEndPoint.this.toString()+":runFillableCompleteWrite";
111 }
112 };
113
114 public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
115 {
116 super(scheduler, channel);
117 _selector = selector;
118 _key = key;
119 setIdleTimeout(idleTimeout);
120 }
121
122 @Override
123 protected void needsFillInterest()
124 {
125 changeInterests(SelectionKey.OP_READ);
126 }
127
128 @Override
129 protected void onIncompleteFlush()
130 {
131 changeInterests(SelectionKey.OP_WRITE);
132 }
133
134 @Override
135 public Runnable onSelected()
136 {
137
138
139
140
141 int readyOps = _key.readyOps();
142 int oldInterestOps;
143 int newInterestOps;
144 try (Locker.Lock lock = _locker.lock())
145 {
146 _updatePending = true;
147
148 oldInterestOps = _desiredInterestOps;
149 newInterestOps = oldInterestOps & ~readyOps;
150 _desiredInterestOps = newInterestOps;
151 }
152
153
154 boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
155 boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
156
157
158 if (LOG.isDebugEnabled())
159 LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this);
160
161
162
163
164
165
166 if (readable && getFillInterest().isCallbackNonBlocking())
167 {
168 if (LOG.isDebugEnabled())
169 LOG.debug("Direct readable run {}",this);
170 _runFillable.run();
171 readable = false;
172 }
173 if (writable && getWriteFlusher().isCallbackNonBlocking())
174 {
175 if (LOG.isDebugEnabled())
176 LOG.debug("Direct writable run {}",this);
177 _runCompleteWrite.run();
178 writable = false;
179 }
180
181
182 Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable)
183 : (writable ? _runCompleteWrite : null);
184
185 if (LOG.isDebugEnabled())
186 LOG.debug("task {}",task);
187 return task;
188 }
189
190 @Override
191 public void updateKey()
192 {
193
194
195
196
197 try
198 {
199 int oldInterestOps;
200 int newInterestOps;
201 try (Locker.Lock lock = _locker.lock())
202 {
203 _updatePending = false;
204 oldInterestOps = _currentInterestOps;
205 newInterestOps = _desiredInterestOps;
206 if (oldInterestOps != newInterestOps)
207 {
208 _currentInterestOps = newInterestOps;
209 _key.interestOps(newInterestOps);
210 }
211 }
212
213 if (LOG.isDebugEnabled())
214 LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
215 }
216 catch (CancelledKeyException x)
217 {
218 LOG.debug("Ignoring key update for concurrently closed channel {}", this);
219 close();
220 }
221 catch (Throwable x)
222 {
223 LOG.warn("Ignoring key update for " + this, x);
224 close();
225 }
226 }
227
228 private void changeInterests(int operation)
229 {
230
231
232
233
234
235 int oldInterestOps;
236 int newInterestOps;
237 boolean pending;
238 try (Locker.Lock lock = _locker.lock())
239 {
240 pending = _updatePending;
241 oldInterestOps = _desiredInterestOps;
242 newInterestOps = oldInterestOps | operation;
243 if (newInterestOps != oldInterestOps)
244 _desiredInterestOps = newInterestOps;
245 }
246
247 if (LOG.isDebugEnabled())
248 LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
249
250 if (!pending)
251 _selector.submit(_runUpdateKey);
252 }
253
254
255 @Override
256 public void close()
257 {
258 if (_open.compareAndSet(true, false))
259 {
260 super.close();
261 _selector.destroyEndPoint(this);
262 }
263 }
264
265 @Override
266 public boolean isOpen()
267 {
268
269
270
271 return _open.get();
272 }
273
274 @Override
275 public void onOpen()
276 {
277 if (_open.compareAndSet(false, true))
278 super.onOpen();
279 }
280
281 @Override
282 public String toString()
283 {
284
285 try
286 {
287 boolean valid = _key != null && _key.isValid();
288 int keyInterests = valid ? _key.interestOps() : -1;
289 int keyReadiness = valid ? _key.readyOps() : -1;
290 return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
291 super.toString(),
292 _currentInterestOps,
293 _desiredInterestOps,
294 keyInterests,
295 keyReadiness);
296 }
297 catch (Throwable x)
298 {
299 return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _desiredInterestOps);
300 }
301 }
302 }