1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package com.acme;
20
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.LinkedList;
24 import java.util.Map;
25 import java.util.Queue;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import javax.servlet.AsyncContext;
29 import javax.servlet.AsyncEvent;
30 import javax.servlet.AsyncListener;
31 import javax.servlet.ServletException;
32 import javax.servlet.http.HttpServlet;
33 import javax.servlet.http.HttpServletRequest;
34 import javax.servlet.http.HttpServletResponse;
35
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38
39
40
41
42 @SuppressWarnings("serial")
43 public class ChatServlet extends HttpServlet
44 {
45 private static final Logger LOG = Log.getLogger(ChatServlet.class);
46
47 private long asyncTimeout = 10000;
48
49 public void init()
50 {
51 String parameter = getServletConfig().getInitParameter("asyncTimeout");
52 if (parameter != null)
53 asyncTimeout = Long.parseLong(parameter);
54 }
55
56
57 class Member implements AsyncListener
58 {
59 final String _name;
60 final AtomicReference<AsyncContext> _async = new AtomicReference<>();
61 final Queue<String> _queue = new LinkedList<>();
62
63 Member(String name)
64 {
65 _name = name;
66 }
67
68 @Override
69 public void onTimeout(AsyncEvent event) throws IOException
70 {
71 LOG.debug("resume request");
72 AsyncContext async = _async.get();
73 if (async != null && _async.compareAndSet(async, null))
74 {
75 HttpServletResponse response = (HttpServletResponse)async.getResponse();
76 response.setContentType("text/json;charset=utf-8");
77 response.getOutputStream().write("{action:\"poll\"}".getBytes());
78 async.complete();
79 }
80 }
81
82 @Override
83 public void onStartAsync(AsyncEvent event) throws IOException
84 {
85 event.getAsyncContext().addListener(this);
86 }
87
88 @Override
89 public void onError(AsyncEvent event) throws IOException
90 {
91 }
92
93 @Override
94 public void onComplete(AsyncEvent event) throws IOException
95 {
96 }
97 }
98
99 Map<String, Map<String, Member>> _rooms = new HashMap<>();
100
101
102
103 @Override
104 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
105 {
106
107 boolean join = Boolean.parseBoolean(request.getParameter("join"));
108 String message = request.getParameter("message");
109 String username = request.getParameter("user");
110
111 LOG.debug("doPost called. join={},message={},username={}", join, message, username);
112 if (username == null)
113 {
114 LOG.debug("no paramter user set, sending 503");
115 response.sendError(503, "user==null");
116 return;
117 }
118
119 Map<String, Member> room = getRoom(request.getPathInfo());
120 Member member = getMember(username, room);
121
122 if (message != null)
123 {
124 sendMessageToAllMembers(message, username, room);
125 }
126
127
128
129
130 if (message == null || join)
131 {
132 synchronized (member)
133 {
134 LOG.debug("Queue size: {}", member._queue.size());
135 if (member._queue.size() > 0)
136 {
137 sendSingleMessage(response, member);
138 }
139 else
140 {
141 LOG.debug("starting async");
142 AsyncContext async = request.startAsync();
143 async.setTimeout(asyncTimeout);
144 async.addListener(member);
145 member._async.set(async);
146 }
147 }
148 }
149 }
150
151 private Member getMember(String username, Map<String, Member> room)
152 {
153 Member member = room.get(username);
154 if (member == null)
155 {
156 LOG.debug("user: {} in room: {} doesn't exist. Creating new user.", username, room);
157 member = new Member(username);
158 room.put(username, member);
159 }
160 return member;
161 }
162
163 private Map<String, Member> getRoom(String path)
164 {
165 Map<String, Member> room = _rooms.get(path);
166 if (room == null)
167 {
168 LOG.debug("room: {} doesn't exist. Creating new room.", path);
169 room = new HashMap<>();
170 _rooms.put(path, room);
171 }
172 return room;
173 }
174
175 private void sendSingleMessage(HttpServletResponse response, Member member) throws IOException
176 {
177 response.setContentType("text/json;charset=utf-8");
178 StringBuilder buf = new StringBuilder();
179
180 buf.append("{\"from\":\"");
181 buf.append(member._queue.poll());
182 buf.append("\",");
183
184 String returnMessage = member._queue.poll();
185 int quote = returnMessage.indexOf('"');
186 while (quote >= 0)
187 {
188 returnMessage = returnMessage.substring(0, quote) + '\\' + returnMessage.substring(quote);
189 quote = returnMessage.indexOf('"', quote + 2);
190 }
191 buf.append("\"chat\":\"");
192 buf.append(returnMessage);
193 buf.append("\"}");
194 byte[] bytes = buf.toString().getBytes("utf-8");
195 response.setContentLength(bytes.length);
196 response.getOutputStream().write(bytes);
197 }
198
199 private void sendMessageToAllMembers(String message, String username, Map<String, Member> room)
200 {
201 LOG.debug("Sending message: {} from: {}", message, username);
202 for (Member m : room.values())
203 {
204 synchronized (m)
205 {
206 m._queue.add(username);
207 m._queue.add(message);
208
209
210 AsyncContext async = m._async.get();
211 LOG.debug("Async found: {}", async);
212 if (async != null & m._async.compareAndSet(async, null))
213 {
214 LOG.debug("dispatch");
215 async.dispatch();
216 }
217 }
218 }
219 }
220
221
222
223 @Override
224 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
225 {
226 if (request.getParameter("action") != null)
227 doPost(request, response);
228 else
229 getServletContext().getNamedDispatcher("default").forward(request, response);
230 }
231
232 }