View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ClosedChannelException;
26  import java.nio.charset.Charset;
27  import java.nio.charset.StandardCharsets;
28  import java.util.Queue;
29  
30  import org.eclipse.jetty.util.ArrayQueue;
31  import org.eclipse.jetty.util.BufferUtil;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  import org.eclipse.jetty.util.thread.Locker;
35  import org.eclipse.jetty.util.thread.Scheduler;
36  
37  
38  /* ------------------------------------------------------------ */
39  /** ByteArrayEndPoint.
40   *
41   */
42  public class ByteArrayEndPoint extends AbstractEndPoint
43  {
44      static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
45      public final static InetSocketAddress NOIP=new InetSocketAddress(0);
46      private static final ByteBuffer EOF = BufferUtil.allocate(0);
47  
48      private final Runnable _runFillable = new Runnable()
49      {
50          @Override
51          public void run()
52          {
53              getFillInterest().fillable();
54          }
55      };
56  
57      private final Locker _locker = new Locker();
58      private final Queue<ByteBuffer> _inQ = new ArrayQueue<>();
59      private ByteBuffer _out;
60      private boolean _ishut;
61      private boolean _oshut;
62      private boolean _closed;
63      private boolean _growOutput;
64  
65      /* ------------------------------------------------------------ */
66      /**
67       *
68       */
69      public ByteArrayEndPoint()
70      {
71          this(null,0,null,null);
72      }
73  
74      /* ------------------------------------------------------------ */
75      /**
76       * @param input the input bytes
77       * @param outputSize the output size
78       */
79      public ByteArrayEndPoint(byte[] input, int outputSize)
80      {
81          this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
82      }
83  
84      /* ------------------------------------------------------------ */
85      /**
86       * @param input the input string (converted to bytes using default encoding charset)
87       * @param outputSize the output size
88       */
89      public ByteArrayEndPoint(String input, int outputSize)
90      {
91          this(null,0,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
92      }
93  
94      /* ------------------------------------------------------------ */
95      public ByteArrayEndPoint(Scheduler scheduler, long idleTimeoutMs)
96      {
97          this(scheduler,idleTimeoutMs,null,null);
98      }
99  
100     /* ------------------------------------------------------------ */
101     public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, byte[] input, int outputSize)
102     {
103         this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
104     }
105 
106     /* ------------------------------------------------------------ */
107     public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, String input, int outputSize)
108     {
109         this(timer,idleTimeoutMs,input!=null?BufferUtil.toBuffer(input):null,BufferUtil.allocate(outputSize));
110     }
111 
112     /* ------------------------------------------------------------ */
113     public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output)
114     {
115         super(timer,NOIP,NOIP);
116         if (BufferUtil.hasContent(input))
117             addInput(input);
118         _out=output==null?BufferUtil.allocate(1024):output;
119         setIdleTimeout(idleTimeoutMs);
120     }
121 
122     /* ------------------------------------------------------------ */
123     @Override
124     protected void onIncompleteFlush()
125     {
126         // Don't need to do anything here as takeOutput does the signalling.
127     }
128 
129     /* ------------------------------------------------------------ */
130     protected void execute(Runnable task)
131     {
132         new Thread(task,"BAEPoint-"+Integer.toHexString(hashCode())).start();
133     }
134 
135     /* ------------------------------------------------------------ */
136     @Override
137     protected void needsFillInterest() throws IOException
138     {
139         try(Locker.Lock lock = _locker.lock())
140         {
141             if (_closed)
142                 throw new ClosedChannelException();
143 
144             ByteBuffer in = _inQ.peek();
145             if (BufferUtil.hasContent(in) || in==EOF)
146                 execute(_runFillable);
147         }
148     }
149 
150     /* ------------------------------------------------------------ */
151     /**
152      */
153     public void addInputEOF()
154     {
155         addInput((ByteBuffer)null);
156     }
157 
158     /* ------------------------------------------------------------ */
159     /**
160      * @param in The in to set.
161      */
162     public void addInput(ByteBuffer in)
163     {
164         boolean fillable=false;
165         try(Locker.Lock lock = _locker.lock())
166         {
167             if (_inQ.peek()==EOF)
168                 throw new RuntimeIOException(new EOFException());
169             boolean was_empty=_inQ.isEmpty();
170             if (in==null)
171             {
172                 _inQ.add(EOF);
173                 fillable=true;
174             }
175             if (BufferUtil.hasContent(in))
176             {
177                 _inQ.add(in);
178                 fillable=was_empty;
179             }
180         }
181         if (fillable)
182             _runFillable.run();
183     }
184 
185     public void addInputAndExecute(ByteBuffer in)
186     {
187         boolean fillable=false;
188         try(Locker.Lock lock = _locker.lock())
189         {
190             if (_inQ.peek()==EOF)
191                 throw new RuntimeIOException(new EOFException());
192             boolean was_empty=_inQ.isEmpty();
193             if (in==null)
194             {
195                 _inQ.add(EOF);
196                 fillable=true;
197             }
198             if (BufferUtil.hasContent(in))
199             {
200                 _inQ.add(in);
201                 fillable=was_empty;
202             }
203         }
204         if (fillable)
205             execute(_runFillable);
206     }
207 
208     /* ------------------------------------------------------------ */
209     public void addInput(String s)
210     {
211         addInput(BufferUtil.toBuffer(s,StandardCharsets.UTF_8));
212     }
213 
214     /* ------------------------------------------------------------ */
215     public void addInput(String s,Charset charset)
216     {
217         addInput(BufferUtil.toBuffer(s,charset));
218     }
219 
220     /* ------------------------------------------------------------ */
221     /**
222      * @return Returns the out.
223      */
224     public ByteBuffer getOutput()
225     {
226         return _out;
227     }
228 
229     /* ------------------------------------------------------------ */
230     /**
231      * @return Returns the out.
232      */
233     public String getOutputString()
234     {
235         return getOutputString(StandardCharsets.UTF_8);
236     }
237 
238     /* ------------------------------------------------------------ */
239     /**
240      * @param charset the charset to encode the output as
241      * @return Returns the out.
242      */
243     public String getOutputString(Charset charset)
244     {
245         return BufferUtil.toString(_out,charset);
246     }
247 
248     /* ------------------------------------------------------------ */
249     /**
250      * @return Returns the out.
251      */
252     public ByteBuffer takeOutput()
253     {
254         ByteBuffer b=_out;
255         _out=BufferUtil.allocate(b.capacity());
256         getWriteFlusher().completeWrite();
257         return b;
258     }
259 
260     /* ------------------------------------------------------------ */
261     /**
262      * @return Returns the out.
263      */
264     public String takeOutputString()
265     {
266         return takeOutputString(StandardCharsets.UTF_8);
267     }
268 
269     /* ------------------------------------------------------------ */
270     /**
271      * @param charset the charset to encode the output as
272      * @return Returns the out.
273      */
274     public String takeOutputString(Charset charset)
275     {
276         ByteBuffer buffer=takeOutput();
277         return BufferUtil.toString(buffer,charset);
278     }
279 
280     /* ------------------------------------------------------------ */
281     /**
282      * @param out The out to set.
283      */
284     public void setOutput(ByteBuffer out)
285     {
286         _out = out;
287         getWriteFlusher().completeWrite();
288     }
289 
290     /* ------------------------------------------------------------ */
291     /*
292      * @see org.eclipse.io.EndPoint#isOpen()
293      */
294     @Override
295     public boolean isOpen()
296     {
297         try(Locker.Lock lock = _locker.lock())
298         {
299             return !_closed;
300         }
301     }
302 
303     /* ------------------------------------------------------------ */
304     /*
305      */
306     @Override
307     public boolean isInputShutdown()
308     {
309         try(Locker.Lock lock = _locker.lock())
310         {
311             return _ishut||_closed;
312         }
313     }
314 
315     /* ------------------------------------------------------------ */
316     /*
317      */
318     @Override
319     public boolean isOutputShutdown()
320     {
321         try(Locker.Lock lock = _locker.lock())
322         {
323             return _oshut||_closed;
324         }
325     }
326 
327     /* ------------------------------------------------------------ */
328     public void shutdownInput()
329     {
330         boolean close=false;
331         try(Locker.Lock lock = _locker.lock())
332         {
333             _ishut=true;
334             if (_oshut && !_closed)
335                 close=_closed=true;
336         }
337         if (close)
338             super.close();
339     }
340 
341     /* ------------------------------------------------------------ */
342     /*
343      * @see org.eclipse.io.EndPoint#shutdownOutput()
344      */
345     @Override
346     public void shutdownOutput()
347     {
348         boolean close=false;
349         try(Locker.Lock lock = _locker.lock())
350         {
351             _oshut=true;
352             if (_ishut && !_closed)
353                 close=_closed=true;
354         }
355         if (close)
356             super.close();
357     }
358 
359     /* ------------------------------------------------------------ */
360     /*
361      * @see org.eclipse.io.EndPoint#close()
362      */
363     @Override
364     public void close()
365     {
366         boolean close=false;
367         try(Locker.Lock lock = _locker.lock())
368         {
369             if (!_closed)
370                 close=_closed=_ishut=_oshut=true;
371         }
372         if (close)
373             super.close();
374     }
375 
376     /* ------------------------------------------------------------ */
377     /**
378      * @return <code>true</code> if there are bytes remaining to be read from the encoded input
379      */
380     public boolean hasMore()
381     {
382         return getOutput().position()>0;
383     }
384 
385     /* ------------------------------------------------------------ */
386     /*
387      * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
388      */
389     @Override
390     public int fill(ByteBuffer buffer) throws IOException
391     {
392         int filled=0;
393         boolean close=false;
394         try(Locker.Lock lock = _locker.lock())
395         {
396             while(true)
397             {
398                 if (_closed)
399                     throw new EofException("CLOSED");
400 
401                 if (_ishut)
402                     return -1;
403 
404                 if (_inQ.isEmpty())
405                     break;
406 
407                 ByteBuffer in= _inQ.peek();
408                 if (in==EOF)
409                 {
410                     _ishut=true;
411                     if (_oshut)
412                         close=_closed=true;
413                     filled=-1;
414                     break;
415                 }
416 
417                 if (BufferUtil.hasContent(in))
418                 {
419                     filled=BufferUtil.append(buffer,in);
420                     if (BufferUtil.isEmpty(in))
421                         _inQ.poll();
422                     break;
423                 }
424                 _inQ.poll();
425             }
426         }
427 
428         if (close)
429             super.close();
430         if (filled>0)
431             notIdle();
432         return filled;
433     }
434 
435     /* ------------------------------------------------------------ */
436     /*
437      * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer)
438      */
439     @Override
440     public boolean flush(ByteBuffer... buffers) throws IOException
441     {
442         if (_closed)
443             throw new IOException("CLOSED");
444         if (_oshut)
445             throw new IOException("OSHUT");
446 
447         boolean flushed=true;
448         boolean idle=true;
449 
450         for (ByteBuffer b : buffers)
451         {
452             if (BufferUtil.hasContent(b))
453             {
454                 if (_growOutput && b.remaining()>BufferUtil.space(_out))
455                 {
456                     BufferUtil.compact(_out);
457                     if (b.remaining()>BufferUtil.space(_out))
458                     {
459                         ByteBuffer n = BufferUtil.allocate(_out.capacity()+b.remaining()*2);
460                         BufferUtil.append(n,_out);
461                         _out=n;
462                     }
463                 }
464 
465                 if (BufferUtil.append(_out,b)>0)
466                     idle=false;
467 
468                 if (BufferUtil.hasContent(b))
469                 {
470                     flushed=false;
471                     break;
472                 }
473             }
474         }
475         if (!idle)
476             notIdle();
477         return flushed;
478     }
479 
480     /* ------------------------------------------------------------ */
481     /**
482      *
483      */
484     public void reset()
485     {
486         getFillInterest().onClose();
487         getWriteFlusher().onClose();
488         _ishut=false;
489         _oshut=false;
490         _closed=false;
491         _inQ.clear();
492         BufferUtil.clear(_out);
493     }
494 
495     /* ------------------------------------------------------------ */
496     /*
497      * @see org.eclipse.io.EndPoint#getConnection()
498      */
499     @Override
500     public Object getTransport()
501     {
502         return null;
503     }
504 
505     /* ------------------------------------------------------------ */
506     /**
507      * @return the growOutput
508      */
509     public boolean isGrowOutput()
510     {
511         return _growOutput;
512     }
513 
514     /* ------------------------------------------------------------ */
515     /**
516      * @param growOutput the growOutput to set
517      */
518     public void setGrowOutput(boolean growOutput)
519     {
520         _growOutput=growOutput;
521     }
522 
523 
524 }