1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.nio.channels.CancelledKeyException;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.SocketChannel;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Scheduler;
31
32
33
34
35 public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
36 {
37 public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
38
39 private final Runnable _updateTask = new Runnable()
40 {
41 @Override
42 public void run()
43 {
44 try
45 {
46 if (getChannel().isOpen())
47 {
48 int oldInterestOps = _key.interestOps();
49 int newInterestOps = _interestOps.get();
50 if (newInterestOps != oldInterestOps)
51 setKeyInterests(oldInterestOps, newInterestOps);
52 }
53 }
54 catch (CancelledKeyException x)
55 {
56 LOG.debug("Ignoring key update for concurrently closed channel {}", this);
57 close();
58 }
59 catch (Exception x)
60 {
61 LOG.warn("Ignoring key update for " + this, x);
62 close();
63 }
64 }
65 };
66
67
68
69
70 private final AtomicBoolean _open = new AtomicBoolean();
71 private final SelectorManager.ManagedSelector _selector;
72 private final SelectionKey _key;
73
74
75
76 private final AtomicInteger _interestOps = new AtomicInteger();
77
78 public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
79 {
80 super(scheduler,channel);
81 _selector = selector;
82 _key = key;
83 setIdleTimeout(idleTimeout);
84 }
85
86 @Override
87 protected boolean needsFill()
88 {
89 updateLocalInterests(SelectionKey.OP_READ, true);
90 return false;
91 }
92
93 @Override
94 protected void onIncompleteFlush()
95 {
96 updateLocalInterests(SelectionKey.OP_WRITE, true);
97 }
98
99 @Override
100 public void onSelected()
101 {
102 assert _selector.isSelectorThread();
103 int oldInterestOps = _key.interestOps();
104 int readyOps = _key.readyOps();
105 int newInterestOps = oldInterestOps & ~readyOps;
106 setKeyInterests(oldInterestOps, newInterestOps);
107 updateLocalInterests(readyOps, false);
108 if (_key.isReadable())
109 getFillInterest().fillable();
110 if (_key.isWritable())
111 getWriteFlusher().completeWrite();
112 }
113
114
115 private void updateLocalInterests(int operation, boolean add)
116 {
117 while (true)
118 {
119 int oldInterestOps = _interestOps.get();
120 int newInterestOps;
121 if (add)
122 newInterestOps = oldInterestOps | operation;
123 else
124 newInterestOps = oldInterestOps & ~operation;
125
126 if (isInputShutdown())
127 newInterestOps &= ~SelectionKey.OP_READ;
128 if (isOutputShutdown())
129 newInterestOps &= ~SelectionKey.OP_WRITE;
130
131 if (newInterestOps != oldInterestOps)
132 {
133 if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
134 {
135 if (LOG.isDebugEnabled())
136 LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
137 _selector.updateKey(_updateTask);
138 }
139 else
140 {
141 if (LOG.isDebugEnabled())
142 LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
143 continue;
144 }
145 }
146 else
147 {
148 if (LOG.isDebugEnabled())
149 LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
150 }
151 break;
152 }
153 }
154
155
156 private void setKeyInterests(int oldInterestOps, int newInterestOps)
157 {
158 if (LOG.isDebugEnabled())
159 LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
160 _key.interestOps(newInterestOps);
161 }
162
163 @Override
164 public void close()
165 {
166 if (_open.compareAndSet(true, false))
167 {
168 super.close();
169 _selector.destroyEndPoint(this);
170 }
171 }
172
173 @Override
174 public boolean isOpen()
175 {
176
177
178
179 return _open.get();
180 }
181
182 @Override
183 public void onOpen()
184 {
185 if (_open.compareAndSet(false, true))
186 super.onOpen();
187 }
188
189 @Override
190 public String toString()
191 {
192
193
194
195 try
196 {
197 boolean valid = _key!=null && _key.isValid();
198 int keyInterests = valid ? _key.interestOps() : -1;
199 int keyReadiness = valid ? _key.readyOps() : -1;
200 return String.format("%s{io=%d,kio=%d,kro=%d}",
201 super.toString(),
202 _interestOps.get(),
203 keyInterests,
204 keyReadiness);
205 }
206 catch (CancelledKeyException x)
207 {
208 return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
209 }
210 }
211 }