View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.File;
17  import java.io.FileInputStream;
18  import java.io.IOException;
19  import java.io.InputStream;
20  import java.io.OutputStream;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.Channels;
23  import java.nio.channels.FileChannel;
24  import java.nio.channels.ReadableByteChannel;
25  import java.nio.channels.WritableByteChannel;
26  
27  import org.eclipse.jetty.io.AbstractBuffer;
28  import org.eclipse.jetty.io.Buffer;
29  
30  /* ------------------------------------------------------------------------------- */
31  /** 
32   * 
33   * 
34   */
35  public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
36  { 	
37      protected final ByteBuffer _buf;
38      private ReadableByteChannel _in;
39      private InputStream _inStream;
40      private WritableByteChannel _out;
41      private OutputStream _outStream;
42  
43      public DirectNIOBuffer(int size)
44      {
45          super(READWRITE,NON_VOLATILE);
46          _buf = ByteBuffer.allocateDirect(size);
47          _buf.position(0);
48          _buf.limit(_buf.capacity());
49      }
50      
51      public DirectNIOBuffer(ByteBuffer buffer,boolean immutable)
52      {
53          super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE);
54          if (!buffer.isDirect())
55              throw new IllegalArgumentException();
56          _buf = buffer;
57          setGetIndex(buffer.position());
58          setPutIndex(buffer.limit());
59      }
60  
61      /**
62       * @param file
63       */
64      public DirectNIOBuffer(File file) throws IOException
65      {
66          super(READONLY,NON_VOLATILE);
67          FileInputStream fis = new FileInputStream(file);
68          FileChannel fc = fis.getChannel();
69          _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
70          setGetIndex(0);
71          setPutIndex((int)file.length());
72          _access=IMMUTABLE;
73      }
74  
75      /* ------------------------------------------------------------ */
76      public boolean isDirect()
77      {
78          return true;
79      }
80  
81      /* ------------------------------------------------------------ */
82      public byte[] array()
83      {
84          return null;
85      }
86  
87      /* ------------------------------------------------------------ */
88      public int capacity()
89      {
90          return _buf.capacity();
91      }
92  
93      /* ------------------------------------------------------------ */
94      public byte peek(int position)
95      {
96          return _buf.get(position);
97      }
98  
99      public int peek(int index, byte[] b, int offset, int length)
100     {
101         int l = length;
102         if (index+l > capacity())
103         {
104             l=capacity()-index;
105             if (l==0)
106                 return -1;
107         }
108         
109         if (l < 0) 
110             return -1;
111         try
112         {
113             _buf.position(index);
114             _buf.get(b,offset,l);
115         }
116         finally
117         {
118             _buf.position(0);
119         }
120         
121         return l;
122     }
123 
124     public void poke(int index, byte b)
125     {
126         if (isReadOnly()) throw new IllegalStateException(__READONLY);
127         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
128         if (index > capacity())
129                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
130         _buf.put(index,b);
131     }
132 
133     public int poke(int index, Buffer src)
134     {
135         if (isReadOnly()) throw new IllegalStateException(__READONLY);
136 
137         byte[] array=src.array();
138         if (array!=null)
139         {
140             return poke(index,array,src.getIndex(),src.length());
141         }
142         else
143         {
144             Buffer src_buf=src.buffer();
145             if (src_buf instanceof DirectNIOBuffer)
146             {
147                 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
148                 if (src_bytebuf==_buf)
149                     src_bytebuf=_buf.duplicate();
150                 try
151                 {   
152                     _buf.position(index);
153                     int space = _buf.remaining();
154                     
155                     int length=src.length();
156                     if (length>space)    
157                         length=space;
158                     
159                     src_bytebuf.position(src.getIndex());
160                     src_bytebuf.limit(src.getIndex()+length);
161                     
162                     _buf.put(src_bytebuf);
163                     return length;
164                 }
165                 finally
166                 {
167                     _buf.position(0);
168                     src_bytebuf.limit(src_bytebuf.capacity());
169                     src_bytebuf.position(0);
170                 }
171             }
172             else
173                 return super.poke(index,src);
174         }
175     }
176     
177     public int poke(int index, byte[] b, int offset, int length)
178     {
179         if (isReadOnly()) throw new IllegalStateException(__READONLY);
180 
181         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
182 
183         if (index + length > capacity())
184         {
185             length=capacity()-index;
186             if (length<0)
187                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
188         }
189 
190         try
191         {
192             _buf.position(index);
193             
194             int space=_buf.remaining();
195             
196             if (length>space)
197                 length=space;
198             if (length>0)
199                 _buf.put(b,offset,length);
200             return length;
201         }
202         finally
203         {
204             _buf.position(0);
205         }
206     }
207     
208     /* ------------------------------------------------------------ */
209     public ByteBuffer getByteBuffer()
210     {
211         return _buf;
212     }
213 
214     /* ------------------------------------------------------------ */
215     public int readFrom(InputStream in, int max) throws IOException
216     {
217         if (_in==null || !_in.isOpen() || in!=_inStream)
218         {
219             _in=Channels.newChannel(in);
220             _inStream=in;
221         }
222 
223         if (max<0 || max>space())
224             max=space();
225         int p = putIndex();
226         
227         try
228         {
229             int len=0, total=0, available=max;
230             int loop=0;
231             while (total<max) 
232             {
233                 _buf.position(p);
234                 _buf.limit(p+available);
235                 len=_in.read(_buf);
236                 if (len<0)
237                 {
238                     _in=null;
239                     _inStream=in;
240                     break;
241                 }
242                 else if (len>0)
243                 {
244                     p += len;
245                     total += len;
246                     available -= len;
247                     setPutIndex(p);
248                     loop=0;
249                 }
250                 else if (loop++>1)
251                     break;
252                 if (in.available()<=0)
253                     break;
254             }
255             if (len<0 && total==0)
256                 return -1;
257             return total;
258             
259         }
260         catch(IOException e)
261         {
262             _in=null;
263             _inStream=in;
264             throw e;
265         }
266         finally
267         {
268             if (_in!=null && !_in.isOpen())
269             {
270                 _in=null;
271                 _inStream=in;
272             }
273             _buf.position(0);
274             _buf.limit(_buf.capacity());
275         }
276     }
277 
278     /* ------------------------------------------------------------ */
279     public void writeTo(OutputStream out) throws IOException
280     {
281         if (_out==null || !_out.isOpen() || out!=_outStream)
282         {
283             _out=Channels.newChannel(out);
284             _outStream=out;
285         }
286 
287         synchronized (_buf)
288         {
289             try
290             {
291                 int loop=0;
292                 while(hasContent() && _out.isOpen())
293                 {
294                     _buf.position(getIndex());
295                     _buf.limit(putIndex());
296                     int len=_out.write(_buf);
297                     if (len<0)
298                         break;
299                     else if (len>0)
300                     {
301                         skip(len);
302                         loop=0;
303                     }
304                     else if (loop++>1)
305                         break;
306                 }
307 
308             }
309             catch(IOException e)
310             {
311                 _out=null;
312                 _outStream=null;
313                 throw e;
314             }
315             finally
316             {
317                 if (_out!=null && !_out.isOpen())
318                 {
319                     _out=null;
320                     _outStream=null;
321                 }
322                 _buf.position(0);
323                 _buf.limit(_buf.capacity());
324             }
325         }
326     }
327 
328     
329     
330 }