View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.File;
17  import java.io.FileInputStream;
18  import java.io.IOException;
19  import java.io.InputStream;
20  import java.io.OutputStream;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.Channels;
23  import java.nio.channels.FileChannel;
24  import java.nio.channels.ReadableByteChannel;
25  import java.nio.channels.WritableByteChannel;
26  
27  import org.eclipse.jetty.io.AbstractBuffer;
28  import org.eclipse.jetty.io.Buffer;
29  
30  /* ------------------------------------------------------------------------------- */
31  /** 
32   * 
33   * 
34   */
35  public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
36  { 	
37      protected final ByteBuffer _buf;
38      private ReadableByteChannel _in;
39      private InputStream _inStream;
40      private WritableByteChannel _out;
41      private OutputStream _outStream;
42  
43      public DirectNIOBuffer(int size)
44      {
45          super(READWRITE,NON_VOLATILE);
46          _buf = ByteBuffer.allocateDirect(size);
47          _buf.position(0);
48          _buf.limit(_buf.capacity());
49      }
50      
51      public DirectNIOBuffer(ByteBuffer buffer,boolean immutable)
52      {
53          super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE);
54          if (!buffer.isDirect())
55              throw new IllegalArgumentException();
56          _buf = buffer;
57          setGetIndex(buffer.position());
58          setPutIndex(buffer.limit());
59      }
60  
61      /**
62       * @param file
63       */
64      public DirectNIOBuffer(File file) throws IOException
65      {
66          super(READONLY,NON_VOLATILE);
67          FileInputStream fis = new FileInputStream(file);
68          FileChannel fc = fis.getChannel();
69          _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
70          setGetIndex(0);
71          setPutIndex((int)file.length());
72          _access=IMMUTABLE;
73      }
74  
75      /* ------------------------------------------------------------ */
76      public boolean isDirect()
77      {
78          return true;
79      }
80  
81      /* ------------------------------------------------------------ */
82      public byte[] array()
83      {
84          return null;
85      }
86  
87      /* ------------------------------------------------------------ */
88      public int capacity()
89      {
90          return _buf.capacity();
91      }
92  
93      /* ------------------------------------------------------------ */
94      public byte peek(int position)
95      {
96          return _buf.get(position);
97      }
98  
99      public int peek(int index, byte[] b, int offset, int length)
100     {
101         int l = length;
102         if (index+l > capacity())
103         {
104             l=capacity()-index;
105             if (l==0)
106                 return -1;
107         }
108         
109         if (l < 0) 
110             return -1;
111         try
112         {
113             _buf.position(index);
114             _buf.get(b,offset,l);
115         }
116         finally
117         {
118             _buf.position(0);
119         }
120         
121         return l;
122     }
123 
124     public void poke(int index, byte b)
125     {
126         if (isReadOnly()) throw new IllegalStateException(__READONLY);
127         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
128         if (index > capacity())
129                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
130         _buf.put(index,b);
131     }
132 
133     @Override
134     public int poke(int index, Buffer src)
135     {
136         if (isReadOnly()) throw new IllegalStateException(__READONLY);
137 
138         byte[] array=src.array();
139         if (array!=null)
140         {
141             return poke(index,array,src.getIndex(),src.length());
142         }
143         else
144         {
145             Buffer src_buf=src.buffer();
146             if (src_buf instanceof DirectNIOBuffer)
147             {
148                 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
149                 if (src_bytebuf==_buf)
150                     src_bytebuf=_buf.duplicate();
151                 try
152                 {   
153                     _buf.position(index);
154                     int space = _buf.remaining();
155                     
156                     int length=src.length();
157                     if (length>space)    
158                         length=space;
159                     
160                     src_bytebuf.position(src.getIndex());
161                     src_bytebuf.limit(src.getIndex()+length);
162                     
163                     _buf.put(src_bytebuf);
164                     return length;
165                 }
166                 finally
167                 {
168                     _buf.position(0);
169                     src_bytebuf.limit(src_bytebuf.capacity());
170                     src_bytebuf.position(0);
171                 }
172             }
173             else
174                 return super.poke(index,src);
175         }
176     }
177     
178     @Override
179     public int poke(int index, byte[] b, int offset, int length)
180     {
181         if (isReadOnly()) throw new IllegalStateException(__READONLY);
182 
183         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
184 
185         if (index + length > capacity())
186         {
187             length=capacity()-index;
188             if (length<0)
189                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
190         }
191 
192         try
193         {
194             _buf.position(index);
195             
196             int space=_buf.remaining();
197             
198             if (length>space)
199                 length=space;
200             if (length>0)
201                 _buf.put(b,offset,length);
202             return length;
203         }
204         finally
205         {
206             _buf.position(0);
207         }
208     }
209     
210     /* ------------------------------------------------------------ */
211     public ByteBuffer getByteBuffer()
212     {
213         return _buf;
214     }
215 
216     /* ------------------------------------------------------------ */
217     @Override
218     public int readFrom(InputStream in, int max) throws IOException
219     {
220         if (_in==null || !_in.isOpen() || in!=_inStream)
221         {
222             _in=Channels.newChannel(in);
223             _inStream=in;
224         }
225 
226         if (max<0 || max>space())
227             max=space();
228         int p = putIndex();
229         
230         try
231         {
232             int len=0, total=0, available=max;
233             int loop=0;
234             while (total<max) 
235             {
236                 _buf.position(p);
237                 _buf.limit(p+available);
238                 len=_in.read(_buf);
239                 if (len<0)
240                 {
241                     _in=null;
242                     _inStream=in;
243                     break;
244                 }
245                 else if (len>0)
246                 {
247                     p += len;
248                     total += len;
249                     available -= len;
250                     setPutIndex(p);
251                     loop=0;
252                 }
253                 else if (loop++>1)
254                     break;
255                 if (in.available()<=0)
256                     break;
257             }
258             if (len<0 && total==0)
259                 return -1;
260             return total;
261             
262         }
263         catch(IOException e)
264         {
265             _in=null;
266             _inStream=in;
267             throw e;
268         }
269         finally
270         {
271             if (_in!=null && !_in.isOpen())
272             {
273                 _in=null;
274                 _inStream=in;
275             }
276             _buf.position(0);
277             _buf.limit(_buf.capacity());
278         }
279     }
280 
281     /* ------------------------------------------------------------ */
282     @Override
283     public void writeTo(OutputStream out) throws IOException
284     {
285         if (_out==null || !_out.isOpen() || out!=_outStream)
286         {
287             _out=Channels.newChannel(out);
288             _outStream=out;
289         }
290 
291         synchronized (_buf)
292         {
293             try
294             {
295                 int loop=0;
296                 while(hasContent() && _out.isOpen())
297                 {
298                     _buf.position(getIndex());
299                     _buf.limit(putIndex());
300                     int len=_out.write(_buf);
301                     if (len<0)
302                         break;
303                     else if (len>0)
304                     {
305                         skip(len);
306                         loop=0;
307                     }
308                     else if (loop++>1)
309                         break;
310                 }
311 
312             }
313             catch(IOException e)
314             {
315                 _out=null;
316                 _outStream=null;
317                 throw e;
318             }
319             finally
320             {
321                 if (_out!=null && !_out.isOpen())
322                 {
323                     _out=null;
324                     _outStream=null;
325                 }
326                 _buf.position(0);
327                 _buf.limit(_buf.capacity());
328             }
329         }
330     }
331 
332     
333     
334 }