View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.ByteBuffer;
20  import java.nio.channels.ByteChannel;
21  import java.nio.channels.GatheringByteChannel;
22  import java.nio.channels.SelectableChannel;
23  import java.nio.channels.SocketChannel;
24  
25  import org.eclipse.jetty.io.Buffer;
26  import org.eclipse.jetty.io.EndPoint;
27  import org.eclipse.jetty.util.StringUtil;
28  import org.eclipse.jetty.util.log.Log;
29  
30  /**
31   * Channel End Point.
32   * <p>Holds the channel and socket for an NIO endpoint.
33   *
34   */
35  public class ChannelEndPoint implements EndPoint
36  {
37      protected final ByteChannel _channel;
38      protected final ByteBuffer[] _gather2=new ByteBuffer[2];
39      protected final Socket _socket;
40      protected final InetSocketAddress _local;
41      protected final InetSocketAddress _remote;
42      protected int _maxIdleTime;
43  
44      public ChannelEndPoint(ByteChannel channel) throws IOException
45      {
46          super();
47          this._channel = channel;
48          _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
49          if (_socket!=null)
50          {
51              _local=(InetSocketAddress)_socket.getLocalSocketAddress();
52              _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
53              _maxIdleTime=_socket.getSoTimeout();
54          }
55          else
56          {
57              _local=_remote=null;
58          }
59      }
60  
61      protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException
62      {
63          this._channel = channel;
64          _maxIdleTime=maxIdleTime;
65          _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
66          if (_socket!=null)
67          {
68              _local=(InetSocketAddress)_socket.getLocalSocketAddress();
69              _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
70              _socket.setSoTimeout(_maxIdleTime);
71          }
72          else
73          {
74              _local=_remote=null;
75          }
76      }
77  
78      public boolean isBlocking()
79      {
80          return  !(_channel instanceof SelectableChannel) || ((SelectableChannel)_channel).isBlocking();
81      }
82  
83      public boolean blockReadable(long millisecs) throws IOException
84      {
85          return true;
86      }
87  
88      public boolean blockWritable(long millisecs) throws IOException
89      {
90          return true;
91      }
92  
93      /*
94       * @see org.eclipse.io.EndPoint#isOpen()
95       */
96      public boolean isOpen()
97      {
98          return _channel.isOpen();
99      }
100 
101     /* (non-Javadoc)
102      * @see org.eclipse.io.EndPoint#close()
103      */
104     public void shutdownInput() throws IOException
105     {
106         if (_channel.isOpen() && _channel instanceof SocketChannel)
107         {
108             Socket socket= ((SocketChannel)_channel).socket();
109             if (!socket.isClosed()&&!socket.isInputShutdown())
110                 socket.shutdownInput();
111         }
112     }
113 
114     /* (non-Javadoc)
115      * @see org.eclipse.io.EndPoint#close()
116      */
117     public void shutdownOutput() throws IOException
118     {
119         if (_channel.isOpen() && _channel instanceof SocketChannel)
120         {
121             Socket socket= ((SocketChannel)_channel).socket();
122             if (!socket.isClosed()&&!socket.isOutputShutdown())
123                 socket.shutdownOutput();
124         }
125     }
126 
127     public boolean isOutputShutdown()
128     {
129         return _channel.isOpen() && _socket!=null && _socket.isOutputShutdown();
130     }
131 
132     public boolean isInputShutdown()
133     {
134         return _channel.isOpen() && _socket!=null && _socket.isInputShutdown();
135     }
136 
137     /* (non-Javadoc)
138      * @see org.eclipse.io.EndPoint#close()
139      */
140     public void close() throws IOException
141     {
142         if (_socket!=null && !_socket.isOutputShutdown())
143         {
144             try
145             {
146                 _socket.shutdownOutput();
147             }
148             catch (IOException x)
149             {
150                 Log.ignore(x);
151             }
152         }
153         _channel.close();
154     }
155 
156     /* (non-Javadoc)
157      * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
158      */
159     public int fill(Buffer buffer) throws IOException
160     {
161         Buffer buf = buffer.buffer();
162         int len=0;
163         if (buf instanceof NIOBuffer)
164         {
165             final NIOBuffer nbuf = (NIOBuffer)buf;
166             final ByteBuffer bbuf=nbuf.getByteBuffer();
167             //noinspection SynchronizationOnLocalVariableOrMethodParameter
168             synchronized(bbuf)
169             {
170                 try
171                 {
172                     bbuf.position(buffer.putIndex());
173                     len=_channel.read(bbuf);
174                     if (len<0 && isOpen() && !isInputShutdown())
175                     {
176                         try
177                         {
178                             shutdownInput();
179                         }
180                         catch(IOException x)
181                         {
182                             Log.ignore(x);
183                             try
184                             {
185                                 close();
186                             }
187                             catch (IOException xx)
188                             {
189                                 Log.ignore(xx);
190                             }
191                         }
192                     }
193                 }
194                 catch (IOException x)
195                 {
196                     try
197                     {
198                         close();
199                     }
200                     catch (IOException xx)
201                     {
202                         Log.ignore(xx);
203                     }
204                     throw x;
205                 }
206                 finally
207                 {
208                     buffer.setPutIndex(bbuf.position());
209                     bbuf.position(0);
210                 }
211             }
212         }
213         else
214         {
215             throw new IOException("Not Implemented");
216         }
217 
218         return len;
219     }
220 
221     /* (non-Javadoc)
222      * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer)
223      */
224     public int flush(Buffer buffer) throws IOException
225     {
226         Buffer buf = buffer.buffer();
227         int len=0;
228         if (buf instanceof NIOBuffer)
229         {
230             final NIOBuffer nbuf = (NIOBuffer)buf;
231             final ByteBuffer bbuf=nbuf.getByteBuffer();
232 
233             //noinspection SynchronizationOnLocalVariableOrMethodParameter
234             synchronized(bbuf)
235             {
236                 try
237                 {
238                     bbuf.position(buffer.getIndex());
239                     bbuf.limit(buffer.putIndex());
240                     len=_channel.write(bbuf);
241                 }
242                 finally
243                 {
244                     if (len>0)
245                         buffer.skip(len);
246                     bbuf.position(0);
247                     bbuf.limit(bbuf.capacity());
248                 }
249             }
250         }
251         else if (buf instanceof RandomAccessFileBuffer)
252         {
253             len = ((RandomAccessFileBuffer)buf).writeTo(_channel,buffer.getIndex(),buffer.length());
254             if (len>0)
255                 buffer.skip(len);
256         }
257         else if (buffer.array()!=null)
258         {
259             ByteBuffer b = ByteBuffer.wrap(buffer.array(), buffer.getIndex(), buffer.length());
260             len=_channel.write(b);
261             if (len>0)
262                 buffer.skip(len);
263         }
264         else
265         {
266             throw new IOException("Not Implemented");
267         }
268         return len;
269     }
270 
271     /* (non-Javadoc)
272      * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
273      */
274     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
275     {
276         int length=0;
277 
278         Buffer buf0 = header==null?null:header.buffer();
279         Buffer buf1 = buffer==null?null:buffer.buffer();
280 
281         if (_channel instanceof GatheringByteChannel &&
282             header!=null && header.length()!=0 && buf0 instanceof NIOBuffer &&
283             buffer!=null && buffer.length()!=0 && buf1 instanceof NIOBuffer)
284         {
285             length = gatheringFlush(header,((NIOBuffer)buf0).getByteBuffer(),buffer,((NIOBuffer)buf1).getByteBuffer());
286         }
287         else
288         {
289             if (header!=null)
290             {
291                 if (buffer!=null && buffer.length()>0 && header.space()>buffer.length())
292                 {
293                     header.put(buffer);
294                     buffer.clear();
295                 }
296                 if (trailer!=null && trailer.length()>0 && header.space()>trailer.length())
297                 {
298                     header.put(trailer);
299                     trailer.clear();
300                 }
301             }
302 
303             // flush header
304             if (header!=null && header.length()>0)
305                 length=flush(header);
306 
307             // flush buffer
308             if ((header==null || header.length()==0) &&
309                  buffer!=null && buffer.length()>0)
310                 length+=flush(buffer);
311 
312             // flush trailer
313             if ((header==null || header.length()==0) &&
314                 (buffer==null || buffer.length()==0) &&
315                  trailer!=null && trailer.length()>0)
316                 length+=flush(trailer);
317         }
318 
319         return length;
320     }
321 
322     protected int gatheringFlush(Buffer header, ByteBuffer bbuf0, Buffer buffer, ByteBuffer bbuf1) throws IOException
323     {
324         int length;
325 
326         synchronized(this)
327         {
328             // We must sync because buffers may be shared (eg nbuf1 is likely to be cached content).
329             //noinspection SynchronizationOnLocalVariableOrMethodParameter
330             synchronized(bbuf0)
331             {
332                 //noinspection SynchronizationOnLocalVariableOrMethodParameter
333                 synchronized(bbuf1)
334                 {
335                     try
336                     {
337                         // Adjust position indexs of buf0 and buf1
338                         bbuf0.position(header.getIndex());
339                         bbuf0.limit(header.putIndex());
340                         bbuf1.position(buffer.getIndex());
341                         bbuf1.limit(buffer.putIndex());
342 
343                         _gather2[0]=bbuf0;
344                         _gather2[1]=bbuf1;
345 
346                         // do the gathering write.
347                         length=(int)((GatheringByteChannel)_channel).write(_gather2);
348 
349                         int hl=header.length();
350                         if (length>hl)
351                         {
352                             header.clear();
353                             buffer.skip(length-hl);
354                         }
355                         else if (length>0)
356                         {
357                             header.skip(length);
358                         }
359                     }
360                     finally
361                     {
362                         bbuf0.position(0);
363                         bbuf1.position(0);
364                         bbuf0.limit(bbuf0.capacity());
365                         bbuf1.limit(bbuf1.capacity());
366                     }
367                 }
368             }
369         }
370         return length;
371     }
372 
373     /* ------------------------------------------------------------ */
374     /**
375      * @return Returns the channel.
376      */
377     public ByteChannel getChannel()
378     {
379         return _channel;
380     }
381 
382 
383     /* ------------------------------------------------------------ */
384     /*
385      * @see org.eclipse.io.EndPoint#getLocalAddr()
386      */
387     public String getLocalAddr()
388     {
389         if (_socket==null)
390             return null;
391        if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
392            return StringUtil.ALL_INTERFACES;
393         return _local.getAddress().getHostAddress();
394     }
395 
396     /* ------------------------------------------------------------ */
397     /*
398      * @see org.eclipse.io.EndPoint#getLocalHost()
399      */
400     public String getLocalHost()
401     {
402         if (_socket==null)
403             return null;
404        if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
405            return StringUtil.ALL_INTERFACES;
406         return _local.getAddress().getCanonicalHostName();
407     }
408 
409     /* ------------------------------------------------------------ */
410     /*
411      * @see org.eclipse.io.EndPoint#getLocalPort()
412      */
413     public int getLocalPort()
414     {
415         if (_socket==null)
416             return 0;
417         if (_local==null)
418             return -1;
419         return _local.getPort();
420     }
421 
422     /* ------------------------------------------------------------ */
423     /*
424      * @see org.eclipse.io.EndPoint#getRemoteAddr()
425      */
426     public String getRemoteAddr()
427     {
428         if (_socket==null)
429             return null;
430         if (_remote==null)
431             return null;
432         return _remote.getAddress().getHostAddress();
433     }
434 
435     /* ------------------------------------------------------------ */
436     /*
437      * @see org.eclipse.io.EndPoint#getRemoteHost()
438      */
439     public String getRemoteHost()
440     {
441         if (_socket==null)
442             return null;
443         if (_remote==null)
444             return null;
445         return _remote.getAddress().getCanonicalHostName();
446     }
447 
448     /* ------------------------------------------------------------ */
449     /*
450      * @see org.eclipse.io.EndPoint#getRemotePort()
451      */
452     public int getRemotePort()
453     {
454         if (_socket==null)
455             return 0;
456         return _remote==null?-1:_remote.getPort();
457     }
458 
459     /* ------------------------------------------------------------ */
460     /*
461      * @see org.eclipse.io.EndPoint#getConnection()
462      */
463     public Object getTransport()
464     {
465         return _channel;
466     }
467 
468     /* ------------------------------------------------------------ */
469     public void flush()
470         throws IOException
471     {
472     }
473 
474     /* ------------------------------------------------------------ */
475     public boolean isBufferingInput()
476     {
477         return false;
478     }
479 
480     /* ------------------------------------------------------------ */
481     public boolean isBufferingOutput()
482     {
483         return false;
484     }
485 
486     /* ------------------------------------------------------------ */
487     public boolean isBufferred()
488     {
489         return false;
490     }
491 
492     /* ------------------------------------------------------------ */
493     public int getMaxIdleTime()
494     {
495         return _maxIdleTime;
496     }
497 
498     /* ------------------------------------------------------------ */
499     /**
500      * @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int)
501      */
502     public void setMaxIdleTime(int timeMs) throws IOException
503     {
504         if (_socket!=null && timeMs!=_maxIdleTime)
505             _socket.setSoTimeout(timeMs>0?timeMs:0);
506         _maxIdleTime=timeMs;
507     }
508 }