View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.servlets;
21  
22  import java.io.BufferedReader;
23  import java.io.IOException;
24  import java.io.StringReader;
25  import java.nio.charset.StandardCharsets;
26  import java.util.Enumeration;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.Future;
29  import java.util.concurrent.ScheduledExecutorService;
30  import java.util.concurrent.TimeUnit;
31  
32  import javax.servlet.AsyncContext;
33  import javax.servlet.ServletException;
34  import javax.servlet.ServletOutputStream;
35  import javax.servlet.http.HttpServlet;
36  import javax.servlet.http.HttpServletRequest;
37  import javax.servlet.http.HttpServletResponse;
38  
39  
40  /**
41   * <p>A servlet that implements the <a href="http://www.w3.org/TR/eventsource/">event source protocol</a>,
42   * also known as "server sent events".</p>
43   * <p>This servlet must be subclassed to implement abstract method {@link #newEventSource(HttpServletRequest)}
44   * to return an instance of {@link EventSource} that allows application to listen for event source events
45   * and to emit event source events.</p>
46   * <p>This servlet supports the following configuration parameters:</p>
47   * <ul>
48   *     <li><code>heartBeatPeriod</code>, that specifies the heartbeat period, in seconds, used to check
49   *     whether the connection has been closed by the client; defaults to 10 seconds.</li>
50   * </ul>
51   *
52   * <p>NOTE: there is currently no support for <code>last-event-id</code>.</p>
53   */
54  public abstract class EventSourceServlet extends HttpServlet
55  {
56      private static final byte[] CRLF = new byte[]{'\r', '\n'};
57      private static final byte[] EVENT_FIELD = "event: ".getBytes(StandardCharsets.UTF_8);
58      private static final byte[] DATA_FIELD = "data: ".getBytes(StandardCharsets.UTF_8);
59      private static final byte[] COMMENT_FIELD = ": ".getBytes(StandardCharsets.UTF_8);
60  
61      private ScheduledExecutorService scheduler;
62      private int heartBeatPeriod = 10;
63  
64      @Override
65      public void init() throws ServletException
66      {
67          String heartBeatPeriodParam = getServletConfig().getInitParameter("heartBeatPeriod");
68          if (heartBeatPeriodParam != null)
69              heartBeatPeriod = Integer.parseInt(heartBeatPeriodParam);
70          scheduler = Executors.newSingleThreadScheduledExecutor();
71      }
72  
73      @Override
74      public void destroy()
75      {
76          if (scheduler != null)
77              scheduler.shutdown();
78      }
79  
80      @Override
81      protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
82      {
83          @SuppressWarnings("unchecked")
84          Enumeration<String> acceptValues = request.getHeaders("Accept");
85          while (acceptValues.hasMoreElements())
86          {
87              String accept = acceptValues.nextElement();
88              if (accept.equals("text/event-stream"))
89              {
90                  EventSource eventSource = newEventSource(request);
91                  if (eventSource == null)
92                  {
93                      response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
94                  }
95                  else
96                  {
97                      respond(request, response);
98                      AsyncContext async = request.startAsync();
99                      // Infinite timeout because the continuation is never resumed,
100                     // but only completed on close
101                     async.setTimeout(0);
102                     EventSourceEmitter emitter = new EventSourceEmitter(eventSource, async);
103                     emitter.scheduleHeartBeat();
104                     open(eventSource, emitter);
105                 }
106                 return;
107             }
108         }
109         super.doGet(request, response);
110     }
111 
112     protected abstract EventSource newEventSource(HttpServletRequest request);
113 
114     protected void respond(HttpServletRequest request, HttpServletResponse response) throws IOException
115     {
116         response.setStatus(HttpServletResponse.SC_OK);
117         response.setCharacterEncoding(StandardCharsets.UTF_8.name());
118         response.setContentType("text/event-stream");
119         // By adding this header, and not closing the connection,
120         // we disable HTTP chunking, and we can use write()+flush()
121         // to send data in the text/event-stream protocol
122         response.addHeader("Connection", "close");
123         response.flushBuffer();
124     }
125 
126     protected void open(EventSource eventSource, EventSource.Emitter emitter) throws IOException
127     {
128         eventSource.onOpen(emitter);
129     }
130 
131     protected class EventSourceEmitter implements EventSource.Emitter, Runnable
132     {
133         private final EventSource eventSource;
134         private final AsyncContext async;
135         private final ServletOutputStream output;
136         private Future<?> heartBeat;
137         private boolean closed;
138 
139         public EventSourceEmitter(EventSource eventSource, AsyncContext async) throws IOException
140         {
141             this.eventSource = eventSource;
142             this.async = async;
143             this.output = async.getResponse().getOutputStream();
144         }
145 
146         @Override
147         public void event(String name, String data) throws IOException
148         {
149             synchronized (this)
150             {
151                 output.write(EVENT_FIELD);
152                 output.write(name.getBytes(StandardCharsets.UTF_8));
153                 output.write(CRLF);
154                 data(data);
155             }
156         }
157 
158         @Override
159         public void data(String data) throws IOException
160         {
161             synchronized (this)
162             {
163                 BufferedReader reader = new BufferedReader(new StringReader(data));
164                 String line;
165                 while ((line = reader.readLine()) != null)
166                 {
167                     output.write(DATA_FIELD);
168                     output.write(line.getBytes(StandardCharsets.UTF_8));
169                     output.write(CRLF);
170                 }
171                 output.write(CRLF);
172                 flush();
173             }
174         }
175 
176         @Override
177         public void comment(String comment) throws IOException
178         {
179             synchronized (this)
180             {
181                 output.write(COMMENT_FIELD);
182                 output.write(comment.getBytes(StandardCharsets.UTF_8));
183                 output.write(CRLF);
184                 output.write(CRLF);
185                 flush();
186             }
187         }
188 
189         @Override
190         public void run()
191         {
192             // If the other peer closes the connection, the first
193             // flush() should generate a TCP reset that is detected
194             // on the second flush()
195             try
196             {
197                 synchronized (this)
198                 {
199                     output.write('\r');
200                     flush();
201                     output.write('\n');
202                     flush();
203                 }
204                 // We could write, reschedule heartbeat
205                 scheduleHeartBeat();
206             }
207             catch (IOException x)
208             {
209                 // The other peer closed the connection
210                 close();
211                 eventSource.onClose();
212             }
213         }
214 
215         protected void flush() throws IOException
216         {
217             async.getResponse().flushBuffer();
218         }
219         
220         @Override
221         public void close()
222         {
223             synchronized (this)
224             {
225                 closed = true;
226                 heartBeat.cancel(false);
227             }
228             async.complete();
229         }
230 
231         private void scheduleHeartBeat()
232         {
233             synchronized (this)
234             {
235                 if (!closed)
236                     heartBeat = scheduler.schedule(this, heartBeatPeriod, TimeUnit.SECONDS);
237             }
238         }
239     }
240 }