1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.ByteChannel;
26 import java.nio.channels.GatheringByteChannel;
27 import java.nio.channels.SocketChannel;
28
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.Scheduler;
33
34
35
36
37
38 public class ChannelEndPoint extends AbstractEndPoint
39 {
40 private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
41
42 private final ByteChannel _channel;
43 private final Socket _socket;
44 private volatile boolean _ishut;
45 private volatile boolean _oshut;
46
47 public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
48 {
49 super(scheduler,
50 (InetSocketAddress)channel.socket().getLocalSocketAddress(),
51 (InetSocketAddress)channel.socket().getRemoteSocketAddress());
52 _channel = channel;
53 _socket=channel.socket();
54 }
55
56 @Override
57 public boolean isOpen()
58 {
59 return _channel.isOpen();
60 }
61
62 protected void shutdownInput()
63 {
64 LOG.debug("ishut {}", this);
65 _ishut=true;
66 if (_oshut)
67 close();
68 }
69
70 @Override
71 public void shutdownOutput()
72 {
73 LOG.debug("oshut {}", this);
74 _oshut = true;
75 if (_channel.isOpen())
76 {
77 try
78 {
79 if (!_socket.isOutputShutdown())
80 _socket.shutdownOutput();
81 }
82 catch (IOException e)
83 {
84 LOG.debug(e);
85 }
86 finally
87 {
88 if (_ishut)
89 {
90 close();
91 }
92 }
93 }
94 }
95
96 @Override
97 public boolean isOutputShutdown()
98 {
99 return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
100 }
101
102 @Override
103 public boolean isInputShutdown()
104 {
105 return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
106 }
107
108 @Override
109 public void close()
110 {
111 LOG.debug("close {}", this);
112 try
113 {
114 _channel.close();
115 }
116 catch (IOException e)
117 {
118 LOG.debug(e);
119 }
120 finally
121 {
122 _ishut=true;
123 _oshut=true;
124 }
125 }
126
127 @Override
128 public int fill(ByteBuffer buffer) throws IOException
129 {
130 if (_ishut)
131 return -1;
132
133 int pos=BufferUtil.flipToFill(buffer);
134 try
135 {
136 int filled = _channel.read(buffer);
137 if (LOG.isDebugEnabled())
138 LOG.debug("filled {} {}", filled, this);
139
140 if (filled>0)
141 notIdle();
142 else if (filled==-1)
143 shutdownInput();
144
145 return filled;
146 }
147 catch(IOException e)
148 {
149 LOG.debug(e);
150 shutdownInput();
151 return -1;
152 }
153 finally
154 {
155 BufferUtil.flipToFlush(buffer,pos);
156 }
157 }
158
159 @Override
160 public boolean flush(ByteBuffer... buffers) throws IOException
161 {
162 int flushed=0;
163 try
164 {
165 if (buffers.length==1)
166 flushed=_channel.write(buffers[0]);
167 else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
168 flushed= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
169 else
170 {
171 for (ByteBuffer b : buffers)
172 {
173 if (b.hasRemaining())
174 {
175 int l=_channel.write(b);
176 if (l>0)
177 flushed+=l;
178 if (b.hasRemaining())
179 break;
180 }
181 }
182 }
183 LOG.debug("flushed {} {}", flushed, this);
184 }
185 catch (IOException e)
186 {
187 throw new EofException(e);
188 }
189
190 if (flushed>0)
191 notIdle();
192
193 for (ByteBuffer b : buffers)
194 if (!BufferUtil.isEmpty(b))
195 return false;
196
197 return true;
198 }
199
200 public ByteChannel getChannel()
201 {
202 return _channel;
203 }
204
205 @Override
206 public Object getTransport()
207 {
208 return _channel;
209 }
210
211 public Socket getSocket()
212 {
213 return _socket;
214 }
215
216 @Override
217 protected void onIncompleteFlush()
218 {
219 throw new UnsupportedOperationException();
220 }
221
222 @Override
223 protected boolean needsFill() throws IOException
224 {
225 throw new UnsupportedOperationException();
226 }
227 }