1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.jsr356;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.Future;
24
25 import javax.websocket.EncodeException;
26 import javax.websocket.Encoder;
27 import javax.websocket.RemoteEndpoint;
28 import javax.websocket.SendHandler;
29
30 import org.eclipse.jetty.util.BufferUtil;
31 import org.eclipse.jetty.util.log.Log;
32 import org.eclipse.jetty.util.log.Logger;
33 import org.eclipse.jetty.websocket.api.BatchMode;
34 import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint;
35 import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
36 import org.eclipse.jetty.websocket.common.message.MessageOutputStream;
37 import org.eclipse.jetty.websocket.common.message.MessageWriter;
38 import org.eclipse.jetty.websocket.jsr356.encoders.EncodeFailedFuture;
39
40 public abstract class AbstractJsrRemote implements RemoteEndpoint
41 {
42 private static final Logger LOG = Log.getLogger(AbstractJsrRemote.class);
43
44 protected final JsrSession session;
45 protected final WebSocketRemoteEndpoint jettyRemote;
46 protected final EncoderFactory encoders;
47
48 protected AbstractJsrRemote(JsrSession session)
49 {
50 this.session = session;
51 if (!(session.getRemote() instanceof WebSocketRemoteEndpoint))
52 {
53 StringBuilder err = new StringBuilder();
54 err.append("Unexpected implementation [");
55 err.append(session.getRemote().getClass().getName());
56 err.append("]. Expected an instanceof [");
57 err.append(WebSocketRemoteEndpoint.class.getName());
58 err.append("]");
59 throw new IllegalStateException(err.toString());
60 }
61 this.jettyRemote = (WebSocketRemoteEndpoint)session.getRemote();
62 this.encoders = session.getEncoderFactory();
63 }
64
65 protected void assertMessageNotNull(Object data)
66 {
67 if (data == null)
68 {
69 throw new IllegalArgumentException("message cannot be null");
70 }
71 }
72
73 protected void assertSendHandlerNotNull(SendHandler handler)
74 {
75 if (handler == null)
76 {
77 throw new IllegalArgumentException("SendHandler cannot be null");
78 }
79 }
80
81 @Override
82 public void flushBatch() throws IOException
83 {
84 jettyRemote.flush();
85 }
86
87 @Override
88 public boolean getBatchingAllowed()
89 {
90 return jettyRemote.getBatchMode() == BatchMode.ON;
91 }
92
93 @Override
94 public void setBatchingAllowed(boolean allowed) throws IOException
95 {
96 if (jettyRemote.getBatchMode() == BatchMode.ON && !allowed)
97 jettyRemote.flush();
98 jettyRemote.setBatchMode(allowed ? BatchMode.ON : BatchMode.OFF);
99 }
100
101 @SuppressWarnings(
102 {"rawtypes", "unchecked"})
103 public Future<Void> sendObjectViaFuture(Object data)
104 {
105 assertMessageNotNull(data);
106 if (LOG.isDebugEnabled())
107 {
108 LOG.debug("sendObject({})", data);
109 }
110
111 Encoder encoder = encoders.getEncoderFor(data.getClass());
112 if (encoder == null)
113 {
114 throw new IllegalArgumentException("No encoder for type: " + data.getClass());
115 }
116
117 if (encoder instanceof Encoder.Text)
118 {
119 Encoder.Text text = (Encoder.Text)encoder;
120 try
121 {
122 String msg = text.encode(data);
123 return jettyRemote.sendStringByFuture(msg);
124 }
125 catch (EncodeException e)
126 {
127 return new EncodeFailedFuture(data, text, Encoder.Text.class, e);
128 }
129 }
130 else if (encoder instanceof Encoder.TextStream)
131 {
132 Encoder.TextStream etxt = (Encoder.TextStream)encoder;
133 FutureWriteCallback callback = new FutureWriteCallback();
134 try (MessageWriter writer = new MessageWriter(session))
135 {
136 writer.setCallback(callback);
137 etxt.encode(data, writer);
138 return callback;
139 }
140 catch (EncodeException | IOException e)
141 {
142 return new EncodeFailedFuture(data, etxt, Encoder.Text.class, e);
143 }
144 }
145 else if (encoder instanceof Encoder.Binary)
146 {
147 Encoder.Binary ebin = (Encoder.Binary)encoder;
148 try
149 {
150 ByteBuffer buf = ebin.encode(data);
151 return jettyRemote.sendBytesByFuture(buf);
152 }
153 catch (EncodeException e)
154 {
155 return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e);
156 }
157 }
158 else if (encoder instanceof Encoder.BinaryStream)
159 {
160 Encoder.BinaryStream ebin = (Encoder.BinaryStream)encoder;
161 FutureWriteCallback callback = new FutureWriteCallback();
162 try (MessageOutputStream out = new MessageOutputStream(session))
163 {
164 out.setCallback(callback);
165 ebin.encode(data, out);
166 return callback;
167 }
168 catch (EncodeException | IOException e)
169 {
170 return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e);
171 }
172 }
173
174 throw new IllegalArgumentException("Unknown encoder type: " + encoder);
175 }
176
177 @Override
178 public void sendPing(ByteBuffer data) throws IOException, IllegalArgumentException
179 {
180 if (LOG.isDebugEnabled())
181 {
182 LOG.debug("sendPing({})", BufferUtil.toDetailString(data));
183 }
184 jettyRemote.sendPing(data);
185 }
186
187 @Override
188 public void sendPong(ByteBuffer data) throws IOException, IllegalArgumentException
189 {
190 if (LOG.isDebugEnabled())
191 {
192 LOG.debug("sendPong({})", BufferUtil.toDetailString(data));
193 }
194 jettyRemote.sendPong(data);
195 }
196 }