1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.ByteChannel;
26 import java.nio.channels.SocketChannel;
27
28 import org.eclipse.jetty.util.BufferUtil;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.util.thread.Scheduler;
32
33
34
35
36
37 public class ChannelEndPoint extends AbstractEndPoint
38 {
39 private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
40
41 private final SocketChannel _channel;
42 private final Socket _socket;
43 private volatile boolean _ishut;
44 private volatile boolean _oshut;
45
46 public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
47 {
48 super(scheduler,
49 (InetSocketAddress)channel.socket().getLocalSocketAddress(),
50 (InetSocketAddress)channel.socket().getRemoteSocketAddress());
51 _channel=channel;
52 _socket=channel.socket();
53 }
54
55 @Override
56 public boolean isOptimizedForDirectBuffers()
57 {
58 return true;
59 }
60
61 @Override
62 public boolean isOpen()
63 {
64 return _channel.isOpen();
65 }
66
67 protected void shutdownInput()
68 {
69 if (LOG.isDebugEnabled())
70 LOG.debug("ishut {}", this);
71 _ishut=true;
72 if (_oshut)
73 close();
74 }
75
76 @Override
77 public void shutdownOutput()
78 {
79 if (LOG.isDebugEnabled())
80 LOG.debug("oshut {}", this);
81 _oshut = true;
82 if (_channel.isOpen())
83 {
84 try
85 {
86 if (!_socket.isOutputShutdown())
87 _socket.shutdownOutput();
88 }
89 catch (IOException e)
90 {
91 LOG.debug(e);
92 }
93 finally
94 {
95 if (_ishut)
96 {
97 close();
98 }
99 }
100 }
101 }
102
103 @Override
104 public boolean isOutputShutdown()
105 {
106 return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
107 }
108
109 @Override
110 public boolean isInputShutdown()
111 {
112 return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
113 }
114
115 @Override
116 public void close()
117 {
118 super.close();
119 if (LOG.isDebugEnabled())
120 LOG.debug("close {}", this);
121 try
122 {
123 _channel.close();
124 }
125 catch (IOException e)
126 {
127 LOG.debug(e);
128 }
129 finally
130 {
131 _ishut=true;
132 _oshut=true;
133 }
134 }
135
136 @Override
137 public int fill(ByteBuffer buffer) throws IOException
138 {
139 if (_ishut)
140 return -1;
141
142 int pos=BufferUtil.flipToFill(buffer);
143 try
144 {
145 int filled = _channel.read(buffer);
146 if (LOG.isDebugEnabled())
147 LOG.debug("filled {} {}", filled, this);
148
149 if (filled>0)
150 notIdle();
151 else if (filled==-1)
152 shutdownInput();
153
154 return filled;
155 }
156 catch(IOException e)
157 {
158 LOG.debug(e);
159 shutdownInput();
160 return -1;
161 }
162 finally
163 {
164 BufferUtil.flipToFlush(buffer,pos);
165 }
166 }
167
168 @Override
169 public boolean flush(ByteBuffer... buffers) throws IOException
170 {
171 long flushed=0;
172 try
173 {
174 if (buffers.length==1)
175 flushed=_channel.write(buffers[0]);
176 else if (buffers.length>1)
177 flushed=_channel.write(buffers,0,buffers.length);
178 else
179 {
180 for (ByteBuffer b : buffers)
181 {
182 if (b.hasRemaining())
183 {
184 int l=_channel.write(b);
185 if (l>0)
186 flushed+=l;
187 if (b.hasRemaining())
188 break;
189 }
190 }
191 }
192 if (LOG.isDebugEnabled())
193 LOG.debug("flushed {} {}", flushed, this);
194 }
195 catch (IOException e)
196 {
197 throw new EofException(e);
198 }
199
200 if (flushed>0)
201 notIdle();
202
203 for (ByteBuffer b : buffers)
204 if (!BufferUtil.isEmpty(b))
205 return false;
206
207 return true;
208 }
209
210 public ByteChannel getChannel()
211 {
212 return _channel;
213 }
214
215 @Override
216 public Object getTransport()
217 {
218 return _channel;
219 }
220
221 public Socket getSocket()
222 {
223 return _socket;
224 }
225
226 @Override
227 protected void onIncompleteFlush()
228 {
229 throw new UnsupportedOperationException();
230 }
231
232 @Override
233 protected void needsFillInterest() throws IOException
234 {
235 throw new UnsupportedOperationException();
236 }
237 }