1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.File;
17 import java.io.FileInputStream;
18 import java.io.IOException;
19 import java.io.InputStream;
20 import java.io.OutputStream;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.Channels;
23 import java.nio.channels.FileChannel;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritableByteChannel;
26
27 import org.eclipse.jetty.io.AbstractBuffer;
28 import org.eclipse.jetty.io.Buffer;
29
30
31
32
33
34
35 public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
36 {
37 protected final ByteBuffer _buf;
38 private ReadableByteChannel _in;
39 private InputStream _inStream;
40 private WritableByteChannel _out;
41 private OutputStream _outStream;
42
43 public DirectNIOBuffer(int size)
44 {
45 super(READWRITE,NON_VOLATILE);
46 _buf = ByteBuffer.allocateDirect(size);
47 _buf.position(0);
48 _buf.limit(_buf.capacity());
49 }
50
51 public DirectNIOBuffer(ByteBuffer buffer,boolean immutable)
52 {
53 super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE);
54 if (!buffer.isDirect())
55 throw new IllegalArgumentException();
56 _buf = buffer;
57 setGetIndex(buffer.position());
58 setPutIndex(buffer.limit());
59 }
60
61
62
63
64 public DirectNIOBuffer(File file) throws IOException
65 {
66 super(READONLY,NON_VOLATILE);
67 FileInputStream fis = new FileInputStream(file);
68 FileChannel fc = fis.getChannel();
69 _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
70 setGetIndex(0);
71 setPutIndex((int)file.length());
72 _access=IMMUTABLE;
73 }
74
75
76 public boolean isDirect()
77 {
78 return true;
79 }
80
81
82 public byte[] array()
83 {
84 return null;
85 }
86
87
88 public int capacity()
89 {
90 return _buf.capacity();
91 }
92
93
94 public byte peek(int position)
95 {
96 return _buf.get(position);
97 }
98
99 public int peek(int index, byte[] b, int offset, int length)
100 {
101 int l = length;
102 if (index+l > capacity())
103 {
104 l=capacity()-index;
105 if (l==0)
106 return -1;
107 }
108
109 if (l < 0)
110 return -1;
111 try
112 {
113 _buf.position(index);
114 _buf.get(b,offset,l);
115 }
116 finally
117 {
118 _buf.position(0);
119 }
120
121 return l;
122 }
123
124 public void poke(int index, byte b)
125 {
126 if (isReadOnly()) throw new IllegalStateException(__READONLY);
127 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
128 if (index > capacity())
129 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
130 _buf.put(index,b);
131 }
132
133 public int poke(int index, Buffer src)
134 {
135 if (isReadOnly()) throw new IllegalStateException(__READONLY);
136
137 byte[] array=src.array();
138 if (array!=null)
139 {
140 return poke(index,array,src.getIndex(),src.length());
141 }
142 else
143 {
144 Buffer src_buf=src.buffer();
145 if (src_buf instanceof DirectNIOBuffer)
146 {
147 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
148 if (src_bytebuf==_buf)
149 src_bytebuf=_buf.duplicate();
150 try
151 {
152 _buf.position(index);
153 int space = _buf.remaining();
154
155 int length=src.length();
156 if (length>space)
157 length=space;
158
159 src_bytebuf.position(src.getIndex());
160 src_bytebuf.limit(src.getIndex()+length);
161
162 _buf.put(src_bytebuf);
163 return length;
164 }
165 finally
166 {
167 _buf.position(0);
168 src_bytebuf.limit(src_bytebuf.capacity());
169 src_bytebuf.position(0);
170 }
171 }
172 else
173 return super.poke(index,src);
174 }
175 }
176
177 public int poke(int index, byte[] b, int offset, int length)
178 {
179 if (isReadOnly()) throw new IllegalStateException(__READONLY);
180
181 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
182
183 if (index + length > capacity())
184 {
185 length=capacity()-index;
186 if (length<0)
187 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
188 }
189
190 try
191 {
192 _buf.position(index);
193
194 int space=_buf.remaining();
195
196 if (length>space)
197 length=space;
198 if (length>0)
199 _buf.put(b,offset,length);
200 return length;
201 }
202 finally
203 {
204 _buf.position(0);
205 }
206 }
207
208
209 public ByteBuffer getByteBuffer()
210 {
211 return _buf;
212 }
213
214
215 public int readFrom(InputStream in, int max) throws IOException
216 {
217 if (_in==null || !_in.isOpen() || in!=_inStream)
218 {
219 _in=Channels.newChannel(in);
220 _inStream=in;
221 }
222
223 if (max<0 || max>space())
224 max=space();
225 int p = putIndex();
226
227 try
228 {
229 int len=0, total=0, available=max;
230 int loop=0;
231 while (total<max)
232 {
233 _buf.position(p);
234 _buf.limit(p+available);
235 len=_in.read(_buf);
236 if (len<0)
237 {
238 _in=null;
239 _inStream=in;
240 break;
241 }
242 else if (len>0)
243 {
244 p += len;
245 total += len;
246 available -= len;
247 setPutIndex(p);
248 loop=0;
249 }
250 else if (loop++>1)
251 break;
252 if (in.available()<=0)
253 break;
254 }
255 if (len<0 && total==0)
256 return -1;
257 return total;
258
259 }
260 catch(IOException e)
261 {
262 _in=null;
263 _inStream=in;
264 throw e;
265 }
266 finally
267 {
268 if (_in!=null && !_in.isOpen())
269 {
270 _in=null;
271 _inStream=in;
272 }
273 _buf.position(0);
274 _buf.limit(_buf.capacity());
275 }
276 }
277
278
279 public void writeTo(OutputStream out) throws IOException
280 {
281 if (_out==null || !_out.isOpen() || out!=_outStream)
282 {
283 _out=Channels.newChannel(out);
284 _outStream=out;
285 }
286
287 synchronized (_buf)
288 {
289 try
290 {
291 int loop=0;
292 while(hasContent() && _out.isOpen())
293 {
294 _buf.position(getIndex());
295 _buf.limit(putIndex());
296 int len=_out.write(_buf);
297 if (len<0)
298 break;
299 else if (len>0)
300 {
301 skip(len);
302 loop=0;
303 }
304 else if (loop++>1)
305 break;
306 }
307
308 }
309 catch(IOException e)
310 {
311 _out=null;
312 _outStream=null;
313 throw e;
314 }
315 finally
316 {
317 if (_out!=null && !_out.isOpen())
318 {
319 _out=null;
320 _outStream=null;
321 }
322 _buf.position(0);
323 _buf.limit(_buf.capacity());
324 }
325 }
326 }
327
328
329
330 }