1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.charset.StandardCharsets;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.eclipse.jetty.io.ByteArrayEndPoint;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.util.BufferUtil;
34 import org.eclipse.jetty.util.ssl.SslContextFactory;
35 import org.eclipse.jetty.util.thread.Scheduler;
36
37 public class LocalConnector extends AbstractConnector
38 {
39 private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40
41
/**
 * Creates a local connector, forwarding every collaborator unchanged to the
 * {@link AbstractConnector} constructor and then applying a default idle timeout.
 *
 * @param server the server this connector belongs to
 * @param executor the executor handed to the superclass
 * @param scheduler the scheduler handed to the superclass
 * @param pool the buffer pool handed to the superclass
 * @param acceptors the acceptor count handed to the superclass
 * @param factories the connection factories handed to the superclass
 */
public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
{
    super(server, executor, scheduler, pool, acceptors, factories);
    // Default idle timeout; presumably milliseconds - confirm against setIdleTimeout.
    setIdleTimeout(30_000);
}
47
/**
 * Creates a local connector for the given server with a single new
 * {@link HttpConnectionFactory}.
 * <p>
 * The executor, scheduler and buffer pool are passed as {@code null} and the
 * acceptor count as {@code -1}; how those values are interpreted is decided by
 * the constructor this one delegates to.
 *
 * @param server the server this connector belongs to
 */
public LocalConnector(Server server)
{
    this(server,
        null,
        null,
        null,
        -1,
        new HttpConnectionFactory());
}
52
/**
 * Creates a local connector whose connection factories are obtained from
 * {@code AbstractConnectionFactory.getFactories} for the given SSL context
 * factory and a new {@link HttpConnectionFactory}.
 * <p>
 * The executor, scheduler and buffer pool are passed as {@code null} and the
 * acceptor count as {@code -1}.
 *
 * @param server the server this connector belongs to
 * @param sslContextFactory the SSL context factory passed to {@code getFactories}
 */
public LocalConnector(Server server, SslContextFactory sslContextFactory)
{
    this(server,
        null,
        null,
        null,
        -1,
        AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory()));
}
57
/**
 * Creates a local connector for the given server that uses exactly the
 * supplied connection factory.
 * <p>
 * The executor, scheduler and buffer pool are passed as {@code null} and the
 * acceptor count as {@code -1}.
 *
 * @param server the server this connector belongs to
 * @param connectionFactory the only connection factory given to this connector
 */
public LocalConnector(Server server, ConnectionFactory connectionFactory)
{
    this(server,
        null,
        null,
        null,
        -1,
        connectionFactory);
}
62
/**
 * Creates a local connector whose connection factories are obtained from
 * {@code AbstractConnectionFactory.getFactories} for the given SSL context
 * factory and the supplied connection factory.
 * <p>
 * The executor, scheduler and buffer pool are passed as {@code null} and the
 * acceptor count as {@code -1}.
 *
 * @param server the server this connector belongs to
 * @param connectionFactory the connection factory passed to {@code getFactories}
 * @param sslContextFactory the SSL context factory passed to {@code getFactories}
 */
public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
{
    this(server,
        null,
        null,
        null,
        -1,
        AbstractConnectionFactory.getFactories(sslContextFactory, connectionFactory));
}
67
68 @Override
69 public Object getTransport()
70 {
71 return this;
72 }
73
74
75
76
77
78
79
80
81
82
83
84 public String getResponses(String requests) throws Exception
85 {
86 return getResponses(requests, 5, TimeUnit.SECONDS);
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102 {
103 ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
104 return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
105 }
106
107
108
109
110
111
112
113
114
115
116
117 public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118 {
119 return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135 {
136 if (LOG.isDebugEnabled())
137 LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
138 LocalEndPoint endp = executeRequest(requestsBuffer);
139 endp.waitUntilClosedOrIdleFor(idleFor,units);
140 ByteBuffer responses = endp.takeOutput();
141 if (endp.isOutputShutdown())
142 endp.close();
143 if (LOG.isDebugEnabled())
144 LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
145 return responses;
146 }
147
148
149
150
151
152
153
154 public LocalEndPoint executeRequest(String rawRequest)
155 {
156 return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
157 }
158
159 private LocalEndPoint executeRequest(ByteBuffer rawRequest)
160 {
161 if (!isStarted())
162 throw new IllegalStateException("!STARTED");
163 LocalEndPoint endp = new LocalEndPoint();
164 endp.setInput(rawRequest);
165 _connects.add(endp);
166 return endp;
167 }
168
169 @Override
170 protected void accept(int acceptorID) throws IOException, InterruptedException
171 {
172 if (LOG.isDebugEnabled())
173 LOG.debug("accepting {}", acceptorID);
174 LocalEndPoint endPoint = _connects.take();
175 endPoint.onOpen();
176 onEndPointOpened(endPoint);
177
178 Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
179 endPoint.setConnection(connection);
180
181 connection.onOpen();
182 }
183
184 public class LocalEndPoint extends ByteArrayEndPoint
185 {
186 private final CountDownLatch _closed = new CountDownLatch(1);
187
/**
 * Creates an end point that uses the enclosing connector's scheduler and idle
 * timeout, with output growth enabled.
 */
public LocalEndPoint()
{
    super(LocalConnector.this.getScheduler(), LocalConnector.this.getIdleTimeout());
    this.setGrowOutput(true);
}
193
194 public void addInput(String s)
195 {
196
197 while(getIn()==null || BufferUtil.hasContent(getIn()))
198 Thread.yield();
199 setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8));
200 }
201
202 @Override
203 public void close()
204 {
205 boolean wasOpen=isOpen();
206 super.close();
207 if (wasOpen)
208 {
209
210 getConnection().onClose();
211 onClose();
212 }
213 }
214
215 @Override
216 public void onClose()
217 {
218 LocalConnector.this.onEndPointClosed(this);
219 super.onClose();
220 _closed.countDown();
221 }
222
223 @Override
224 public void shutdownOutput()
225 {
226 super.shutdownOutput();
227 close();
228 }
229
230 public void waitUntilClosed()
231 {
232 while (isOpen())
233 {
234 try
235 {
236 if (!_closed.await(10,TimeUnit.SECONDS))
237 break;
238 }
239 catch(Exception e)
240 {
241 LOG.warn(e);
242 }
243 }
244 }
245
246 public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
247 {
248 Thread.yield();
249 int size=getOutput().remaining();
250 while (isOpen())
251 {
252 try
253 {
254 if (!_closed.await(idleFor,units))
255 {
256 if (size==getOutput().remaining())
257 {
258 if (LOG.isDebugEnabled())
259 LOG.debug("idle for {} {}",idleFor,units);
260 return;
261 }
262 size=getOutput().remaining();
263 }
264 }
265 catch(Exception e)
266 {
267 LOG.warn(e);
268 }
269 }
270 }
271 }
272 }