1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.servlets;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.RandomAccessFile;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel.MapMode;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30
31 import javax.servlet.AsyncContext;
32 import javax.servlet.ServletException;
33 import javax.servlet.ServletOutputStream;
34 import javax.servlet.WriteListener;
35 import javax.servlet.http.HttpServlet;
36 import javax.servlet.http.HttpServletRequest;
37 import javax.servlet.http.HttpServletResponse;
38
39 import org.eclipse.jetty.server.HttpOutput;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class DataRateLimitedServlet extends HttpServlet
63 {
64 private static final long serialVersionUID = -4771757707068097025L;
65 private int buffersize=8192;
66 private long pauseNS=TimeUnit.MILLISECONDS.toNanos(100);
67 ScheduledThreadPoolExecutor scheduler;
68 private final ConcurrentHashMap<String, ByteBuffer> cache=new ConcurrentHashMap<>();
69
70 @Override
71 public void init() throws ServletException
72 {
73
74 String tmp = getInitParameter("buffersize");
75 if (tmp!=null)
76 buffersize=Integer.parseInt(tmp);
77 tmp = getInitParameter("pause");
78 if (tmp!=null)
79 pauseNS=TimeUnit.MILLISECONDS.toNanos(Integer.parseInt(tmp));
80 tmp = getInitParameter("pool");
81 int pool=tmp==null?Runtime.getRuntime().availableProcessors():Integer.parseInt(tmp);
82
83
84 scheduler=new ScheduledThreadPoolExecutor(pool);
85 }
86
87 @Override
88 public void destroy()
89 {
90 scheduler.shutdown();
91 }
92
93 @Override
94 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
95 {
96
97 String info=request.getPathInfo();
98
99
100 if (info.endsWith("/"))
101 {
102 response.sendError(503,"directories not supported");
103 return;
104 }
105
106
107 String content_type=getServletContext().getMimeType(info);
108 response.setContentType(content_type==null?"application/x-data":content_type);
109
110
111 String path = request.getPathTranslated();
112
113
114 ServletOutputStream out = response.getOutputStream();
115 if (path != null && out instanceof HttpOutput)
116 {
117
118 File file = new File(path);
119 if (file.exists() && file.canRead())
120 {
121
122 response.setContentLengthLong(file.length());
123
124
125 ByteBuffer mapped=cache.get(path);
126
127
128 if (mapped==null)
129 {
130
131 try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
132 {
133 ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY,0,raf.length());
134 mapped=cache.putIfAbsent(path,buf);
135 if (mapped==null)
136 mapped=buf;
137 }
138 }
139
140
141 AsyncContext async=request.startAsync();
142
143
144 out.setWriteListener(new JettyDataStream(mapped,async,out));
145 return;
146 }
147 }
148
149
150
151
152 InputStream content = getServletContext().getResourceAsStream(info);
153 if (content==null)
154 {
155 response.sendError(404);
156 return;
157 }
158
159
160 out.setWriteListener(new StandardDataStream(content,request.startAsync(),out));
161 }
162
163
164
165
166 private final class StandardDataStream implements WriteListener, Runnable
167 {
168 private final InputStream content;
169 private final AsyncContext async;
170 private final ServletOutputStream out;
171
172 private StandardDataStream(InputStream content, AsyncContext async, ServletOutputStream out)
173 {
174 this.content = content;
175 this.async = async;
176 this.out = out;
177 }
178
179 @Override
180 public void onWritePossible() throws IOException
181 {
182
183 if(out.isReady())
184 {
185
186
187 byte[] buffer = new byte[buffersize];
188
189
190 int len=content.read(buffer);
191
192
193 if (len<0)
194 {
195
196 async.complete();
197 return;
198 }
199
200
201
202
203
204 out.write(buffer,0,len);
205
206
207
208 scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
209 }
210 }
211
212 @Override
213 public void run()
214 {
215 try
216 {
217
218
219
220 onWritePossible();
221 }
222 catch(Exception e)
223 {
224 onError(e);
225 }
226 }
227
228 @Override
229 public void onError(Throwable t)
230 {
231 getServletContext().log("Async Error",t);
232 async.complete();
233 }
234 }
235
236
237
238
239
240
241 private final class JettyDataStream implements WriteListener, Runnable
242 {
243 private final ByteBuffer content;
244 private final int limit;
245 private final AsyncContext async;
246 private final HttpOutput out;
247
248 private JettyDataStream(ByteBuffer content, AsyncContext async, ServletOutputStream out)
249 {
250
251
252 this.content = content.asReadOnlyBuffer();
253
254 this.limit=this.content.limit();
255 this.async = async;
256 this.out = (HttpOutput)out;
257 }
258
259 @Override
260 public void onWritePossible() throws IOException
261 {
262
263 if(out.isReady())
264 {
265
266 int l=content.position()+buffersize;
267
268 if (l>limit)
269 l=limit;
270 content.limit(l);
271
272
273 if (!content.hasRemaining())
274 {
275
276 async.complete();
277 return;
278 }
279
280
281
282
283
284 out.write(content);
285
286
287
288 scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
289 }
290 }
291
292 @Override
293 public void run()
294 {
295 try
296 {
297
298
299
300 onWritePossible();
301 }
302 catch(Exception e)
303 {
304 onError(e);
305 }
306 }
307
308 @Override
309 public void onError(Throwable t)
310 {
311 getServletContext().log("Async Error",t);
312 async.complete();
313 }
314 }
315 }