View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.Socket;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ByteChannel;
26  import java.nio.channels.GatheringByteChannel;
27  import java.nio.channels.SocketChannel;
28  
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.Scheduler;
33  
34  /**
35   * Channel End Point.
36   * <p>Holds the channel and socket for an NIO endpoint.
37   */
38  public class ChannelEndPoint extends AbstractEndPoint
39  {
40      private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
41  
42      private final ByteChannel _channel;
43      private final Socket _socket;
44      private volatile boolean _ishut;
45      private volatile boolean _oshut;
46  
47      public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
48      {
49          super(scheduler,
50              (InetSocketAddress)channel.socket().getLocalSocketAddress(),
51              (InetSocketAddress)channel.socket().getRemoteSocketAddress());
52          _channel = channel;
53          _socket=channel.socket();
54      }
55  
56      @Override
57      public boolean isOpen()
58      {
59          return _channel.isOpen();
60      }
61  
62      protected void shutdownInput()
63      {
64          LOG.debug("ishut {}", this);
65          _ishut=true;
66          if (_oshut)
67              close();
68      }
69  
70      @Override
71      public void shutdownOutput()
72      {
73          LOG.debug("oshut {}", this);
74          _oshut = true;
75          if (_channel.isOpen())
76          {
77              try
78              {
79                  if (!_socket.isOutputShutdown())
80                      _socket.shutdownOutput();
81              }
82              catch (IOException e)
83              {
84                  LOG.debug(e);
85              }
86              finally
87              {
88                  if (_ishut)
89                  {
90                      close();
91                  }
92              }
93          }
94      }
95  
96      @Override
97      public boolean isOutputShutdown()
98      {
99          return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
100     }
101 
102     @Override
103     public boolean isInputShutdown()
104     {
105         return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
106     }
107 
108     @Override
109     public void close()
110     {
111         LOG.debug("close {}", this);
112         try
113         {
114             _channel.close();
115         }
116         catch (IOException e)
117         {
118             LOG.debug(e);
119         }
120         finally
121         {
122             _ishut=true;
123             _oshut=true;
124         }
125     }
126 
127     @Override
128     public int fill(ByteBuffer buffer) throws IOException
129     {
130         if (_ishut)
131             return -1;
132 
133         int pos=BufferUtil.flipToFill(buffer);
134         try
135         {
136             int filled = _channel.read(buffer);
137             if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
138                 LOG.debug("filled {} {}", filled, this);
139 
140             if (filled>0)
141                 notIdle();
142             else if (filled==-1)
143                 shutdownInput();
144 
145             return filled;
146         }
147         catch(IOException e)
148         {
149             LOG.debug(e);
150             shutdownInput();
151             return -1;
152         }
153         finally
154         {
155             BufferUtil.flipToFlush(buffer,pos);
156         }
157     }
158 
159     @Override
160     public boolean flush(ByteBuffer... buffers) throws IOException
161     {
162         int flushed=0;
163         try
164         {
165             if (buffers.length==1)
166                 flushed=_channel.write(buffers[0]);
167             else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
168                 flushed= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
169             else
170             {
171                 for (ByteBuffer b : buffers)
172                 {
173                     if (b.hasRemaining())
174                     {
175                         int l=_channel.write(b);
176                         if (l>0)
177                             flushed+=l;
178                         if (b.hasRemaining())
179                             break;
180                     }
181                 }
182             }
183             LOG.debug("flushed {} {}", flushed, this);
184         }
185         catch (IOException e)
186         {
187             throw new EofException(e);
188         }
189 
190         if (flushed>0)
191             notIdle();
192 
193         for (ByteBuffer b : buffers)
194             if (!BufferUtil.isEmpty(b))
195                 return false;
196 
197         return true;
198     }
199 
200     public ByteChannel getChannel()
201     {
202         return _channel;
203     }
204 
205     @Override
206     public Object getTransport()
207     {
208         return _channel;
209     }
210 
211     public Socket getSocket()
212     {
213         return _socket;
214     }
215 
216     @Override
217     protected void onIncompleteFlush()
218     {
219         throw new UnsupportedOperationException();
220     }
221 
222     @Override
223     protected boolean needsFill() throws IOException
224     {
225         throw new UnsupportedOperationException();
226     }
227 }