View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io.nio;
20  
21  import java.io.File;
22  import java.io.FileInputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.nio.ByteBuffer;
27  import java.nio.channels.Channels;
28  import java.nio.channels.FileChannel;
29  import java.nio.channels.ReadableByteChannel;
30  import java.nio.channels.WritableByteChannel;
31  
32  import org.eclipse.jetty.io.AbstractBuffer;
33  import org.eclipse.jetty.io.Buffer;
34  import org.eclipse.jetty.util.IO;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  
38  /* ------------------------------------------------------------------------------- */
39  /** 
40   * 
41   * 
42   */
43  public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
44  { 	
45      private static final Logger LOG = Log.getLogger(DirectNIOBuffer.class);
46  
47      protected final ByteBuffer _buf;
48      private ReadableByteChannel _in;
49      private InputStream _inStream;
50      private WritableByteChannel _out;
51      private OutputStream _outStream;
52  
53      public DirectNIOBuffer(int size)
54      {
55          super(READWRITE,NON_VOLATILE);
56          _buf = ByteBuffer.allocateDirect(size);
57          _buf.position(0);
58          _buf.limit(_buf.capacity());
59      }
60      
61      public DirectNIOBuffer(ByteBuffer buffer,boolean immutable)
62      {
63          super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE);
64          if (!buffer.isDirect())
65              throw new IllegalArgumentException();
66          _buf = buffer;
67          setGetIndex(buffer.position());
68          setPutIndex(buffer.limit());
69      }
70  
71      /**
72       * @param file
73       */
74      public DirectNIOBuffer(File file) throws IOException
75      {
76          super(READONLY,NON_VOLATILE);
77          FileInputStream fis = null;
78          FileChannel fc = null;
79          try
80          {
81              fis = new FileInputStream(file);
82              fc = fis.getChannel();
83              _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
84              setGetIndex(0);
85              setPutIndex((int)file.length());
86              _access=IMMUTABLE;
87          }
88          finally
89          {
90              if (fc != null) try {fc.close();} catch (IOException e){LOG.ignore(e);}
91              IO.close(fis);
92          }
93      }
94  
95      /* ------------------------------------------------------------ */
96      public boolean isDirect()
97      {
98          return true;
99      }
100 
101     /* ------------------------------------------------------------ */
102     public byte[] array()
103     {
104         return null;
105     }
106 
107     /* ------------------------------------------------------------ */
108     public int capacity()
109     {
110         return _buf.capacity();
111     }
112 
113     /* ------------------------------------------------------------ */
114     public byte peek(int position)
115     {
116         return _buf.get(position);
117     }
118 
119     public int peek(int index, byte[] b, int offset, int length)
120     {
121         int l = length;
122         if (index+l > capacity())
123         {
124             l=capacity()-index;
125             if (l==0)
126                 return -1;
127         }
128         
129         if (l < 0) 
130             return -1;
131         try
132         {
133             _buf.position(index);
134             _buf.get(b,offset,l);
135         }
136         finally
137         {
138             _buf.position(0);
139         }
140         
141         return l;
142     }
143 
144     public void poke(int index, byte b)
145     {
146         if (isReadOnly()) throw new IllegalStateException(__READONLY);
147         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
148         if (index > capacity())
149                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
150         _buf.put(index,b);
151     }
152 
153     @Override
154     public int poke(int index, Buffer src)
155     {
156         if (isReadOnly()) throw new IllegalStateException(__READONLY);
157 
158         byte[] array=src.array();
159         if (array!=null)
160         {
161             return poke(index,array,src.getIndex(),src.length());
162         }
163         else
164         {
165             Buffer src_buf=src.buffer();
166             if (src_buf instanceof DirectNIOBuffer)
167             {
168                 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
169                 if (src_bytebuf==_buf)
170                     src_bytebuf=_buf.duplicate();
171                 try
172                 {   
173                     _buf.position(index);
174                     int space = _buf.remaining();
175                     
176                     int length=src.length();
177                     if (length>space)    
178                         length=space;
179                     
180                     src_bytebuf.position(src.getIndex());
181                     src_bytebuf.limit(src.getIndex()+length);
182                     
183                     _buf.put(src_bytebuf);
184                     return length;
185                 }
186                 finally
187                 {
188                     _buf.position(0);
189                     src_bytebuf.limit(src_bytebuf.capacity());
190                     src_bytebuf.position(0);
191                 }
192             }
193             else
194                 return super.poke(index,src);
195         }
196     }
197     
198     @Override
199     public int poke(int index, byte[] b, int offset, int length)
200     {
201         if (isReadOnly()) throw new IllegalStateException(__READONLY);
202 
203         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
204 
205         if (index + length > capacity())
206         {
207             length=capacity()-index;
208             if (length<0)
209                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
210         }
211 
212         try
213         {
214             _buf.position(index);
215             
216             int space=_buf.remaining();
217             
218             if (length>space)
219                 length=space;
220             if (length>0)
221                 _buf.put(b,offset,length);
222             return length;
223         }
224         finally
225         {
226             _buf.position(0);
227         }
228     }
229     
230     /* ------------------------------------------------------------ */
231     public ByteBuffer getByteBuffer()
232     {
233         return _buf;
234     }
235 
236     /* ------------------------------------------------------------ */
237     @Override
238     public int readFrom(InputStream in, int max) throws IOException
239     {
240         if (_in==null || !_in.isOpen() || in!=_inStream)
241         {
242             _in=Channels.newChannel(in);
243             _inStream=in;
244         }
245 
246         if (max<0 || max>space())
247             max=space();
248         int p = putIndex();
249         
250         try
251         {
252             int len=0, total=0, available=max;
253             int loop=0;
254             while (total<max) 
255             {
256                 _buf.position(p);
257                 _buf.limit(p+available);
258                 len=_in.read(_buf);
259                 if (len<0)
260                 {
261                     _in=null;
262                     _inStream=in;
263                     break;
264                 }
265                 else if (len>0)
266                 {
267                     p += len;
268                     total += len;
269                     available -= len;
270                     setPutIndex(p);
271                     loop=0;
272                 }
273                 else if (loop++>1)
274                     break;
275                 if (in.available()<=0)
276                     break;
277             }
278             if (len<0 && total==0)
279                 return -1;
280             return total;
281             
282         }
283         catch(IOException e)
284         {
285             _in=null;
286             _inStream=in;
287             throw e;
288         }
289         finally
290         {
291             if (_in!=null && !_in.isOpen())
292             {
293                 _in=null;
294                 _inStream=in;
295             }
296             _buf.position(0);
297             _buf.limit(_buf.capacity());
298         }
299     }
300 
301     /* ------------------------------------------------------------ */
302     @Override
303     public void writeTo(OutputStream out) throws IOException
304     {
305         if (_out==null || !_out.isOpen() || out!=_outStream)
306         {
307             _out=Channels.newChannel(out);
308             _outStream=out;
309         }
310 
311         synchronized (_buf)
312         {
313             try
314             {
315                 int loop=0;
316                 while(hasContent() && _out.isOpen())
317                 {
318                     _buf.position(getIndex());
319                     _buf.limit(putIndex());
320                     int len=_out.write(_buf);
321                     if (len<0)
322                         break;
323                     else if (len>0)
324                     {
325                         skip(len);
326                         loop=0;
327                     }
328                     else if (loop++>1)
329                         break;
330                 }
331 
332             }
333             catch(IOException e)
334             {
335                 _out=null;
336                 _outStream=null;
337                 throw e;
338             }
339             finally
340             {
341                 if (_out!=null && !_out.isOpen())
342                 {
343                     _out=null;
344                     _outStream=null;
345                 }
346                 _buf.position(0);
347                 _buf.limit(_buf.capacity());
348             }
349         }
350     }
351 
352     
353     
354 }