1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.charset.StandardCharsets;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.eclipse.jetty.io.ByteArrayEndPoint;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.util.BufferUtil;
34 import org.eclipse.jetty.util.ssl.SslContextFactory;
35 import org.eclipse.jetty.util.thread.Scheduler;
36
37 public class LocalConnector extends AbstractConnector
38 {
39 private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40
41
42 public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
43 {
44 super(server,executor,scheduler,pool,acceptors,factories);
45 setIdleTimeout(30000);
46 }
47
48 public LocalConnector(Server server)
49 {
50 this(server, null, null, null, -1, new HttpConnectionFactory());
51 }
52
53 public LocalConnector(Server server, SslContextFactory sslContextFactory)
54 {
55 this(server, null, null, null, -1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
56 }
57
58 public LocalConnector(Server server, ConnectionFactory connectionFactory)
59 {
60 this(server, null, null, null, -1, connectionFactory);
61 }
62
63 public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
64 {
65 this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
66 }
67
68 @Override
69 public Object getTransport()
70 {
71 return this;
72 }
73
74
75
76
77
78
79
80
81
82
83
84 public String getResponses(String requests) throws Exception
85 {
86 return getResponses(requests, 5, TimeUnit.SECONDS);
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102 {
103 ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
104 return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
105 }
106
107
108
109
110
111
112
113
114
115
116
117 public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118 {
119 return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135 {
136 if (LOG.isDebugEnabled())
137 LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
138 LocalEndPoint endp = executeRequest(requestsBuffer);
139 endp.waitUntilClosedOrIdleFor(idleFor,units);
140 ByteBuffer responses = endp.takeOutput();
141 if (endp.isOutputShutdown())
142 endp.close();
143 if (LOG.isDebugEnabled())
144 LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
145 return responses;
146 }
147
148
149
150
151
152
153
154 public LocalEndPoint executeRequest(String rawRequest)
155 {
156 return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
157 }
158
159 private LocalEndPoint executeRequest(ByteBuffer rawRequest)
160 {
161 if (!isStarted())
162 throw new IllegalStateException("!STARTED");
163 LocalEndPoint endp = new LocalEndPoint();
164 endp.addInput(rawRequest);
165 _connects.add(endp);
166 return endp;
167 }
168
169 @Override
170 protected void accept(int acceptorID) throws IOException, InterruptedException
171 {
172 if (LOG.isDebugEnabled())
173 LOG.debug("accepting {}", acceptorID);
174 LocalEndPoint endPoint = _connects.take();
175 endPoint.onOpen();
176 onEndPointOpened(endPoint);
177
178 Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
179 endPoint.setConnection(connection);
180
181 connection.onOpen();
182 }
183
184 public class LocalEndPoint extends ByteArrayEndPoint
185 {
186 private final CountDownLatch _closed = new CountDownLatch(1);
187
188 public LocalEndPoint()
189 {
190 super(LocalConnector.this.getScheduler(), LocalConnector.this.getIdleTimeout());
191 setGrowOutput(true);
192 }
193
194 protected void execute(Runnable task)
195 {
196 getExecutor().execute(task);
197 }
198
199 @Override
200 public void close()
201 {
202 boolean wasOpen=isOpen();
203 super.close();
204 if (wasOpen)
205 {
206 getConnection().onClose();
207 onClose();
208 }
209 }
210
211 @Override
212 public void onClose()
213 {
214 LocalConnector.this.onEndPointClosed(this);
215 super.onClose();
216 _closed.countDown();
217 }
218
219 @Override
220 public void shutdownOutput()
221 {
222 super.shutdownOutput();
223 close();
224 }
225
226 public void waitUntilClosed()
227 {
228 while (isOpen())
229 {
230 try
231 {
232 if (!_closed.await(10,TimeUnit.SECONDS))
233 break;
234 }
235 catch(Exception e)
236 {
237 LOG.warn(e);
238 }
239 }
240 }
241
242 public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
243 {
244 Thread.yield();
245 int size=getOutput().remaining();
246 while (isOpen())
247 {
248 try
249 {
250 if (!_closed.await(idleFor,units))
251 {
252 if (size==getOutput().remaining())
253 {
254 if (LOG.isDebugEnabled())
255 LOG.debug("idle for {} {}",idleFor,units);
256 return;
257 }
258 size=getOutput().remaining();
259 }
260 }
261 catch(Exception e)
262 {
263 LOG.warn(e);
264 }
265 }
266 }
267 }
268 }