View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.nio.ByteBuffer;
24  import java.util.concurrent.BlockingDeque;
25  import java.util.concurrent.LinkedBlockingDeque;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.eclipse.jetty.util.BufferUtil;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  import org.eclipse.jetty.websocket.common.LogicalConnection;
32  
33  /**
34   * Support class for reading a (single) WebSocket BINARY message via a InputStream.
35   * <p>
36   * An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
37   */
38  public class MessageInputStream extends InputStream implements MessageAppender
39  {
40      private static final Logger LOG = Log.getLogger(MessageInputStream.class);
41      /**
42       * Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
43       */
44      @SuppressWarnings("unused")
45      private final LogicalConnection connection;
46      private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
47      private AtomicBoolean closed = new AtomicBoolean(false);
48      // EOB / End of Buffers
49      private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
50      private ByteBuffer activeBuffer = null;
51  
52      public MessageInputStream(LogicalConnection connection)
53      {
54          this.connection = connection;
55      }
56  
57      @Override
58      public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
59      {
60          if (LOG.isDebugEnabled())
61          {
62              LOG.debug("appendMessage(ByteBuffer,{}): {}",isLast,BufferUtil.toDetailString(payload));
63          }
64          
65          if (buffersExhausted.get())
66          {
67              // This indicates a programming mistake/error and must be bug fixed
68              throw new RuntimeException("Last frame already received");
69          }
70  
71          // if closed, we should just toss incoming payloads into the bit bucket.
72          if (closed.get())
73          {
74              return;
75          }
76  
77          // Put the payload into the queue
78          try
79          {
80              buffers.put(payload);
81              if (isLast)
82              {
83                  buffersExhausted.set(true);
84              }
85          }
86          catch (InterruptedException e)
87          {
88              throw new IOException(e);
89          }
90      }
91  
92      @Override
93      public void close() throws IOException
94      {
95          closed.set(true);
96          super.close();
97      }
98  
99      @Override
100     public synchronized void mark(int readlimit)
101     {
102         /* do nothing */
103     }
104 
105     @Override
106     public boolean markSupported()
107     {
108         return false;
109     }
110 
111     @Override
112     public void messageComplete()
113     {
114         LOG.debug("messageComplete()");
115         
116         buffersExhausted.set(true);
117         // toss an empty ByteBuffer into queue to let it drain
118         try
119         {
120             buffers.put(ByteBuffer.wrap(new byte[0]));
121         }
122         catch (InterruptedException ignore)
123         {
124             /* ignore */
125         }
126     }
127 
128     @Override
129     public int read() throws IOException
130     {
131         LOG.debug("read()");
132         
133         try
134         {
135             if (closed.get())
136             {
137                 return -1;
138             }
139 
140             if (activeBuffer == null)
141             {
142                 activeBuffer = buffers.take();
143             }
144 
145             while (activeBuffer.remaining() <= 0)
146             {
147                 if (buffersExhausted.get())
148                 {
149                     closed.set(true);
150                     return -1;
151                 }
152                 activeBuffer = buffers.take();
153             }
154 
155             return activeBuffer.get();
156         }
157         catch (InterruptedException e)
158         {
159             LOG.warn(e);
160             closed.set(true);
161             return -1;
162 //            throw new IOException(e);
163         }
164     }
165 
166     @Override
167     public synchronized void reset() throws IOException
168     {
169         throw new IOException("reset() not supported");
170     }
171 }