View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.Socket;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ByteChannel;
26  import java.nio.channels.SocketChannel;
27  
28  import org.eclipse.jetty.util.BufferUtil;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  import org.eclipse.jetty.util.thread.Scheduler;
32  
33  /**
34   * Channel End Point.
35   * <p>Holds the channel and socket for an NIO endpoint.
36   */
37  public class ChannelEndPoint extends AbstractEndPoint
38  {
39      private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
40  
41      private final SocketChannel _channel;
42      private final Socket _socket;
43      private volatile boolean _ishut;
44      private volatile boolean _oshut;
45  
46      public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
47      {
48          super(scheduler,
49              (InetSocketAddress)channel.socket().getLocalSocketAddress(),
50              (InetSocketAddress)channel.socket().getRemoteSocketAddress());
51          _channel=channel;
52          _socket=channel.socket();
53      }
54  
55      @Override
56      public boolean isOptimizedForDirectBuffers()
57      {
58          return true;
59      }
60  
61      @Override
62      public boolean isOpen()
63      {
64          return _channel.isOpen();
65      }
66  
67      protected void shutdownInput()
68      {
69          if (LOG.isDebugEnabled())
70              LOG.debug("ishut {}", this);
71          _ishut=true;
72          if (_oshut)
73              close();
74      }
75  
76      @Override
77      public void shutdownOutput()
78      {
79          if (LOG.isDebugEnabled())
80              LOG.debug("oshut {}", this);
81          _oshut = true;
82          if (_channel.isOpen())
83          {
84              try
85              {
86                  if (!_socket.isOutputShutdown())
87                      _socket.shutdownOutput();
88              }
89              catch (IOException e)
90              {
91                  LOG.debug(e);
92              }
93              finally
94              {
95                  if (_ishut)
96                  {
97                      close();
98                  }
99              }
100         }
101     }
102 
103     @Override
104     public boolean isOutputShutdown()
105     {
106         return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
107     }
108 
109     @Override
110     public boolean isInputShutdown()
111     {
112         return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
113     }
114 
115     @Override
116     public void close()
117     {
118         super.close();
119         if (LOG.isDebugEnabled())
120             LOG.debug("close {}", this);
121         try
122         {
123             _channel.close();
124         }
125         catch (IOException e)
126         {
127             LOG.debug(e);
128         }
129         finally
130         {
131             _ishut=true;
132             _oshut=true;
133         }
134     }
135 
136     @Override
137     public int fill(ByteBuffer buffer) throws IOException
138     {
139         if (_ishut)
140             return -1;
141 
142         int pos=BufferUtil.flipToFill(buffer);
143         try
144         {
145             int filled = _channel.read(buffer);
146             if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
147                 LOG.debug("filled {} {}", filled, this);
148 
149             if (filled>0)
150                 notIdle();
151             else if (filled==-1)
152                 shutdownInput();
153 
154             return filled;
155         }
156         catch(IOException e)
157         {
158             LOG.debug(e);
159             shutdownInput();
160             return -1;
161         }
162         finally
163         {
164             BufferUtil.flipToFlush(buffer,pos);
165         }
166     }
167 
168     @Override
169     public boolean flush(ByteBuffer... buffers) throws IOException
170     {
171         long flushed=0;
172         try
173         {
174             if (buffers.length==1)
175                 flushed=_channel.write(buffers[0]);
176             else if (buffers.length>1)
177                 flushed=_channel.write(buffers,0,buffers.length);
178             else
179             {
180                 for (ByteBuffer b : buffers)
181                 {
182                     if (b.hasRemaining())
183                     {
184                         int l=_channel.write(b);
185                         if (l>0)
186                             flushed+=l;
187                         if (b.hasRemaining())
188                             break;
189                     }
190                 }
191             }
192             if (LOG.isDebugEnabled())
193                 LOG.debug("flushed {} {}", flushed, this);
194         }
195         catch (IOException e)
196         {
197             throw new EofException(e);
198         }
199 
200         if (flushed>0)
201             notIdle();
202 
203         for (ByteBuffer b : buffers)
204             if (!BufferUtil.isEmpty(b))
205                 return false;
206 
207         return true;
208     }
209 
210     public ByteChannel getChannel()
211     {
212         return _channel;
213     }
214 
215     @Override
216     public Object getTransport()
217     {
218         return _channel;
219     }
220 
221     public Socket getSocket()
222     {
223         return _socket;
224     }
225 
226     @Override
227     protected void onIncompleteFlush()
228     {
229         throw new UnsupportedOperationException();
230     }
231 
232     @Override
233     protected void needsFillInterest() throws IOException
234     {
235         throw new UnsupportedOperationException();
236     }
237 }