View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.servlets;
21  
22  import java.io.BufferedReader;
23  import java.io.IOException;
24  import java.io.StringReader;
25  import java.io.UnsupportedEncodingException;
26  import java.nio.charset.Charset;
27  import java.util.Enumeration;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.TimeUnit;
32  
33  import javax.servlet.AsyncContext;
34  import javax.servlet.ServletException;
35  import javax.servlet.ServletOutputStream;
36  import javax.servlet.http.HttpServlet;
37  import javax.servlet.http.HttpServletRequest;
38  import javax.servlet.http.HttpServletResponse;
39  
40  
41  /**
42   * <p>A servlet that implements the <a href="http://www.w3.org/TR/eventsource/">event source protocol</a>,
43   * also known as "server sent events".</p>
44   * <p>This servlet must be subclassed to implement abstract method {@link #newEventSource(HttpServletRequest)}
45   * to return an instance of {@link EventSource} that allows application to listen for event source events
46   * and to emit event source events.</p>
47   * <p>This servlet supports the following configuration parameters:</p>
48   * <ul>
49   *     <li><code>heartBeatPeriod</code>, that specifies the heartbeat period, in seconds, used to check
50   *     whether the connection has been closed by the client; defaults to 10 seconds.</li>
51   * </ul>
52   *
53   * <p>NOTE: there is currently no support for <code>last-event-id</code>.</p>
54   */
55  public abstract class EventSourceServlet extends HttpServlet
56  {
57      private static final Charset UTF_8 = Charset.forName("UTF-8");
58      private static final byte[] CRLF = new byte[]{'\r', '\n'};
59      private static final byte[] EVENT_FIELD;
60      private static final byte[] DATA_FIELD;
61      private static final byte[] COMMENT_FIELD;
62      static
63      {
64          try
65          {
66              EVENT_FIELD = "event: ".getBytes(UTF_8.name());
67              DATA_FIELD = "data: ".getBytes(UTF_8.name());
68              COMMENT_FIELD = ": ".getBytes(UTF_8.name());
69          }
70          catch (UnsupportedEncodingException x)
71          {
72              throw new RuntimeException(x);
73          }
74      }
75  
76      private ScheduledExecutorService scheduler;
77      private int heartBeatPeriod = 10;
78  
79      @Override
80      public void init() throws ServletException
81      {
82          String heartBeatPeriodParam = getServletConfig().getInitParameter("heartBeatPeriod");
83          if (heartBeatPeriodParam != null)
84              heartBeatPeriod = Integer.parseInt(heartBeatPeriodParam);
85          scheduler = Executors.newSingleThreadScheduledExecutor();
86      }
87  
88      @Override
89      public void destroy()
90      {
91          if (scheduler != null)
92              scheduler.shutdown();
93      }
94  
95      @Override
96      protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
97      {
98          @SuppressWarnings("unchecked")
99          Enumeration<String> acceptValues = request.getHeaders("Accept");
100         while (acceptValues.hasMoreElements())
101         {
102             String accept = acceptValues.nextElement();
103             if (accept.equals("text/event-stream"))
104             {
105                 EventSource eventSource = newEventSource(request);
106                 if (eventSource == null)
107                 {
108                     response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
109                 }
110                 else
111                 {
112                     respond(request, response);
113                     AsyncContext async = request.startAsync();
114                     // Infinite timeout because the continuation is never resumed,
115                     // but only completed on close
116                     async.setTimeout(0);
117                     EventSourceEmitter emitter = new EventSourceEmitter(eventSource, async);
118                     emitter.scheduleHeartBeat();
119                     open(eventSource, emitter);
120                 }
121                 return;
122             }
123         }
124         super.doGet(request, response);
125     }
126 
127     protected abstract EventSource newEventSource(HttpServletRequest request);
128 
129     protected void respond(HttpServletRequest request, HttpServletResponse response) throws IOException
130     {
131         response.setStatus(HttpServletResponse.SC_OK);
132         response.setCharacterEncoding(UTF_8.name());
133         response.setContentType("text/event-stream");
134         // By adding this header, and not closing the connection,
135         // we disable HTTP chunking, and we can use write()+flush()
136         // to send data in the text/event-stream protocol
137         response.addHeader("Connection", "close");
138         response.flushBuffer();
139     }
140 
141     protected void open(EventSource eventSource, EventSource.Emitter emitter) throws IOException
142     {
143         eventSource.onOpen(emitter);
144     }
145 
146     protected class EventSourceEmitter implements EventSource.Emitter, Runnable
147     {
148         private final EventSource eventSource;
149         private final AsyncContext async;
150         private final ServletOutputStream output;
151         private Future<?> heartBeat;
152         private boolean closed;
153 
154         public EventSourceEmitter(EventSource eventSource, AsyncContext async) throws IOException
155         {
156             this.eventSource = eventSource;
157             this.async = async;
158             this.output = async.getResponse().getOutputStream();
159         }
160 
161         @Override
162         public void event(String name, String data) throws IOException
163         {
164             synchronized (this)
165             {
166                 output.write(EVENT_FIELD);
167                 output.write(name.getBytes(UTF_8.name()));
168                 output.write(CRLF);
169                 data(data);
170             }
171         }
172 
173         @Override
174         public void data(String data) throws IOException
175         {
176             synchronized (this)
177             {
178                 BufferedReader reader = new BufferedReader(new StringReader(data));
179                 String line;
180                 while ((line = reader.readLine()) != null)
181                 {
182                     output.write(DATA_FIELD);
183                     output.write(line.getBytes(UTF_8.name()));
184                     output.write(CRLF);
185                 }
186                 output.write(CRLF);
187                 flush();
188             }
189         }
190 
191         @Override
192         public void comment(String comment) throws IOException
193         {
194             synchronized (this)
195             {
196                 output.write(COMMENT_FIELD);
197                 output.write(comment.getBytes(UTF_8.name()));
198                 output.write(CRLF);
199                 output.write(CRLF);
200                 flush();
201             }
202         }
203 
204         @Override
205         public void run()
206         {
207             // If the other peer closes the connection, the first
208             // flush() should generate a TCP reset that is detected
209             // on the second flush()
210             try
211             {
212                 synchronized (this)
213                 {
214                     output.write('\r');
215                     flush();
216                     output.write('\n');
217                     flush();
218                 }
219                 // We could write, reschedule heartbeat
220                 scheduleHeartBeat();
221             }
222             catch (IOException x)
223             {
224                 // The other peer closed the connection
225                 close();
226                 eventSource.onClose();
227             }
228         }
229 
230         protected void flush() throws IOException
231         {
232             async.getResponse().flushBuffer();
233         }
234         
235         @Override
236         public void close()
237         {
238             synchronized (this)
239             {
240                 closed = true;
241                 heartBeat.cancel(false);
242             }
243             async.complete();
244         }
245 
246         private void scheduleHeartBeat()
247         {
248             synchronized (this)
249             {
250                 if (!closed)
251                     heartBeat = scheduler.schedule(this, heartBeatPeriod, TimeUnit.SECONDS);
252             }
253         }
254     }
255 }