1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.charset.Charset;
27 import java.nio.charset.StandardCharsets;
28 import java.util.Queue;
29
30 import org.eclipse.jetty.util.ArrayQueue;
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34 import org.eclipse.jetty.util.thread.Locker;
35 import org.eclipse.jetty.util.thread.Scheduler;
36
37
38
39
40
41
42 public class ByteArrayEndPoint extends AbstractEndPoint
43 {
44 static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
45 public final static InetSocketAddress NOIP=new InetSocketAddress(0);
46 private static final ByteBuffer EOF = BufferUtil.allocate(0);
47
48 private final Runnable _runFillable = new Runnable()
49 {
50 @Override
51 public void run()
52 {
53 getFillInterest().fillable();
54 }
55 };
56
57 private final Locker _locker = new Locker();
58 private final Queue<ByteBuffer> _inQ = new ArrayQueue<>();
59 private ByteBuffer _out;
60 private boolean _ishut;
61 private boolean _oshut;
62 private boolean _closed;
63 private boolean _growOutput;
64
65
66
67
68
69 public ByteArrayEndPoint()
70 {
71 this(null,0,null,null);
72 }
73
74
75
76
77
78
79 public ByteArrayEndPoint(byte[] input, int outputSize)
80 {
81 this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
82 }
83
84
85
86
87
88
89 public ByteArrayEndPoint(String input, int outputSize)
90 {
91 this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
92 }
93
94
95 public ByteArrayEndPoint(Scheduler scheduler, long idleTimeoutMs)
96 {
97 this(scheduler,idleTimeoutMs,null,null);
98 }
99
100
101 public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, byte[] input, int outputSize)
102 {
103 this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
104 }
105
106
107 public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, String input, int outputSize)
108 {
109 this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
110 }
111
112
113 public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output)
114 {
115 super(timer,NOIP,NOIP);
116 if (BufferUtil.hasContent(input))
117 addInput(input);
118 _out=output==null?BufferUtil.allocate(1024):output;
119 setIdleTimeout(idleTimeoutMs);
120 }
121
122
123 @Override
124 protected void onIncompleteFlush()
125 {
126
127 }
128
129
130 protected void execute(Runnable task)
131 {
132 new Thread(task,"BAEPoint-"+Integer.toHexString(hashCode())).start();
133 }
134
135
136 @Override
137 protected void needsFillInterest() throws IOException
138 {
139 try(Locker.Lock lock = _locker.lock())
140 {
141 if (_closed)
142 throw new ClosedChannelException();
143
144 ByteBuffer in = _inQ.peek();
145 if (BufferUtil.hasContent(in) || in==EOF)
146 execute(_runFillable);
147 }
148 }
149
150
151
152
153 public void addInputEOF()
154 {
155 addInput((ByteBuffer)null);
156 }
157
158
159
160
161
162 public void addInput(ByteBuffer in)
163 {
164 boolean fillable=false;
165 try(Locker.Lock lock = _locker.lock())
166 {
167 if (_inQ.peek()==EOF)
168 throw new RuntimeIOException(new EOFException());
169 boolean was_empty=_inQ.isEmpty();
170 if (in==null)
171 {
172 _inQ.add(EOF);
173 fillable=true;
174 }
175 if (BufferUtil.hasContent(in))
176 {
177 _inQ.add(in);
178 fillable=was_empty;
179 }
180 }
181 if (fillable)
182 _runFillable.run();
183 }
184
185 public void addInputAndExecute(ByteBuffer in)
186 {
187 boolean fillable=false;
188 try(Locker.Lock lock = _locker.lock())
189 {
190 if (_inQ.peek()==EOF)
191 throw new RuntimeIOException(new EOFException());
192 boolean was_empty=_inQ.isEmpty();
193 if (in==null)
194 {
195 _inQ.add(EOF);
196 fillable=true;
197 }
198 if (BufferUtil.hasContent(in))
199 {
200 _inQ.add(in);
201 fillable=was_empty;
202 }
203 }
204 if (fillable)
205 execute(_runFillable);
206 }
207
208
209 public void addInput(String s)
210 {
211 addInput(BufferUtil.toBuffer(s,StandardCharsets.UTF_8));
212 }
213
214
215 public void addInput(String s,Charset charset)
216 {
217 addInput(BufferUtil.toBuffer(s,charset));
218 }
219
220
221
222
223
224 public ByteBuffer getOutput()
225 {
226 return _out;
227 }
228
229
230
231
232
233 public String getOutputString()
234 {
235 return getOutputString(StandardCharsets.UTF_8);
236 }
237
238
239
240
241
242
243 public String getOutputString(Charset charset)
244 {
245 return BufferUtil.toString(_out,charset);
246 }
247
248
249
250
251
252 public ByteBuffer takeOutput()
253 {
254 ByteBuffer b=_out;
255 _out=BufferUtil.allocate(b.capacity());
256 getWriteFlusher().completeWrite();
257 return b;
258 }
259
260
261
262
263
264 public String takeOutputString()
265 {
266 return takeOutputString(StandardCharsets.UTF_8);
267 }
268
269
270
271
272
273
274 public String takeOutputString(Charset charset)
275 {
276 ByteBuffer buffer=takeOutput();
277 return BufferUtil.toString(buffer,charset);
278 }
279
280
281
282
283
284 public void setOutput(ByteBuffer out)
285 {
286 _out = out;
287 getWriteFlusher().completeWrite();
288 }
289
290
291
292
293
294 @Override
295 public boolean isOpen()
296 {
297 try(Locker.Lock lock = _locker.lock())
298 {
299 return !_closed;
300 }
301 }
302
303
304
305
306 @Override
307 public boolean isInputShutdown()
308 {
309 try(Locker.Lock lock = _locker.lock())
310 {
311 return _ishut||_closed;
312 }
313 }
314
315
316
317
318 @Override
319 public boolean isOutputShutdown()
320 {
321 try(Locker.Lock lock = _locker.lock())
322 {
323 return _oshut||_closed;
324 }
325 }
326
327
328 public void shutdownInput()
329 {
330 boolean close=false;
331 try(Locker.Lock lock = _locker.lock())
332 {
333 _ishut=true;
334 if (_oshut && !_closed)
335 close=_closed=true;
336 }
337 if (close)
338 super.close();
339 }
340
341
342
343
344
345 @Override
346 public void shutdownOutput()
347 {
348 boolean close=false;
349 try(Locker.Lock lock = _locker.lock())
350 {
351 _oshut=true;
352 if (_ishut && !_closed)
353 close=_closed=true;
354 }
355 if (close)
356 super.close();
357 }
358
359
360
361
362
363 @Override
364 public void close()
365 {
366 boolean close=false;
367 try(Locker.Lock lock = _locker.lock())
368 {
369 if (!_closed)
370 close=_closed=_ishut=_oshut=true;
371 }
372 if (close)
373 super.close();
374 }
375
376
377
378
379
380 public boolean hasMore()
381 {
382 return getOutput().position()>0;
383 }
384
385
386
387
388
389 @Override
390 public int fill(ByteBuffer buffer) throws IOException
391 {
392 int filled=0;
393 boolean close=false;
394 try(Locker.Lock lock = _locker.lock())
395 {
396 while(true)
397 {
398 if (_closed)
399 throw new EofException("CLOSED");
400
401 if (_ishut)
402 return -1;
403
404 if (_inQ.isEmpty())
405 break;
406
407 ByteBuffer in= _inQ.peek();
408 if (in==EOF)
409 {
410 _ishut=true;
411 if (_oshut)
412 close=_closed=true;
413 filled=-1;
414 break;
415 }
416
417 if (BufferUtil.hasContent(in))
418 {
419 filled=BufferUtil.append(buffer,in);
420 if (BufferUtil.isEmpty(in))
421 _inQ.poll();
422 break;
423 }
424 _inQ.poll();
425 }
426 }
427
428 if (close)
429 super.close();
430 if (filled>0)
431 notIdle();
432 return filled;
433 }
434
435
436
437
438
439 @Override
440 public boolean flush(ByteBuffer... buffers) throws IOException
441 {
442 if (_closed)
443 throw new IOException("CLOSED");
444 if (_oshut)
445 throw new IOException("OSHUT");
446
447 boolean flushed=true;
448 boolean idle=true;
449
450 for (ByteBuffer b : buffers)
451 {
452 if (BufferUtil.hasContent(b))
453 {
454 if (_growOutput && b.remaining()>BufferUtil.space(_out))
455 {
456 BufferUtil.compact(_out);
457 if (b.remaining()>BufferUtil.space(_out))
458 {
459 ByteBuffer n = BufferUtil.allocate(_out.capacity()+b.remaining()*2);
460 BufferUtil.append(n,_out);
461 _out=n;
462 }
463 }
464
465 if (BufferUtil.append(_out,b)>0)
466 idle=false;
467
468 if (BufferUtil.hasContent(b))
469 {
470 flushed=false;
471 break;
472 }
473 }
474 }
475 if (!idle)
476 notIdle();
477 return flushed;
478 }
479
480
481
482
483
484 public void reset()
485 {
486 getFillInterest().onClose();
487 getWriteFlusher().onClose();
488 _ishut=false;
489 _oshut=false;
490 _closed=false;
491 _inQ.clear();
492 BufferUtil.clear(_out);
493 }
494
495
496
497
498
499 @Override
500 public Object getTransport()
501 {
502 return null;
503 }
504
505
506
507
508
509 public boolean isGrowOutput()
510 {
511 return _growOutput;
512 }
513
514
515
516
517
518 public void setGrowOutput(boolean growOutput)
519 {
520 _growOutput=growOutput;
521 }
522
523
524 }