View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.nio.ByteBuffer;
24  
25  import org.eclipse.jetty.io.ByteBufferPool;
26  import org.eclipse.jetty.util.BufferUtil;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.BatchMode;
30  import org.eclipse.jetty.websocket.api.WriteCallback;
31  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32  import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33  import org.eclipse.jetty.websocket.common.WebSocketSession;
34  import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
35  import org.eclipse.jetty.websocket.common.frames.TextFrame;
36  
37  /**
38   * Support for writing a single WebSocket TEXT message via a {@link Writer}
39   * <p/>
40   * Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8
41   */
42  public class MessageWriter extends Writer
43  {
44      private static final Logger LOG = Log.getLogger(MessageWriter.class);
45  
46      private final OutgoingFrames outgoing;
47      private final ByteBufferPool bufferPool;
48      private final BlockingWriteCallback blocker;
49      private long frameCount;
50      private TextFrame frame;
51      private ByteBuffer buffer;
52      private Utf8CharBuffer utf;
53      private WriteCallback callback;
54      private boolean closed;
55  
56      public MessageWriter(WebSocketSession session)
57      {
58          this(session.getOutgoingHandler(), session.getPolicy().getMaxTextMessageBufferSize(), session.getBufferPool());
59      }
60  
61      public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
62      {
63          this.outgoing = outgoing;
64          this.bufferPool = bufferPool;
65          this.blocker = new BlockingWriteCallback();
66          this.buffer = bufferPool.acquire(bufferSize, true);
67          BufferUtil.flipToFill(buffer);
68          this.frame = new TextFrame();
69          this.utf = Utf8CharBuffer.wrap(buffer);
70      }
71  
72      @Override
73      public void write(char[] chars, int off, int len) throws IOException
74      {
75          try
76          {
77              send(chars, off, len);
78          }
79          catch (Throwable x)
80          {
81              // Notify without holding locks.
82              notifyFailure(x);
83              throw x;
84          }
85      }
86  
87      @Override
88      public void write(int c) throws IOException
89      {
90          try
91          {
92              send(new char[]{(char)c}, 0, 1);
93          }
94          catch (Throwable x)
95          {
96              // Notify without holding locks.
97              notifyFailure(x);
98              throw x;
99          }
100     }
101 
102     @Override
103     public void flush() throws IOException
104     {
105         try
106         {
107             flush(false);
108         }
109         catch (Throwable x)
110         {
111             // Notify without holding locks.
112             notifyFailure(x);
113             throw x;
114         }
115     }
116 
117     @Override
118     public void close() throws IOException
119     {
120         try
121         {
122             flush(true);
123             bufferPool.release(buffer);
124             LOG.debug("Stream closed, {} frames sent", frameCount);
125             // Notify without holding locks.
126             notifySuccess();
127         }
128         catch (Throwable x)
129         {
130             // Notify without holding locks.
131             notifyFailure(x);
132             throw x;
133         }
134     }
135 
136     private void flush(boolean fin) throws IOException
137     {
138         synchronized (this)
139         {
140             if (closed)
141                 throw new IOException("Stream is closed");
142 
143             closed = fin;
144 
145             ByteBuffer data = utf.getByteBuffer();
146             LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
147             frame.setPayload(data);
148             frame.setFin(fin);
149 
150             try (WriteBlocker b = blocker.acquireWriteBlocker())
151             {
152                 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
153                 b.block();
154             }
155 
156             ++frameCount;
157             // Any flush after the first will be a CONTINUATION frame.
158             frame.setIsContinuation();
159 
160             utf.clear();
161         }
162     }
163 
164     private void send(char[] chars, int offset, int length) throws IOException
165     {
166         synchronized (this)
167         {
168             if (closed)
169                 throw new IOException("Stream is closed");
170 
171             while (length > 0)
172             {
173                 // There may be no space available, we want
174                 // to handle correctly when space == 0.
175                 int space = utf.remaining();
176                 int size = Math.min(space, length);
177                 utf.append(chars, offset, size);
178                 offset += size;
179                 length -= size;
180                 if (length > 0)
181                 {
182                     // If we could not write everything, it means
183                     // that the buffer was full, so flush it.
184                     flush(false);
185                 }
186             }
187         }
188     }
189 
190     public void setCallback(WriteCallback callback)
191     {
192         synchronized (this)
193         {
194             this.callback = callback;
195         }
196     }
197 
198     private void notifySuccess()
199     {
200         WriteCallback callback;
201         synchronized (this)
202         {
203             callback = this.callback;
204         }
205         if (callback != null)
206         {
207             callback.writeSuccess();
208         }
209     }
210 
211     private void notifyFailure(Throwable failure)
212     {
213         WriteCallback callback;
214         synchronized (this)
215         {
216             callback = this.callback;
217         }
218         if (callback != null)
219         {
220             callback.writeFailed(failure);
221         }
222     }
223 }