1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package com.acme;
20
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.LinkedList;
24 import java.util.Map;
25 import java.util.Queue;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.servlet.AsyncContext;
28 import javax.servlet.AsyncEvent;
29 import javax.servlet.AsyncListener;
30 import javax.servlet.ServletException;
31 import javax.servlet.http.HttpServlet;
32 import javax.servlet.http.HttpServletRequest;
33 import javax.servlet.http.HttpServletResponse;
34
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38
39
40
41 @SuppressWarnings("serial")
42 public class ChatServlet extends HttpServlet
43 {
44 private static final Logger LOG = Log.getLogger(ChatServlet.class);
45
46 private long asyncTimeout = 10000;
47
48 public void init()
49 {
50 String parameter = getServletConfig().getInitParameter("asyncTimeout");
51 if (parameter != null)
52 asyncTimeout = Long.parseLong(parameter);
53 }
54
55
56 class Member implements AsyncListener
57 {
58 final String _name;
59 final AtomicReference<AsyncContext> _async = new AtomicReference<>();
60 final Queue<String> _queue = new LinkedList<>();
61
62 Member(String name)
63 {
64 _name = name;
65 }
66
67 @Override
68 public void onTimeout(AsyncEvent event) throws IOException
69 {
70 LOG.debug("resume request");
71 AsyncContext async = _async.get();
72 if (async != null && _async.compareAndSet(async, null))
73 {
74 HttpServletResponse response = (HttpServletResponse)async.getResponse();
75 response.setContentType("text/json;charset=utf-8");
76 response.getOutputStream().write("{action:\"poll\"}".getBytes());
77 async.complete();
78 }
79 }
80
81 @Override
82 public void onStartAsync(AsyncEvent event) throws IOException
83 {
84 event.getAsyncContext().addListener(this);
85 }
86
87 @Override
88 public void onError(AsyncEvent event) throws IOException
89 {
90 }
91
92 @Override
93 public void onComplete(AsyncEvent event) throws IOException
94 {
95 }
96 }
97
98 Map<String, Map<String, Member>> _rooms = new HashMap<>();
99
100
101
102 @Override
103 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
104 {
105
106 boolean join = Boolean.parseBoolean(request.getParameter("join"));
107 String message = request.getParameter("message");
108 String username = request.getParameter("user");
109
110 LOG.debug("doPost called. join={},message={},username={}", join, message, username);
111 if (username == null)
112 {
113 LOG.debug("no paramter user set, sending 503");
114 response.sendError(503, "user==null");
115 return;
116 }
117
118 Map<String, Member> room = getRoom(request.getPathInfo());
119 Member member = getMember(username, room);
120
121 if (message != null)
122 {
123 sendMessageToAllMembers(message, username, room);
124 }
125
126
127
128
129 if (message == null || join)
130 {
131 synchronized (member)
132 {
133 LOG.debug("Queue size: {}", member._queue.size());
134 if (member._queue.size() > 0)
135 {
136 sendSingleMessage(response, member);
137 }
138 else
139 {
140 LOG.debug("starting async");
141 AsyncContext async = request.startAsync();
142 async.setTimeout(asyncTimeout);
143 async.addListener(member);
144 member._async.set(async);
145 }
146 }
147 }
148 }
149
150 private Member getMember(String username, Map<String, Member> room)
151 {
152 Member member = room.get(username);
153 if (member == null)
154 {
155 LOG.debug("user: {} in room: {} doesn't exist. Creating new user.", username, room);
156 member = new Member(username);
157 room.put(username, member);
158 }
159 return member;
160 }
161
162 private Map<String, Member> getRoom(String path)
163 {
164 Map<String, Member> room = _rooms.get(path);
165 if (room == null)
166 {
167 LOG.debug("room: {} doesn't exist. Creating new room.", path);
168 room = new HashMap<>();
169 _rooms.put(path, room);
170 }
171 return room;
172 }
173
174 private void sendSingleMessage(HttpServletResponse response, Member member) throws IOException
175 {
176 response.setContentType("text/json;charset=utf-8");
177 StringBuilder buf = new StringBuilder();
178
179 buf.append("{\"from\":\"");
180 buf.append(member._queue.poll());
181 buf.append("\",");
182
183 String returnMessage = member._queue.poll();
184 int quote = returnMessage.indexOf('"');
185 while (quote >= 0)
186 {
187 returnMessage = returnMessage.substring(0, quote) + '\\' + returnMessage.substring(quote);
188 quote = returnMessage.indexOf('"', quote + 2);
189 }
190 buf.append("\"chat\":\"");
191 buf.append(returnMessage);
192 buf.append("\"}");
193 byte[] bytes = buf.toString().getBytes("utf-8");
194 response.setContentLength(bytes.length);
195 response.getOutputStream().write(bytes);
196 }
197
198 private void sendMessageToAllMembers(String message, String username, Map<String, Member> room)
199 {
200 LOG.debug("Sending message: {} from: {}", message, username);
201 for (Member m : room.values())
202 {
203 synchronized (m)
204 {
205 m._queue.add(username);
206 m._queue.add(message);
207
208
209 AsyncContext async = m._async.get();
210 LOG.debug("Async found: {}", async);
211 if (async != null & m._async.compareAndSet(async, null))
212 {
213 LOG.debug("dispatch");
214 async.dispatch();
215 }
216 }
217 }
218 }
219
220
221
222 @Override
223 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
224 {
225 if (request.getParameter("action") != null)
226 doPost(request, response);
227 else
228 getServletContext().getNamedDispatcher("default").forward(request, response);
229 }
230
231 }