View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.net.SocketException;
20  import java.nio.ByteBuffer;
21  import java.nio.channels.ByteChannel;
22  import java.nio.channels.GatheringByteChannel;
23  import java.nio.channels.SelectableChannel;
24  import java.nio.channels.SocketChannel;
25  
26  import org.eclipse.jetty.io.Buffer;
27  import org.eclipse.jetty.io.EndPoint;
28  import org.eclipse.jetty.util.StringUtil;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  /**
33   * Channel End Point.
34   * <p>Holds the channel and socket for an NIO endpoint.
35   *
36   */
37  public class ChannelEndPoint implements EndPoint
38  {
39      private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
40  
41      protected final ByteChannel _channel;
42      protected final ByteBuffer[] _gather2=new ByteBuffer[2];
43      protected final Socket _socket;
44      protected final InetSocketAddress _local;
45      protected final InetSocketAddress _remote;
46      protected int _maxIdleTime;
47  
48      public ChannelEndPoint(ByteChannel channel) throws IOException
49      {
50          super();
51          this._channel = channel;
52          _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
53          if (_socket!=null)
54          {
55              _local=(InetSocketAddress)_socket.getLocalSocketAddress();
56              _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
57              _maxIdleTime=_socket.getSoTimeout();
58          }
59          else
60          {
61              _local=_remote=null;
62          }
63      }
64  
65      protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException
66      {
67          this._channel = channel;
68          _maxIdleTime=maxIdleTime;
69          _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
70          if (_socket!=null)
71          {
72              _local=(InetSocketAddress)_socket.getLocalSocketAddress();
73              _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
74              _socket.setSoTimeout(_maxIdleTime);
75          }
76          else
77          {
78              _local=_remote=null;
79          }
80      }
81  
82      public boolean isBlocking()
83      {
84          return  !(_channel instanceof SelectableChannel) || ((SelectableChannel)_channel).isBlocking();
85      }
86  
87      public boolean blockReadable(long millisecs) throws IOException
88      {
89          return true;
90      }
91  
92      public boolean blockWritable(long millisecs) throws IOException
93      {
94          return true;
95      }
96  
97      /*
98       * @see org.eclipse.io.EndPoint#isOpen()
99       */
100     public boolean isOpen()
101     {
102         return _channel.isOpen();
103     }
104 
105     /* (non-Javadoc)
106      * @see org.eclipse.io.EndPoint#close()
107      */
108     public void shutdownInput() throws IOException
109     {
110         if (_channel.isOpen() && _channel instanceof SocketChannel)
111         {
112             Socket socket= ((SocketChannel)_channel).socket();
113             if (!socket.isClosed())
114             {
115                 if(socket.isOutputShutdown())
116                     socket.close();
117                 else if (!socket.isInputShutdown())
118                     socket.shutdownInput();
119             }
120         }
121     }
122 
123     /* (non-Javadoc)
124      * @see org.eclipse.io.EndPoint#close()
125      */
126     public void shutdownOutput() throws IOException
127     {
128         if (_channel.isOpen() && _channel instanceof SocketChannel)
129         {
130             Socket socket= ((SocketChannel)_channel).socket();
131             if (!socket.isClosed())
132             {
133                 try
134                 {
135                     if (socket.isInputShutdown())
136                         socket.close();
137                     else if (!socket.isOutputShutdown())
138                         socket.shutdownOutput();
139                 }
140                 catch(SocketException e)
141                 {
142                     LOG.ignore(e);
143                 }
144             }
145         }
146     }
147 
148     public boolean isOutputShutdown()
149     {
150         return _channel.isOpen() && _socket!=null && _socket.isOutputShutdown();
151     }
152 
153     public boolean isInputShutdown()
154     {
155         return _channel.isOpen() && _socket!=null && _socket.isInputShutdown();
156     }
157 
158     /* (non-Javadoc)
159      * @see org.eclipse.io.EndPoint#close()
160      */
161     public void close() throws IOException
162     {
163         _channel.close();
164     }
165 
166     /* (non-Javadoc)
167      * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
168      */
169     public int fill(Buffer buffer) throws IOException
170     {
171         Buffer buf = buffer.buffer();
172         int len=0;
173         if (buf instanceof NIOBuffer)
174         {
175             final NIOBuffer nbuf = (NIOBuffer)buf;
176             final ByteBuffer bbuf=nbuf.getByteBuffer();
177             
178             //noinspection SynchronizationOnLocalVariableOrMethodParameter
179             try
180             {
181                 synchronized(bbuf)
182                 {
183                     try
184                     {
185                         bbuf.position(buffer.putIndex());
186                         len=_channel.read(bbuf);
187                     }
188                     finally
189                     {
190                         buffer.setPutIndex(bbuf.position());
191                         bbuf.position(0);
192                     }
193                 }
194 
195                 if (len<0 && isOpen())
196                 {
197                     if (!isInputShutdown())
198                         shutdownInput();
199                     else if (isOutputShutdown())
200                         _channel.close();
201                 }
202             }
203             catch (IOException x)
204             {
205                 LOG.debug(x);
206                 try
207                 {
208                     close();
209                 }
210                 catch (IOException xx)
211                 {
212                     LOG.ignore(xx);
213                 }
214                 
215                 if (len>0)
216                     throw x;
217                 len=-1;
218             }
219         }
220         else
221         {
222             throw new IOException("Not Implemented");
223         }
224         
225         return len;
226     }
227 
228     /* (non-Javadoc)
229      * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer)
230      */
231     public int flush(Buffer buffer) throws IOException
232     {
233         Buffer buf = buffer.buffer();
234         int len=0;
235         if (buf instanceof NIOBuffer)
236         {
237             final NIOBuffer nbuf = (NIOBuffer)buf;
238             final ByteBuffer bbuf=nbuf.getByteBuffer();
239 
240             //noinspection SynchronizationOnLocalVariableOrMethodParameter
241             synchronized(bbuf)
242             {
243                 try
244                 {
245                     bbuf.position(buffer.getIndex());
246                     bbuf.limit(buffer.putIndex());
247                     len=_channel.write(bbuf);
248                 }
249                 finally
250                 {
251                     if (len>0)
252                         buffer.skip(len);
253                     bbuf.position(0);
254                     bbuf.limit(bbuf.capacity());
255                 }
256             }
257         }
258         else if (buf instanceof RandomAccessFileBuffer)
259         {
260             len = ((RandomAccessFileBuffer)buf).writeTo(_channel,buffer.getIndex(),buffer.length());
261             if (len>0)
262                 buffer.skip(len);
263         }
264         else if (buffer.array()!=null)
265         {
266             ByteBuffer b = ByteBuffer.wrap(buffer.array(), buffer.getIndex(), buffer.length());
267             len=_channel.write(b);
268             if (len>0)
269                 buffer.skip(len);
270         }
271         else
272         {
273             throw new IOException("Not Implemented");
274         }
275         return len;
276     }
277 
278     /* (non-Javadoc)
279      * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
280      */
281     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
282     {
283         int length=0;
284 
285         Buffer buf0 = header==null?null:header.buffer();
286         Buffer buf1 = buffer==null?null:buffer.buffer();
287 
288         if (_channel instanceof GatheringByteChannel &&
289             header!=null && header.length()!=0 && buf0 instanceof NIOBuffer &&
290             buffer!=null && buffer.length()!=0 && buf1 instanceof NIOBuffer)
291         {
292             length = gatheringFlush(header,((NIOBuffer)buf0).getByteBuffer(),buffer,((NIOBuffer)buf1).getByteBuffer());
293         }
294         else
295         {
296             if (header!=null)
297             {
298                 if (buffer!=null && buffer.length()>0 && header.space()>buffer.length())
299                 {
300                     header.put(buffer);
301                     buffer.clear();
302                 }
303                 if (trailer!=null && trailer.length()>0 && header.space()>trailer.length())
304                 {
305                     header.put(trailer);
306                     trailer.clear();
307                 }
308             }
309 
310             // flush header
311             if (header!=null && header.length()>0)
312                 length=flush(header);
313 
314             // flush buffer
315             if ((header==null || header.length()==0) &&
316                  buffer!=null && buffer.length()>0)
317                 length+=flush(buffer);
318 
319             // flush trailer
320             if ((header==null || header.length()==0) &&
321                 (buffer==null || buffer.length()==0) &&
322                  trailer!=null && trailer.length()>0)
323                 length+=flush(trailer);
324         }
325 
326         return length;
327     }
328 
329     protected int gatheringFlush(Buffer header, ByteBuffer bbuf0, Buffer buffer, ByteBuffer bbuf1) throws IOException
330     {
331         int length;
332 
333         synchronized(this)
334         {
335             // We must sync because buffers may be shared (eg nbuf1 is likely to be cached content).
336             //noinspection SynchronizationOnLocalVariableOrMethodParameter
337             synchronized(bbuf0)
338             {
339                 //noinspection SynchronizationOnLocalVariableOrMethodParameter
340                 synchronized(bbuf1)
341                 {
342                     try
343                     {
344                         // Adjust position indexs of buf0 and buf1
345                         bbuf0.position(header.getIndex());
346                         bbuf0.limit(header.putIndex());
347                         bbuf1.position(buffer.getIndex());
348                         bbuf1.limit(buffer.putIndex());
349 
350                         _gather2[0]=bbuf0;
351                         _gather2[1]=bbuf1;
352 
353                         // do the gathering write.
354                         length=(int)((GatheringByteChannel)_channel).write(_gather2);
355 
356                         int hl=header.length();
357                         if (length>hl)
358                         {
359                             header.clear();
360                             buffer.skip(length-hl);
361                         }
362                         else if (length>0)
363                         {
364                             header.skip(length);
365                         }
366                     }
367                     finally
368                     {
369                         // adjust buffer 0 and 1
370                         if (!header.isImmutable())
371                             header.setGetIndex(bbuf0.position());
372                         if (!buffer.isImmutable())
373                             buffer.setGetIndex(bbuf1.position());
374 
375                         bbuf0.position(0);
376                         bbuf1.position(0);
377                         bbuf0.limit(bbuf0.capacity());
378                         bbuf1.limit(bbuf1.capacity());
379                     }
380                 }
381             }
382         }
383         return length;
384     }
385 
386     /* ------------------------------------------------------------ */
387     /**
388      * @return Returns the channel.
389      */
390     public ByteChannel getChannel()
391     {
392         return _channel;
393     }
394 
395 
396     /* ------------------------------------------------------------ */
397     /*
398      * @see org.eclipse.io.EndPoint#getLocalAddr()
399      */
400     public String getLocalAddr()
401     {
402         if (_socket==null)
403             return null;
404        if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
405            return StringUtil.ALL_INTERFACES;
406         return _local.getAddress().getHostAddress();
407     }
408 
409     /* ------------------------------------------------------------ */
410     /*
411      * @see org.eclipse.io.EndPoint#getLocalHost()
412      */
413     public String getLocalHost()
414     {
415         if (_socket==null)
416             return null;
417        if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
418            return StringUtil.ALL_INTERFACES;
419         return _local.getAddress().getCanonicalHostName();
420     }
421 
422     /* ------------------------------------------------------------ */
423     /*
424      * @see org.eclipse.io.EndPoint#getLocalPort()
425      */
426     public int getLocalPort()
427     {
428         if (_socket==null)
429             return 0;
430         if (_local==null)
431             return -1;
432         return _local.getPort();
433     }
434 
435     /* ------------------------------------------------------------ */
436     /*
437      * @see org.eclipse.io.EndPoint#getRemoteAddr()
438      */
439     public String getRemoteAddr()
440     {
441         if (_socket==null)
442             return null;
443         if (_remote==null)
444             return null;
445         return _remote.getAddress().getHostAddress();
446     }
447 
448     /* ------------------------------------------------------------ */
449     /*
450      * @see org.eclipse.io.EndPoint#getRemoteHost()
451      */
452     public String getRemoteHost()
453     {
454         if (_socket==null)
455             return null;
456         if (_remote==null)
457             return null;
458         return _remote.getAddress().getCanonicalHostName();
459     }
460 
461     /* ------------------------------------------------------------ */
462     /*
463      * @see org.eclipse.io.EndPoint#getRemotePort()
464      */
465     public int getRemotePort()
466     {
467         if (_socket==null)
468             return 0;
469         return _remote==null?-1:_remote.getPort();
470     }
471 
472     /* ------------------------------------------------------------ */
473     /*
474      * @see org.eclipse.io.EndPoint#getConnection()
475      */
476     public Object getTransport()
477     {
478         return _channel;
479     }
480 
481     /* ------------------------------------------------------------ */
482     public void flush()
483         throws IOException
484     {
485     }
486 
487     /* ------------------------------------------------------------ */
488     public boolean isBufferingInput()
489     {
490         return false;
491     }
492 
493     /* ------------------------------------------------------------ */
494     public boolean isBufferingOutput()
495     {
496         return false;
497     }
498 
499     /* ------------------------------------------------------------ */
500     public boolean isBufferred()
501     {
502         return false;
503     }
504 
505     /* ------------------------------------------------------------ */
506     public int getMaxIdleTime()
507     {
508         return _maxIdleTime;
509     }
510 
511     /* ------------------------------------------------------------ */
512     /**
513      * @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int)
514      */
515     public void setMaxIdleTime(int timeMs) throws IOException
516     {
517         if (_socket!=null && timeMs!=_maxIdleTime)
518             _socket.setSoTimeout(timeMs>0?timeMs:0);
519         _maxIdleTime=timeMs;
520     }
521 }