1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.nio.channels.CancelledKeyException;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.SocketChannel;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Scheduler;
31
32
33
34
35 public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
36 {
37 public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
38
39 private final Runnable _updateTask = new Runnable()
40 {
41 @Override
42 public void run()
43 {
44 try
45 {
46 if (getChannel().isOpen())
47 {
48 int oldInterestOps = _key.interestOps();
49 int newInterestOps = _interestOps.get();
50 if (newInterestOps != oldInterestOps)
51 setKeyInterests(oldInterestOps, newInterestOps);
52 }
53 }
54 catch (CancelledKeyException x)
55 {
56 LOG.debug("Ignoring key update for concurrently closed channel {}", this);
57 close();
58 }
59 catch (Exception x)
60 {
61 LOG.warn("Ignoring key update for " + this, x);
62 close();
63 }
64 }
65 };
66
67
68
69
70 private final AtomicBoolean _open = new AtomicBoolean();
71 private final SelectorManager.ManagedSelector _selector;
72 private final SelectionKey _key;
73
74
75
76 private final AtomicInteger _interestOps = new AtomicInteger();
77
78 public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
79 {
80 super(scheduler,channel);
81 _selector = selector;
82 _key = key;
83 setIdleTimeout(idleTimeout);
84 }
85
86 @Override
87 protected boolean needsFill()
88 {
89 updateLocalInterests(SelectionKey.OP_READ, true);
90 return false;
91 }
92
93 @Override
94 protected void onIncompleteFlush()
95 {
96 updateLocalInterests(SelectionKey.OP_WRITE, true);
97 }
98
99 @Override
100 public void onSelected()
101 {
102 assert _selector.isSelectorThread();
103 int oldInterestOps = _key.interestOps();
104 int readyOps = _key.readyOps();
105 int newInterestOps = oldInterestOps & ~readyOps;
106 setKeyInterests(oldInterestOps, newInterestOps);
107 updateLocalInterests(readyOps, false);
108 if (_key.isReadable())
109 getFillInterest().fillable();
110 if (_key.isWritable())
111 getWriteFlusher().completeWrite();
112 }
113
114
115 private void updateLocalInterests(int operation, boolean add)
116 {
117 while (true)
118 {
119 int oldInterestOps = _interestOps.get();
120 int newInterestOps;
121 if (add)
122 newInterestOps = oldInterestOps | operation;
123 else
124 newInterestOps = oldInterestOps & ~operation;
125
126 if (isInputShutdown())
127 newInterestOps &= ~SelectionKey.OP_READ;
128 if (isOutputShutdown())
129 newInterestOps &= ~SelectionKey.OP_WRITE;
130
131 if (newInterestOps != oldInterestOps)
132 {
133 if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
134 {
135 LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
136 _selector.submit(_updateTask);
137 }
138 else
139 {
140 LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
141 continue;
142 }
143 }
144 else
145 {
146 LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
147 }
148 break;
149 }
150 }
151
152
153 private void setKeyInterests(int oldInterestOps, int newInterestOps)
154 {
155 assert _selector.isSelectorThread();
156 LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
157 _key.interestOps(newInterestOps);
158 }
159
160 @Override
161 public void close()
162 {
163 if (_open.compareAndSet(true, false))
164 {
165 super.close();
166 _selector.destroyEndPoint(this);
167 }
168 }
169
170 @Override
171 public boolean isOpen()
172 {
173
174
175
176 return _open.get();
177 }
178
179 @Override
180 public void onOpen()
181 {
182 if (_open.compareAndSet(false, true))
183 super.onOpen();
184 }
185
186 @Override
187 public String toString()
188 {
189
190
191
192 try
193 {
194 boolean valid = _key!=null && _key.isValid();
195 int keyInterests = valid ? _key.interestOps() : -1;
196 int keyReadiness = valid ? _key.readyOps() : -1;
197 return String.format("%s{io=%d,kio=%d,kro=%d}",
198 super.toString(),
199 _interestOps.get(),
200 keyInterests,
201 keyReadiness);
202 }
203 catch (CancelledKeyException x)
204 {
205 return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
206 }
207 }
208 }