View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.Socket;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ByteChannel;
26  import java.nio.channels.GatheringByteChannel;
27  import java.nio.channels.SocketChannel;
28  
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.Scheduler;
33  
34  /**
35   * Channel End Point.
36   * <p>Holds the channel and socket for an NIO endpoint.
37   */
38  public class ChannelEndPoint extends AbstractEndPoint
39  {
40      private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
41  
42      private final ByteChannel _channel;
43      private final Socket _socket;
44      private volatile boolean _ishut;
45      private volatile boolean _oshut;
46  
47      public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
48      {
49          super(scheduler,
50              (InetSocketAddress)channel.socket().getLocalSocketAddress(),
51              (InetSocketAddress)channel.socket().getRemoteSocketAddress());
52          _channel = channel;
53          _socket=channel.socket();
54      }
55  
56      @Override
57      public boolean isOpen()
58      {
59          return _channel.isOpen();
60      }
61  
62      protected void shutdownInput()
63      {
64          if (LOG.isDebugEnabled())
65              LOG.debug("ishut {}", this);
66          _ishut=true;
67          if (_oshut)
68              close();
69      }
70  
71      @Override
72      public void shutdownOutput()
73      {
74          if (LOG.isDebugEnabled())
75              LOG.debug("oshut {}", this);
76          _oshut = true;
77          if (_channel.isOpen())
78          {
79              try
80              {
81                  if (!_socket.isOutputShutdown())
82                      _socket.shutdownOutput();
83              }
84              catch (IOException e)
85              {
86                  LOG.debug(e);
87              }
88              finally
89              {
90                  if (_ishut)
91                  {
92                      close();
93                  }
94              }
95          }
96      }
97  
98      @Override
99      public boolean isOutputShutdown()
100     {
101         return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
102     }
103 
104     @Override
105     public boolean isInputShutdown()
106     {
107         return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
108     }
109 
110     @Override
111     public void close()
112     {
113         super.close();
114         if (LOG.isDebugEnabled())
115             LOG.debug("close {}", this);
116         try
117         {
118             _channel.close();
119         }
120         catch (IOException e)
121         {
122             LOG.debug(e);
123         }
124         finally
125         {
126             _ishut=true;
127             _oshut=true;
128         }
129     }
130 
131     @Override
132     public int fill(ByteBuffer buffer) throws IOException
133     {
134         if (_ishut)
135             return -1;
136 
137         int pos=BufferUtil.flipToFill(buffer);
138         try
139         {
140             int filled = _channel.read(buffer);
141             if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
142                 LOG.debug("filled {} {}", filled, this);
143 
144             if (filled>0)
145                 notIdle();
146             else if (filled==-1)
147                 shutdownInput();
148 
149             return filled;
150         }
151         catch(IOException e)
152         {
153             LOG.debug(e);
154             shutdownInput();
155             return -1;
156         }
157         finally
158         {
159             BufferUtil.flipToFlush(buffer,pos);
160         }
161     }
162 
163     @Override
164     public boolean flush(ByteBuffer... buffers) throws IOException
165     {
166         int flushed=0;
167         try
168         {
169             if (buffers.length==1)
170                 flushed=_channel.write(buffers[0]);
171             else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
172                 flushed= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
173             else
174             {
175                 for (ByteBuffer b : buffers)
176                 {
177                     if (b.hasRemaining())
178                     {
179                         int l=_channel.write(b);
180                         if (l>0)
181                             flushed+=l;
182                         if (b.hasRemaining())
183                             break;
184                     }
185                 }
186             }
187             if (LOG.isDebugEnabled())
188                 LOG.debug("flushed {} {}", flushed, this);
189         }
190         catch (IOException e)
191         {
192             throw new EofException(e);
193         }
194 
195         if (flushed>0)
196             notIdle();
197 
198         for (ByteBuffer b : buffers)
199             if (!BufferUtil.isEmpty(b))
200                 return false;
201 
202         return true;
203     }
204 
205     public ByteChannel getChannel()
206     {
207         return _channel;
208     }
209 
210     @Override
211     public Object getTransport()
212     {
213         return _channel;
214     }
215 
216     public Socket getSocket()
217     {
218         return _socket;
219     }
220 
221     @Override
222     protected void onIncompleteFlush()
223     {
224         throw new UnsupportedOperationException();
225     }
226 
227     @Override
228     protected boolean needsFill() throws IOException
229     {
230         throw new UnsupportedOperationException();
231     }
232 }