View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io.nio;
20  
21  import java.io.File;
22  import java.io.FileInputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.nio.ByteBuffer;
27  import java.nio.channels.Channels;
28  import java.nio.channels.FileChannel;
29  import java.nio.channels.ReadableByteChannel;
30  import java.nio.channels.WritableByteChannel;
31  
32  import org.eclipse.jetty.io.AbstractBuffer;
33  import org.eclipse.jetty.io.Buffer;
34  
35  /* ------------------------------------------------------------------------------- */
36  /** 
37   * 
38   * 
39   */
40  public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
41  { 	
42      protected final ByteBuffer _buf;
43      private ReadableByteChannel _in;
44      private InputStream _inStream;
45      private WritableByteChannel _out;
46      private OutputStream _outStream;
47  
48      public DirectNIOBuffer(int size)
49      {
50          super(READWRITE,NON_VOLATILE);
51          _buf = ByteBuffer.allocateDirect(size);
52          _buf.position(0);
53          _buf.limit(_buf.capacity());
54      }
55      
56      public DirectNIOBuffer(ByteBuffer buffer,boolean immutable)
57      {
58          super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE);
59          if (!buffer.isDirect())
60              throw new IllegalArgumentException();
61          _buf = buffer;
62          setGetIndex(buffer.position());
63          setPutIndex(buffer.limit());
64      }
65  
66      /**
67       * @param file
68       */
69      public DirectNIOBuffer(File file) throws IOException
70      {
71          super(READONLY,NON_VOLATILE);
72          FileInputStream fis = new FileInputStream(file);
73          FileChannel fc = fis.getChannel();
74          _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
75          setGetIndex(0);
76          setPutIndex((int)file.length());
77          _access=IMMUTABLE;
78      }
79  
80      /* ------------------------------------------------------------ */
81      public boolean isDirect()
82      {
83          return true;
84      }
85  
86      /* ------------------------------------------------------------ */
87      public byte[] array()
88      {
89          return null;
90      }
91  
92      /* ------------------------------------------------------------ */
93      public int capacity()
94      {
95          return _buf.capacity();
96      }
97  
98      /* ------------------------------------------------------------ */
99      public byte peek(int position)
100     {
101         return _buf.get(position);
102     }
103 
104     public int peek(int index, byte[] b, int offset, int length)
105     {
106         int l = length;
107         if (index+l > capacity())
108         {
109             l=capacity()-index;
110             if (l==0)
111                 return -1;
112         }
113         
114         if (l < 0) 
115             return -1;
116         try
117         {
118             _buf.position(index);
119             _buf.get(b,offset,l);
120         }
121         finally
122         {
123             _buf.position(0);
124         }
125         
126         return l;
127     }
128 
129     public void poke(int index, byte b)
130     {
131         if (isReadOnly()) throw new IllegalStateException(__READONLY);
132         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
133         if (index > capacity())
134                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
135         _buf.put(index,b);
136     }
137 
138     @Override
139     public int poke(int index, Buffer src)
140     {
141         if (isReadOnly()) throw new IllegalStateException(__READONLY);
142 
143         byte[] array=src.array();
144         if (array!=null)
145         {
146             return poke(index,array,src.getIndex(),src.length());
147         }
148         else
149         {
150             Buffer src_buf=src.buffer();
151             if (src_buf instanceof DirectNIOBuffer)
152             {
153                 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
154                 if (src_bytebuf==_buf)
155                     src_bytebuf=_buf.duplicate();
156                 try
157                 {   
158                     _buf.position(index);
159                     int space = _buf.remaining();
160                     
161                     int length=src.length();
162                     if (length>space)    
163                         length=space;
164                     
165                     src_bytebuf.position(src.getIndex());
166                     src_bytebuf.limit(src.getIndex()+length);
167                     
168                     _buf.put(src_bytebuf);
169                     return length;
170                 }
171                 finally
172                 {
173                     _buf.position(0);
174                     src_bytebuf.limit(src_bytebuf.capacity());
175                     src_bytebuf.position(0);
176                 }
177             }
178             else
179                 return super.poke(index,src);
180         }
181     }
182     
183     @Override
184     public int poke(int index, byte[] b, int offset, int length)
185     {
186         if (isReadOnly()) throw new IllegalStateException(__READONLY);
187 
188         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
189 
190         if (index + length > capacity())
191         {
192             length=capacity()-index;
193             if (length<0)
194                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
195         }
196 
197         try
198         {
199             _buf.position(index);
200             
201             int space=_buf.remaining();
202             
203             if (length>space)
204                 length=space;
205             if (length>0)
206                 _buf.put(b,offset,length);
207             return length;
208         }
209         finally
210         {
211             _buf.position(0);
212         }
213     }
214     
215     /* ------------------------------------------------------------ */
216     public ByteBuffer getByteBuffer()
217     {
218         return _buf;
219     }
220 
221     /* ------------------------------------------------------------ */
222     @Override
223     public int readFrom(InputStream in, int max) throws IOException
224     {
225         if (_in==null || !_in.isOpen() || in!=_inStream)
226         {
227             _in=Channels.newChannel(in);
228             _inStream=in;
229         }
230 
231         if (max<0 || max>space())
232             max=space();
233         int p = putIndex();
234         
235         try
236         {
237             int len=0, total=0, available=max;
238             int loop=0;
239             while (total<max) 
240             {
241                 _buf.position(p);
242                 _buf.limit(p+available);
243                 len=_in.read(_buf);
244                 if (len<0)
245                 {
246                     _in=null;
247                     _inStream=in;
248                     break;
249                 }
250                 else if (len>0)
251                 {
252                     p += len;
253                     total += len;
254                     available -= len;
255                     setPutIndex(p);
256                     loop=0;
257                 }
258                 else if (loop++>1)
259                     break;
260                 if (in.available()<=0)
261                     break;
262             }
263             if (len<0 && total==0)
264                 return -1;
265             return total;
266             
267         }
268         catch(IOException e)
269         {
270             _in=null;
271             _inStream=in;
272             throw e;
273         }
274         finally
275         {
276             if (_in!=null && !_in.isOpen())
277             {
278                 _in=null;
279                 _inStream=in;
280             }
281             _buf.position(0);
282             _buf.limit(_buf.capacity());
283         }
284     }
285 
286     /* ------------------------------------------------------------ */
287     @Override
288     public void writeTo(OutputStream out) throws IOException
289     {
290         if (_out==null || !_out.isOpen() || out!=_outStream)
291         {
292             _out=Channels.newChannel(out);
293             _outStream=out;
294         }
295 
296         synchronized (_buf)
297         {
298             try
299             {
300                 int loop=0;
301                 while(hasContent() && _out.isOpen())
302                 {
303                     _buf.position(getIndex());
304                     _buf.limit(putIndex());
305                     int len=_out.write(_buf);
306                     if (len<0)
307                         break;
308                     else if (len>0)
309                     {
310                         skip(len);
311                         loop=0;
312                     }
313                     else if (loop++>1)
314                         break;
315                 }
316 
317             }
318             catch(IOException e)
319             {
320                 _out=null;
321                 _outStream=null;
322                 throw e;
323             }
324             finally
325             {
326                 if (_out!=null && !_out.isOpen())
327                 {
328                     _out=null;
329                     _outStream=null;
330                 }
331                 _buf.position(0);
332                 _buf.limit(_buf.capacity());
333             }
334         }
335     }
336 
337     
338     
339 }