1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.extensions.fragment;
20
21
22 import java.nio.ByteBuffer;
23 import java.util.Queue;
24
25 import org.eclipse.jetty.util.ConcurrentArrayQueue;
26 import org.eclipse.jetty.util.IteratingCallback;
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29 import org.eclipse.jetty.websocket.api.BatchMode;
30 import org.eclipse.jetty.websocket.api.WriteCallback;
31 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
32 import org.eclipse.jetty.websocket.api.extensions.Frame;
33 import org.eclipse.jetty.websocket.common.OpCode;
34 import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
35 import org.eclipse.jetty.websocket.common.frames.DataFrame;
36
37
38
39
40 public class FragmentExtension extends AbstractExtension
41 {
42 private static final Logger LOG = Log.getLogger(FragmentExtension.class);
43
44 private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
45 private final IteratingCallback flusher = new Flusher();
46 private int maxLength;
47
48 @Override
49 public String getName()
50 {
51 return "fragment";
52 }
53
54 @Override
55 public void incomingFrame(Frame frame)
56 {
57 nextIncomingFrame(frame);
58 }
59
60 @Override
61 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
62 {
63 ByteBuffer payload = frame.getPayload();
64 int length = payload != null ? payload.remaining() : 0;
65 if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength)
66 {
67 nextOutgoingFrame(frame, callback, batchMode);
68 return;
69 }
70
71 FrameEntry entry = new FrameEntry(frame, callback, batchMode);
72 if (LOG.isDebugEnabled())
73 LOG.debug("Queuing {}", entry);
74 entries.offer(entry);
75 flusher.iterate();
76 }
77
78 @Override
79 public void setConfig(ExtensionConfig config)
80 {
81 super.setConfig(config);
82 maxLength = config.getParameter("maxLength", -1);
83 }
84
85 private static class FrameEntry
86 {
87 private final Frame frame;
88 private final WriteCallback callback;
89 private final BatchMode batchMode;
90
91 private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
92 {
93 this.frame = frame;
94 this.callback = callback;
95 this.batchMode = batchMode;
96 }
97
98 @Override
99 public String toString()
100 {
101 return frame.toString();
102 }
103 }
104
105 private class Flusher extends IteratingCallback implements WriteCallback
106 {
107 private FrameEntry current;
108 private boolean finished = true;
109
110 @Override
111 protected Action process() throws Exception
112 {
113 if (finished)
114 {
115 current = entries.poll();
116 LOG.debug("Processing {}", current);
117 if (current == null)
118 return Action.IDLE;
119 fragment(current, true);
120 }
121 else
122 {
123 fragment(current, false);
124 }
125 return Action.SCHEDULED;
126 }
127
128 private void fragment(FrameEntry entry, boolean first)
129 {
130 Frame frame = entry.frame;
131 ByteBuffer payload = frame.getPayload();
132 int remaining = payload.remaining();
133 int length = Math.min(remaining, maxLength);
134 finished = length == remaining;
135
136 boolean continuation = frame.getType().isContinuation() || !first;
137 DataFrame fragment = new DataFrame(frame, continuation);
138 boolean fin = frame.isFin() && finished;
139 fragment.setFin(fin);
140
141 int limit = payload.limit();
142 int newLimit = payload.position() + length;
143 payload.limit(newLimit);
144 ByteBuffer payloadFragment = payload.slice();
145 payload.limit(limit);
146 fragment.setPayload(payloadFragment);
147 if (LOG.isDebugEnabled())
148 LOG.debug("Fragmented {}->{}", frame, fragment);
149 payload.position(newLimit);
150
151 nextOutgoingFrame(fragment, this, entry.batchMode);
152 }
153
154 @Override
155 protected void onCompleteSuccess()
156 {
157
158 }
159
160 @Override
161 protected void onCompleteFailure(Throwable x)
162 {
163
164
165
166 }
167
168 @Override
169 public void writeSuccess()
170 {
171
172
173 notifyCallbackSuccess(current.callback);
174 succeeded();
175 }
176
177 @Override
178 public void writeFailed(Throwable x)
179 {
180
181
182
183
184
185 notifyCallbackFailure(current.callback, x);
186 succeeded();
187 }
188
189 private void notifyCallbackSuccess(WriteCallback callback)
190 {
191 try
192 {
193 if (callback != null)
194 callback.writeSuccess();
195 }
196 catch (Throwable x)
197 {
198 if (LOG.isDebugEnabled())
199 LOG.debug("Exception while notifying success of callback " + callback, x);
200 }
201 }
202
203 private void notifyCallbackFailure(WriteCallback callback, Throwable failure)
204 {
205 try
206 {
207 if (callback != null)
208 callback.writeFailed(failure);
209 }
210 catch (Throwable x)
211 {
212 if (LOG.isDebugEnabled())
213 LOG.debug("Exception while notifying failure of callback " + callback, x);
214 }
215 }
216 }
217 }