View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.Socket;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ByteChannel;
26  import java.nio.channels.GatheringByteChannel;
27  import java.nio.channels.SocketChannel;
28  
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.Scheduler;
33  
34  /**
35   * Channel End Point.
36   * <p>Holds the channel and socket for an NIO endpoint.
37   */
38  public class ChannelEndPoint extends AbstractEndPoint implements SocketBased
39  {
40      private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
41  
42      private final ByteChannel _channel;
43      private final Socket _socket;
44      private volatile boolean _ishut;
45      private volatile boolean _oshut;
46  
47      public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
48      {
49          super(scheduler,
50              (InetSocketAddress)channel.socket().getLocalSocketAddress(),
51              (InetSocketAddress)channel.socket().getRemoteSocketAddress());
52          _channel = channel;
53          _socket=channel.socket();
54      }
55  
56      @Override
57      public boolean isOpen()
58      {
59          return _channel.isOpen();
60      }
61  
62      protected void shutdownInput()
63      {
64          LOG.debug("ishut {}", this);
65          _ishut=true;
66          if (_oshut)
67              close();
68      }
69  
70      @Override
71      public void shutdownOutput()
72      {
73          LOG.debug("oshut {}", this);
74          _oshut = true;
75          if (_channel.isOpen())
76          {
77              try
78              {
79                  if (!_socket.isOutputShutdown())
80                      _socket.shutdownOutput();
81              }
82              catch (IOException e)
83              {
84                  LOG.debug(e);
85              }
86              finally
87              {
88                  if (_ishut)
89                  {
90                      close();
91                  }
92              }
93          }
94      }
95  
96      @Override
97      public boolean isOutputShutdown()
98      {
99          return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
100     }
101 
102     @Override
103     public boolean isInputShutdown()
104     {
105         return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
106     }
107 
108     @Override
109     public void close()
110     {
111         LOG.debug("close {}", this);
112         try
113         {
114             _channel.close();
115         }
116         catch (IOException e)
117         {
118             LOG.debug(e);
119         }
120         finally
121         {
122             _ishut=true;
123             _oshut=true;
124         }
125     }
126 
127     @Override
128     public int fill(ByteBuffer buffer) throws IOException
129     {
130         if (_ishut)
131             return -1;
132 
133         int pos=BufferUtil.flipToFill(buffer);
134         try
135         {
136             int filled = _channel.read(buffer);
137             LOG.debug("filled {} {}", filled, this);
138 
139             if (filled>0)
140                 notIdle();
141             else if (filled==-1)
142                 shutdownInput();
143 
144             return filled;
145         }
146         catch(IOException e)
147         {
148             LOG.debug(e);
149             shutdownInput();
150             return -1;
151         }
152         finally
153         {
154             BufferUtil.flipToFlush(buffer,pos);
155         }
156     }
157 
158     @Override
159     public boolean flush(ByteBuffer... buffers) throws IOException
160     {
161         int flushed=0;
162         try
163         {
164             if (buffers.length==1)
165                 flushed=_channel.write(buffers[0]);
166             else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
167                 flushed= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
168             else
169             {
170                 for (ByteBuffer b : buffers)
171                 {
172                     if (b.hasRemaining())
173                     {
174                         int l=_channel.write(b);
175                         if (l>0)
176                             flushed+=l;
177                         if (b.hasRemaining())
178                             break;
179                     }
180                 }
181             }
182             LOG.debug("flushed {} {}", flushed, this);
183         }
184         catch (IOException e)
185         {
186             throw new EofException(e);
187         }
188         
189         boolean all_flushed=true;
190         if (flushed>0)
191         {
192             notIdle();
193 
194             // clear empty buffers to prevent position creeping up the buffer
195             for (ByteBuffer b : buffers)
196             {
197                 if (BufferUtil.isEmpty(b))
198                     BufferUtil.clear(b);
199                 else
200                     all_flushed=false;
201             }
202         }
203         
204         return all_flushed;
205     }
206 
207     public ByteChannel getChannel()
208     {
209         return _channel;
210     }
211 
212     @Override
213     public Object getTransport()
214     {
215         return _channel;
216     }
217 
218     @Override
219     public Socket getSocket()
220     {
221         return _socket;
222     }
223 
224     @Override
225     protected void onIncompleteFlush()
226     {
227         throw new UnsupportedOperationException();
228     }
229 
230     @Override
231     protected boolean needsFill() throws IOException
232     {
233         throw new UnsupportedOperationException();
234     }
235 }