View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.eclipse.jetty.util.BufferUtil;
26  import org.eclipse.jetty.websocket.common.events.AnnotatedEventDriver;
27  
28  /**
29   * Support class for reading binary message data as an InputStream.
30   */
31  public class MessageInputStream extends InputStream implements MessageAppender
32  {
33      private static final int BUFFER_SIZE = 65535;
34      /**
35       * Threshold (of bytes) to perform compaction at
36       */
37      private static final int COMPACT_THRESHOLD = 5;
38      private final AnnotatedEventDriver driver;
39      private final ByteBuffer buf;
40      private int size;
41      private boolean finished;
42      private boolean needsNotification;
43      private int readPosition;
44  
45      public MessageInputStream(AnnotatedEventDriver driver)
46      {
47          this.driver = driver;
48          this.buf = ByteBuffer.allocate(BUFFER_SIZE);
49          BufferUtil.clearToFill(this.buf);
50          size = 0;
51          readPosition = this.buf.position();
52          finished = false;
53          needsNotification = true;
54      }
55  
56      @Override
57      public void appendMessage(ByteBuffer payload) throws IOException
58      {
59          if (finished)
60          {
61              throw new IOException("Cannot append to finished buffer");
62          }
63  
64          if (payload == null)
65          {
66              // empty payload is valid
67              return;
68          }
69  
70          driver.getPolicy().assertValidMessageSize(size + payload.remaining());
71          size += payload.remaining();
72  
73          synchronized (buf)
74          {
75              // TODO: grow buffer till max binary message size?
76              // TODO: compact this buffer to fit incoming buffer?
77              // TODO: tell connection to suspend if buffer too full?
78              BufferUtil.put(payload,buf);
79          }
80  
81          if (needsNotification)
82          {
83              needsNotification = true;
84              this.driver.onInputStream(this);
85          }
86      }
87  
88      @Override
89      public void close() throws IOException
90      {
91          finished = true;
92          super.close();
93      }
94  
95      @Override
96      public void messageComplete()
97      {
98          finished = true;
99      }
100 
101     @Override
102     public int read() throws IOException
103     {
104         synchronized (buf)
105         {
106             byte b = buf.get(readPosition);
107             readPosition++;
108             if (readPosition <= (buf.limit() - COMPACT_THRESHOLD))
109             {
110                 int curPos = buf.position();
111                 buf.compact();
112                 int offsetPos = buf.position() - curPos;
113                 readPosition += offsetPos;
114             }
115             return b;
116         }
117     }
118 }