1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.eclipse.jetty.servlets;
21
22 import java.io.BufferedReader;
23 import java.io.IOException;
24 import java.io.StringReader;
25 import java.nio.charset.StandardCharsets;
26 import java.util.Enumeration;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31
32 import javax.servlet.AsyncContext;
33 import javax.servlet.ServletException;
34 import javax.servlet.ServletOutputStream;
35 import javax.servlet.http.HttpServlet;
36 import javax.servlet.http.HttpServletRequest;
37 import javax.servlet.http.HttpServletResponse;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public abstract class EventSourceServlet extends HttpServlet
55 {
56 private static final byte[] CRLF = new byte[]{'\r', '\n'};
57 private static final byte[] EVENT_FIELD = "event: ".getBytes(StandardCharsets.UTF_8);
58 private static final byte[] DATA_FIELD = "data: ".getBytes(StandardCharsets.UTF_8);
59 private static final byte[] COMMENT_FIELD = ": ".getBytes(StandardCharsets.UTF_8);
60
61 private ScheduledExecutorService scheduler;
62 private int heartBeatPeriod = 10;
63
64 @Override
65 public void init() throws ServletException
66 {
67 String heartBeatPeriodParam = getServletConfig().getInitParameter("heartBeatPeriod");
68 if (heartBeatPeriodParam != null)
69 heartBeatPeriod = Integer.parseInt(heartBeatPeriodParam);
70 scheduler = Executors.newSingleThreadScheduledExecutor();
71 }
72
73 @Override
74 public void destroy()
75 {
76 if (scheduler != null)
77 scheduler.shutdown();
78 }
79
80 @Override
81 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
82 {
83 @SuppressWarnings("unchecked")
84 Enumeration<String> acceptValues = request.getHeaders("Accept");
85 while (acceptValues.hasMoreElements())
86 {
87 String accept = acceptValues.nextElement();
88 if (accept.equals("text/event-stream"))
89 {
90 EventSource eventSource = newEventSource(request);
91 if (eventSource == null)
92 {
93 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
94 }
95 else
96 {
97 respond(request, response);
98 AsyncContext async = request.startAsync();
99
100
101 async.setTimeout(0);
102 EventSourceEmitter emitter = new EventSourceEmitter(eventSource, async);
103 emitter.scheduleHeartBeat();
104 open(eventSource, emitter);
105 }
106 return;
107 }
108 }
109 super.doGet(request, response);
110 }
111
112 protected abstract EventSource newEventSource(HttpServletRequest request);
113
114 protected void respond(HttpServletRequest request, HttpServletResponse response) throws IOException
115 {
116 response.setStatus(HttpServletResponse.SC_OK);
117 response.setCharacterEncoding(StandardCharsets.UTF_8.name());
118 response.setContentType("text/event-stream");
119
120
121
122 response.addHeader("Connection", "close");
123 response.flushBuffer();
124 }
125
126 protected void open(EventSource eventSource, EventSource.Emitter emitter) throws IOException
127 {
128 eventSource.onOpen(emitter);
129 }
130
131 protected class EventSourceEmitter implements EventSource.Emitter, Runnable
132 {
133 private final EventSource eventSource;
134 private final AsyncContext async;
135 private final ServletOutputStream output;
136 private Future<?> heartBeat;
137 private boolean closed;
138
139 public EventSourceEmitter(EventSource eventSource, AsyncContext async) throws IOException
140 {
141 this.eventSource = eventSource;
142 this.async = async;
143 this.output = async.getResponse().getOutputStream();
144 }
145
146 @Override
147 public void event(String name, String data) throws IOException
148 {
149 synchronized (this)
150 {
151 output.write(EVENT_FIELD);
152 output.write(name.getBytes(StandardCharsets.UTF_8));
153 output.write(CRLF);
154 data(data);
155 }
156 }
157
158 @Override
159 public void data(String data) throws IOException
160 {
161 synchronized (this)
162 {
163 BufferedReader reader = new BufferedReader(new StringReader(data));
164 String line;
165 while ((line = reader.readLine()) != null)
166 {
167 output.write(DATA_FIELD);
168 output.write(line.getBytes(StandardCharsets.UTF_8));
169 output.write(CRLF);
170 }
171 output.write(CRLF);
172 flush();
173 }
174 }
175
176 @Override
177 public void comment(String comment) throws IOException
178 {
179 synchronized (this)
180 {
181 output.write(COMMENT_FIELD);
182 output.write(comment.getBytes(StandardCharsets.UTF_8));
183 output.write(CRLF);
184 output.write(CRLF);
185 flush();
186 }
187 }
188
189 @Override
190 public void run()
191 {
192
193
194
195 try
196 {
197 synchronized (this)
198 {
199 output.write('\r');
200 flush();
201 output.write('\n');
202 flush();
203 }
204
205 scheduleHeartBeat();
206 }
207 catch (IOException x)
208 {
209
210 close();
211 eventSource.onClose();
212 }
213 }
214
215 protected void flush() throws IOException
216 {
217 async.getResponse().flushBuffer();
218 }
219
220 @Override
221 public void close()
222 {
223 synchronized (this)
224 {
225 closed = true;
226 heartBeat.cancel(false);
227 }
228 async.complete();
229 }
230
231 private void scheduleHeartBeat()
232 {
233 synchronized (this)
234 {
235 if (!closed)
236 heartBeat = scheduler.schedule(this, heartBeatPeriod, TimeUnit.SECONDS);
237 }
238 }
239 }
240 }