1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.ByteChannel;
26 import java.nio.channels.GatheringByteChannel;
27 import java.nio.channels.SocketChannel;
28
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.Scheduler;
33
34
35
36
37
38 public class ChannelEndPoint extends AbstractEndPoint
39 {
40 private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
41
42 private final ByteChannel _channel;
43 private final Socket _socket;
44 private volatile boolean _ishut;
45 private volatile boolean _oshut;
46
47 public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
48 {
49 super(scheduler,
50 (InetSocketAddress)channel.socket().getLocalSocketAddress(),
51 (InetSocketAddress)channel.socket().getRemoteSocketAddress());
52 _channel = channel;
53 _socket=channel.socket();
54 }
55
56 @Override
57 public boolean isOpen()
58 {
59 return _channel.isOpen();
60 }
61
62 protected void shutdownInput()
63 {
64 if (LOG.isDebugEnabled())
65 LOG.debug("ishut {}", this);
66 _ishut=true;
67 if (_oshut)
68 close();
69 }
70
71 @Override
72 public void shutdownOutput()
73 {
74 if (LOG.isDebugEnabled())
75 LOG.debug("oshut {}", this);
76 _oshut = true;
77 if (_channel.isOpen())
78 {
79 try
80 {
81 if (!_socket.isOutputShutdown())
82 _socket.shutdownOutput();
83 }
84 catch (IOException e)
85 {
86 LOG.debug(e);
87 }
88 finally
89 {
90 if (_ishut)
91 {
92 close();
93 }
94 }
95 }
96 }
97
98 @Override
99 public boolean isOutputShutdown()
100 {
101 return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
102 }
103
104 @Override
105 public boolean isInputShutdown()
106 {
107 return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
108 }
109
110 @Override
111 public void close()
112 {
113 super.close();
114 if (LOG.isDebugEnabled())
115 LOG.debug("close {}", this);
116 try
117 {
118 _channel.close();
119 }
120 catch (IOException e)
121 {
122 LOG.debug(e);
123 }
124 finally
125 {
126 _ishut=true;
127 _oshut=true;
128 }
129 }
130
131 @Override
132 public int fill(ByteBuffer buffer) throws IOException
133 {
134 if (_ishut)
135 return -1;
136
137 int pos=BufferUtil.flipToFill(buffer);
138 try
139 {
140 int filled = _channel.read(buffer);
141 if (LOG.isDebugEnabled())
142 LOG.debug("filled {} {}", filled, this);
143
144 if (filled>0)
145 notIdle();
146 else if (filled==-1)
147 shutdownInput();
148
149 return filled;
150 }
151 catch(IOException e)
152 {
153 LOG.debug(e);
154 shutdownInput();
155 return -1;
156 }
157 finally
158 {
159 BufferUtil.flipToFlush(buffer,pos);
160 }
161 }
162
163 @Override
164 public boolean flush(ByteBuffer... buffers) throws IOException
165 {
166 int flushed=0;
167 try
168 {
169 if (buffers.length==1)
170 flushed=_channel.write(buffers[0]);
171 else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
172 flushed= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
173 else
174 {
175 for (ByteBuffer b : buffers)
176 {
177 if (b.hasRemaining())
178 {
179 int l=_channel.write(b);
180 if (l>0)
181 flushed+=l;
182 if (b.hasRemaining())
183 break;
184 }
185 }
186 }
187 if (LOG.isDebugEnabled())
188 LOG.debug("flushed {} {}", flushed, this);
189 }
190 catch (IOException e)
191 {
192 throw new EofException(e);
193 }
194
195 if (flushed>0)
196 notIdle();
197
198 for (ByteBuffer b : buffers)
199 if (!BufferUtil.isEmpty(b))
200 return false;
201
202 return true;
203 }
204
205 public ByteChannel getChannel()
206 {
207 return _channel;
208 }
209
210 @Override
211 public Object getTransport()
212 {
213 return _channel;
214 }
215
216 public Socket getSocket()
217 {
218 return _socket;
219 }
220
221 @Override
222 protected void onIncompleteFlush()
223 {
224 throw new UnsupportedOperationException();
225 }
226
227 @Override
228 protected boolean needsFill() throws IOException
229 {
230 throw new UnsupportedOperationException();
231 }
232 }