1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.servlets;
21
22 import java.io.BufferedReader;
23 import java.io.IOException;
24 import java.io.StringReader;
25 import java.io.UnsupportedEncodingException;
26 import java.nio.charset.Charset;
27 import java.util.Enumeration;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.servlet.AsyncContext;
34 import javax.servlet.ServletException;
35 import javax.servlet.ServletOutputStream;
36 import javax.servlet.http.HttpServlet;
37 import javax.servlet.http.HttpServletRequest;
38 import javax.servlet.http.HttpServletResponse;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public abstract class EventSourceServlet extends HttpServlet
56 {
57 private static final Charset UTF_8 = Charset.forName("UTF-8");
58 private static final byte[] CRLF = new byte[]{'\r', '\n'};
59 private static final byte[] EVENT_FIELD;
60 private static final byte[] DATA_FIELD;
61 private static final byte[] COMMENT_FIELD;
62 static
63 {
64 try
65 {
66 EVENT_FIELD = "event: ".getBytes(UTF_8.name());
67 DATA_FIELD = "data: ".getBytes(UTF_8.name());
68 COMMENT_FIELD = ": ".getBytes(UTF_8.name());
69 }
70 catch (UnsupportedEncodingException x)
71 {
72 throw new RuntimeException(x);
73 }
74 }
75
76 private ScheduledExecutorService scheduler;
77 private int heartBeatPeriod = 10;
78
79 @Override
80 public void init() throws ServletException
81 {
82 String heartBeatPeriodParam = getServletConfig().getInitParameter("heartBeatPeriod");
83 if (heartBeatPeriodParam != null)
84 heartBeatPeriod = Integer.parseInt(heartBeatPeriodParam);
85 scheduler = Executors.newSingleThreadScheduledExecutor();
86 }
87
88 @Override
89 public void destroy()
90 {
91 if (scheduler != null)
92 scheduler.shutdown();
93 }
94
95 @Override
96 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
97 {
98 @SuppressWarnings("unchecked")
99 Enumeration<String> acceptValues = request.getHeaders("Accept");
100 while (acceptValues.hasMoreElements())
101 {
102 String accept = acceptValues.nextElement();
103 if (accept.equals("text/event-stream"))
104 {
105 EventSource eventSource = newEventSource(request);
106 if (eventSource == null)
107 {
108 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
109 }
110 else
111 {
112 respond(request, response);
113 AsyncContext async = request.startAsync();
114
115
116 async.setTimeout(0);
117 EventSourceEmitter emitter = new EventSourceEmitter(eventSource, async);
118 emitter.scheduleHeartBeat();
119 open(eventSource, emitter);
120 }
121 return;
122 }
123 }
124 super.doGet(request, response);
125 }
126
127 protected abstract EventSource newEventSource(HttpServletRequest request);
128
129 protected void respond(HttpServletRequest request, HttpServletResponse response) throws IOException
130 {
131 response.setStatus(HttpServletResponse.SC_OK);
132 response.setCharacterEncoding(UTF_8.name());
133 response.setContentType("text/event-stream");
134
135
136
137 response.addHeader("Connection", "close");
138 response.flushBuffer();
139 }
140
141 protected void open(EventSource eventSource, EventSource.Emitter emitter) throws IOException
142 {
143 eventSource.onOpen(emitter);
144 }
145
146 protected class EventSourceEmitter implements EventSource.Emitter, Runnable
147 {
148 private final EventSource eventSource;
149 private final AsyncContext async;
150 private final ServletOutputStream output;
151 private Future<?> heartBeat;
152 private boolean closed;
153
154 public EventSourceEmitter(EventSource eventSource, AsyncContext async) throws IOException
155 {
156 this.eventSource = eventSource;
157 this.async = async;
158 this.output = async.getResponse().getOutputStream();
159 }
160
161 @Override
162 public void event(String name, String data) throws IOException
163 {
164 synchronized (this)
165 {
166 output.write(EVENT_FIELD);
167 output.write(name.getBytes(UTF_8.name()));
168 output.write(CRLF);
169 data(data);
170 }
171 }
172
173 @Override
174 public void data(String data) throws IOException
175 {
176 synchronized (this)
177 {
178 BufferedReader reader = new BufferedReader(new StringReader(data));
179 String line;
180 while ((line = reader.readLine()) != null)
181 {
182 output.write(DATA_FIELD);
183 output.write(line.getBytes(UTF_8.name()));
184 output.write(CRLF);
185 }
186 output.write(CRLF);
187 flush();
188 }
189 }
190
191 @Override
192 public void comment(String comment) throws IOException
193 {
194 synchronized (this)
195 {
196 output.write(COMMENT_FIELD);
197 output.write(comment.getBytes(UTF_8.name()));
198 output.write(CRLF);
199 output.write(CRLF);
200 flush();
201 }
202 }
203
204 @Override
205 public void run()
206 {
207
208
209
210 try
211 {
212 synchronized (this)
213 {
214 output.write('\r');
215 flush();
216 output.write('\n');
217 flush();
218 }
219
220 scheduleHeartBeat();
221 }
222 catch (IOException x)
223 {
224
225 close();
226 eventSource.onClose();
227 }
228 }
229
230 protected void flush() throws IOException
231 {
232 async.getResponse().flushBuffer();
233 }
234
235 @Override
236 public void close()
237 {
238 synchronized (this)
239 {
240 closed = true;
241 heartBeat.cancel(false);
242 }
243 async.complete();
244 }
245
246 private void scheduleHeartBeat()
247 {
248 synchronized (this)
249 {
250 if (!closed)
251 heartBeat = scheduler.schedule(this, heartBeatPeriod, TimeUnit.SECONDS);
252 }
253 }
254 }
255 }