View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.nio.ByteBuffer;
24  import java.util.concurrent.ExecutionException;
25  
26  import org.eclipse.jetty.io.ByteBufferPool;
27  import org.eclipse.jetty.util.BufferUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.WriteCallback;
31  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32  import org.eclipse.jetty.websocket.common.WebSocketSession;
33  import org.eclipse.jetty.websocket.common.frames.TextFrame;
34  import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
35  
36  /**
37   * Support for writing a single WebSocket TEXT message via a {@link Writer}
38   * <p>
39   * Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8
40   */
41  public class MessageWriter extends Writer
42  {
43      private static final Logger LOG = Log.getLogger(MessageWriter.class);
44      private final OutgoingFrames outgoing;
45      private final ByteBufferPool bufferPool;
46      private long frameCount = 0;
47      private TextFrame frame;
48      private ByteBuffer buffer;
49      private Utf8CharBuffer utf;
50      private FutureWriteCallback blocker;
51      private WriteCallback callback;
52      private boolean closed = false;
53  
54      public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
55      {
56          this.outgoing = outgoing;
57          this.bufferPool = bufferPool;
58          this.buffer = bufferPool.acquire(bufferSize,true);
59          BufferUtil.flipToFill(buffer);
60          this.utf = Utf8CharBuffer.wrap(buffer);
61          this.frame = new TextFrame();
62      }
63  
64      public MessageWriter(WebSocketSession session)
65      {
66          this(session.getOutgoingHandler(),session.getPolicy().getMaxTextMessageBufferSize(),session.getBufferPool());
67      }
68  
69      private void assertNotClosed() throws IOException
70      {
71          if (closed)
72          {
73              IOException e = new IOException("Stream is closed");
74              notifyFailure(e);
75              throw e;
76          }
77      }
78  
79      @Override
80      public synchronized void close() throws IOException
81      {
82          assertNotClosed();
83  
84          // finish sending whatever in the buffer with FIN=true
85          flush(true);
86  
87          // close stream
88          closed = true;
89          if (callback != null)
90          {
91              callback.writeSuccess();
92          }
93          bufferPool.release(buffer);
94          LOG.debug("closed (frame count={})",frameCount);
95      }
96  
97      @Override
98      public void flush() throws IOException
99      {
100         assertNotClosed();
101 
102         // flush whatever is in the buffer with FIN=false
103         flush(false);
104     }
105 
106     /**
107      * Flush whatever is in the buffer.
108      * 
109      * @param fin
110      *            fin flag
111      * @throws IOException
112      */
113     private synchronized void flush(boolean fin) throws IOException
114     {
115         ByteBuffer data = utf.getByteBuffer();
116         frame.setPayload(data);
117         frame.setFin(fin);
118 
119         try
120         {
121             blocker = new FutureWriteCallback();
122             outgoing.outgoingFrame(frame,blocker);
123             try
124             {
125                 // block on write
126                 blocker.get();
127                 // write success
128                 // clear utf buffer
129                 utf.clear();
130                 frameCount++;
131                 frame.setIsContinuation();
132             }
133             catch (ExecutionException e)
134             {
135                 Throwable cause = e.getCause();
136                 if (cause != null)
137                 {
138                     if (cause instanceof IOException)
139                     {
140                         throw (IOException)cause;
141                     }
142                     else
143                     {
144                         throw new IOException(cause);
145                     }
146                 }
147                 throw new IOException("Failed to flush",e);
148             }
149             catch (InterruptedException e)
150             {
151                 throw new IOException("Failed to flush",e);
152             }
153         }
154         catch (IOException e)
155         {
156             notifyFailure(e);
157             throw e;
158         }
159     }
160 
161     private void notifyFailure(IOException e)
162     {
163         if (callback != null)
164         {
165             callback.writeFailed(e);
166         }
167     }
168 
169     public void setCallback(WriteCallback callback)
170     {
171         this.callback = callback;
172     }
173 
174     @Override
175     public void write(char[] cbuf) throws IOException
176     {
177         try
178         {
179             this.write(cbuf,0,cbuf.length);
180         }
181         catch (IOException e)
182         {
183             notifyFailure(e);
184             throw e;
185         }
186     }
187 
188     @Override
189     public void write(char[] cbuf, int off, int len) throws IOException
190     {
191         assertNotClosed();
192         int left = len; // bytes left to write
193         int offset = off; // offset within provided array
194         while (left > 0)
195         {
196             int space = utf.remaining();
197             int size = Math.min(space,left);
198             assert (space > 0);
199             assert (size > 0);
200             utf.append(cbuf,offset,size); // append with utf logic
201             left -= size; // decrement char left
202             if (left > 0)
203             {
204                 flush(false);
205             }
206             offset += size; // increment offset
207         }
208     }
209 
210     @Override
211     public void write(int c) throws IOException
212     {
213         assertNotClosed();
214 
215         // buffer up to limit, flush once buffer reached.
216         utf.append(c); // append with utf logic
217         if (utf.remaining() <= 0)
218         {
219             flush(false);
220         }
221     }
222 }