1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.jsr356;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.Future;
24 import javax.websocket.EncodeException;
25 import javax.websocket.Encoder;
26 import javax.websocket.RemoteEndpoint;
27 import javax.websocket.SendHandler;
28
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.websocket.api.BatchMode;
33 import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint;
34 import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
35 import org.eclipse.jetty.websocket.common.message.MessageOutputStream;
36 import org.eclipse.jetty.websocket.common.message.MessageWriter;
37 import org.eclipse.jetty.websocket.jsr356.encoders.EncodeFailedFuture;
38
39 public abstract class AbstractJsrRemote implements RemoteEndpoint
40 {
41 private static final Logger LOG = Log.getLogger(AbstractJsrRemote.class);
42
43 protected final JsrSession session;
44 protected final WebSocketRemoteEndpoint jettyRemote;
45 protected final EncoderFactory encoders;
46
47 protected AbstractJsrRemote(JsrSession session)
48 {
49 this.session = session;
50 if (!(session.getRemote() instanceof WebSocketRemoteEndpoint))
51 {
52 StringBuilder err = new StringBuilder();
53 err.append("Unexpected implementation [");
54 err.append(session.getRemote().getClass().getName());
55 err.append("]. Expected an instanceof [");
56 err.append(WebSocketRemoteEndpoint.class.getName());
57 err.append("]");
58 throw new IllegalStateException(err.toString());
59 }
60 this.jettyRemote = (WebSocketRemoteEndpoint)session.getRemote();
61 this.encoders = session.getEncoderFactory();
62 }
63
64 protected void assertMessageNotNull(Object data)
65 {
66 if (data == null)
67 {
68 throw new IllegalArgumentException("message cannot be null");
69 }
70 }
71
72 protected void assertSendHandlerNotNull(SendHandler handler)
73 {
74 if (handler == null)
75 {
76 throw new IllegalArgumentException("SendHandler cannot be null");
77 }
78 }
79
80 @Override
81 public void flushBatch() throws IOException
82 {
83 jettyRemote.flush();
84 }
85
86 @Override
87 public boolean getBatchingAllowed()
88 {
89 return jettyRemote.getBatchMode() == BatchMode.ON;
90 }
91
92 @Override
93 public void setBatchingAllowed(boolean allowed) throws IOException
94 {
95 if (jettyRemote.getBatchMode() == BatchMode.ON && !allowed)
96 jettyRemote.flush();
97 jettyRemote.setBatchMode(allowed ? BatchMode.ON : BatchMode.OFF);
98 }
99
100 @SuppressWarnings(
101 {"rawtypes", "unchecked"})
102 public Future<Void> sendObjectViaFuture(Object data)
103 {
104 assertMessageNotNull(data);
105 if (LOG.isDebugEnabled())
106 {
107 LOG.debug("sendObject({})", data);
108 }
109
110 Encoder encoder = encoders.getEncoderFor(data.getClass());
111 if (encoder == null)
112 {
113 throw new IllegalArgumentException("No encoder for type: " + data.getClass());
114 }
115
116 if (encoder instanceof Encoder.Text)
117 {
118 Encoder.Text text = (Encoder.Text)encoder;
119 try
120 {
121 String msg = text.encode(data);
122 return jettyRemote.sendStringByFuture(msg);
123 }
124 catch (EncodeException e)
125 {
126 return new EncodeFailedFuture(data, text, Encoder.Text.class, e);
127 }
128 }
129 else if (encoder instanceof Encoder.TextStream)
130 {
131 Encoder.TextStream etxt = (Encoder.TextStream)encoder;
132 FutureWriteCallback callback = new FutureWriteCallback();
133 try (MessageWriter writer = new MessageWriter(session))
134 {
135 writer.setCallback(callback);
136 etxt.encode(data, writer);
137 return callback;
138 }
139 catch (EncodeException | IOException e)
140 {
141 return new EncodeFailedFuture(data, etxt, Encoder.Text.class, e);
142 }
143 }
144 else if (encoder instanceof Encoder.Binary)
145 {
146 Encoder.Binary ebin = (Encoder.Binary)encoder;
147 try
148 {
149 ByteBuffer buf = ebin.encode(data);
150 return jettyRemote.sendBytesByFuture(buf);
151 }
152 catch (EncodeException e)
153 {
154 return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e);
155 }
156 }
157 else if (encoder instanceof Encoder.BinaryStream)
158 {
159 Encoder.BinaryStream ebin = (Encoder.BinaryStream)encoder;
160 FutureWriteCallback callback = new FutureWriteCallback();
161 try (MessageOutputStream out = new MessageOutputStream(session))
162 {
163 out.setCallback(callback);
164 ebin.encode(data, out);
165 return callback;
166 }
167 catch (EncodeException | IOException e)
168 {
169 return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e);
170 }
171 }
172
173 throw new IllegalArgumentException("Unknown encoder type: " + encoder);
174 }
175
176 @Override
177 public void sendPing(ByteBuffer data) throws IOException, IllegalArgumentException
178 {
179 if (LOG.isDebugEnabled())
180 {
181 LOG.debug("sendPing({})", BufferUtil.toDetailString(data));
182 }
183 jettyRemote.sendPing(data);
184 }
185
186 @Override
187 public void sendPong(ByteBuffer data) throws IOException, IllegalArgumentException
188 {
189 if (LOG.isDebugEnabled())
190 {
191 LOG.debug("sendPong({})", BufferUtil.toDetailString(data));
192 }
193 jettyRemote.sendPong(data);
194 }
195 }