View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.message;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.nio.ByteBuffer;
24  
25  import org.eclipse.jetty.io.ByteBufferPool;
26  import org.eclipse.jetty.util.BufferUtil;
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.BatchMode;
30  import org.eclipse.jetty.websocket.api.WriteCallback;
31  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
32  import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
33  import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
34  import org.eclipse.jetty.websocket.common.WebSocketSession;
35  import org.eclipse.jetty.websocket.common.frames.TextFrame;
36  
37  /**
38   * Support for writing a single WebSocket TEXT message via a {@link Writer}
39   * <p>
40   * Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8
41   */
42  public class MessageWriter extends Writer
43  {
44      private static final Logger LOG = Log.getLogger(MessageWriter.class);
45  
46      private final OutgoingFrames outgoing;
47      private final ByteBufferPool bufferPool;
48      private final BlockingWriteCallback blocker;
49      private long frameCount;
50      private TextFrame frame;
51      private ByteBuffer buffer;
52      private Utf8CharBuffer utf;
53      private WriteCallback callback;
54      private boolean closed;
55  
56      public MessageWriter(WebSocketSession session)
57      {
58          this(session.getOutgoingHandler(), session.getPolicy().getMaxTextMessageBufferSize(), session.getBufferPool());
59      }
60  
61      public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
62      {
63          this.outgoing = outgoing;
64          this.bufferPool = bufferPool;
65          this.blocker = new BlockingWriteCallback();
66          this.buffer = bufferPool.acquire(bufferSize, true);
67          BufferUtil.flipToFill(buffer);
68          this.frame = new TextFrame();
69          this.utf = Utf8CharBuffer.wrap(buffer);
70      }
71  
72      @Override
73      public void write(char[] chars, int off, int len) throws IOException
74      {
75          try
76          {
77              send(chars, off, len);
78          }
79          catch (Throwable x)
80          {
81              // Notify without holding locks.
82              notifyFailure(x);
83              throw x;
84          }
85      }
86  
87      @Override
88      public void write(int c) throws IOException
89      {
90          try
91          {
92              send(new char[]{(char)c}, 0, 1);
93          }
94          catch (Throwable x)
95          {
96              // Notify without holding locks.
97              notifyFailure(x);
98              throw x;
99          }
100     }
101 
102     @Override
103     public void flush() throws IOException
104     {
105         try
106         {
107             flush(false);
108         }
109         catch (Throwable x)
110         {
111             // Notify without holding locks.
112             notifyFailure(x);
113             throw x;
114         }
115     }
116 
117     @Override
118     public void close() throws IOException
119     {
120         try
121         {
122             flush(true);
123             bufferPool.release(buffer);
124             if (LOG.isDebugEnabled())
125                 LOG.debug("Stream closed, {} frames sent", frameCount);
126             // Notify without holding locks.
127             notifySuccess();
128         }
129         catch (Throwable x)
130         {
131             // Notify without holding locks.
132             notifyFailure(x);
133             throw x;
134         }
135     }
136 
137     private void flush(boolean fin) throws IOException
138     {
139         synchronized (this)
140         {
141             if (closed)
142                 throw new IOException("Stream is closed");
143 
144             closed = fin;
145 
146             ByteBuffer data = utf.getByteBuffer();
147             if (LOG.isDebugEnabled())
148                 LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer));
149             frame.setPayload(data);
150             frame.setFin(fin);
151 
152             try (WriteBlocker b = blocker.acquireWriteBlocker())
153             {
154                 outgoing.outgoingFrame(frame, b, BatchMode.OFF);
155                 b.block();
156             }
157 
158             ++frameCount;
159             // Any flush after the first will be a CONTINUATION frame.
160             frame.setIsContinuation();
161 
162             utf.clear();
163         }
164     }
165 
166     private void send(char[] chars, int offset, int length) throws IOException
167     {
168         synchronized (this)
169         {
170             if (closed)
171                 throw new IOException("Stream is closed");
172 
173             while (length > 0)
174             {
175                 // There may be no space available, we want
176                 // to handle correctly when space == 0.
177                 int space = utf.remaining();
178                 int size = Math.min(space, length);
179                 utf.append(chars, offset, size);
180                 offset += size;
181                 length -= size;
182                 if (length > 0)
183                 {
184                     // If we could not write everything, it means
185                     // that the buffer was full, so flush it.
186                     flush(false);
187                 }
188             }
189         }
190     }
191 
192     public void setCallback(WriteCallback callback)
193     {
194         synchronized (this)
195         {
196             this.callback = callback;
197         }
198     }
199 
200     private void notifySuccess()
201     {
202         WriteCallback callback;
203         synchronized (this)
204         {
205             callback = this.callback;
206         }
207         if (callback != null)
208         {
209             callback.writeSuccess();
210         }
211     }
212 
213     private void notifyFailure(Throwable failure)
214     {
215         WriteCallback callback;
216         synchronized (this)
217         {
218             callback = this.callback;
219         }
220         if (callback != null)
221         {
222             callback.writeFailed(failure);
223         }
224     }
225 }